Testing data quality at scale with PyDeequ
You generally write unit tests for your code, but do you also test your data? Incoming data quality can make or break your application. Incorrect, missing, or malformed data can have a large impact on production systems. Examples of data quality issues include the following:
- Missing values can lead to failures in production systems that require non-null values (`NullPointerException`)
- Changes in the distribution of data can lead to unexpected outputs of machine learning (ML) models
- Aggregations of incorrect data can lead to wrong business decisions
In this post, we introduce PyDeequ, an open-source Python wrapper over Deequ (an open-source tool developed and used at Amazon). Deequ is written in Scala, whereas PyDeequ allows you to use its data quality and testing capabilities from Python and PySpark, the language of choice of many data scientists. PyDeequ democratizes and extends the power of Deequ by allowing you to use it alongside the many data science libraries available in that language. Furthermore, PyDeequ offers a fluid interface with pandas DataFrames instead of restricting you to Apache Spark DataFrames.
Deequ allows you to calculate data quality metrics on your dataset, define and verify data quality constraints, and be informed about changes in the data distribution. Instead of implementing checks and verification algorithms on your own, you can focus on describing how your data should look; Deequ supports you by suggesting checks. Deequ is implemented on top of Apache Spark and is designed to scale with large datasets (billions of rows) that typically live in a data lake, distributed file system, or data warehouse. PyDeequ gives you access to this capability, but also allows you to use it from the familiar environment of your Python Jupyter notebook.
Deequ at Amazon
Deequ is used internally at Amazon to verify the quality of many large production datasets. Dataset producers can add and edit data quality constraints. The system computes data quality metrics on a regular basis (with every new version of a dataset), verifies constraints defined by dataset producers, and publishes datasets to consumers in case of success. In error cases, dataset publication can be stopped, and producers are notified to take action. Data quality issues don’t propagate to consumer data pipelines, reducing their blast radius.
Deequ is also used within Amazon SageMaker Model Monitor. Now with the availability of PyDeequ, you can use it from a broader set of environments: Amazon SageMaker notebooks, AWS Glue, Amazon EMR, and more.
Overview of PyDeequ
Let’s look at PyDeequ’s main components, and how they relate to Deequ (shown in the following diagram):
- Metrics computation – Deequ computes data quality metrics, that is, statistics such as completeness, maximum, or correlation. Deequ uses Spark to read from sources such as Amazon Simple Storage Service (Amazon S3) and compute metrics through an optimized set of aggregation queries. You have direct access to the raw metrics computed on the data.
- Constraint verification – As a user, you focus on defining a set of data quality constraints to be verified. Deequ takes care of deriving the required set of metrics to be computed on the data. Deequ generates a data quality report, which contains the result of the constraint verification.
- Constraint suggestion – You can choose to define your own custom data quality constraints or use the automated constraint suggestion methods that profile the data to infer useful constraints.
- Python wrappers – You can call each Deequ function using Python syntax. The wrappers translate the commands to the underlying Deequ calls and return their response.
Use case overview
As a running example, we use a customer review dataset provided by Amazon on Amazon S3. We intentionally follow the example in the post Test data quality at scale with Deequ to show the similarity in functionality and implementation. We begin the way many data science projects do: with initial data exploration and assessment in a Jupyter notebook.
If you’d like to follow along with a live Jupyter notebook, check out the notebook on our GitHub repo.
During the data exploration phase, you want to easily answer some basic questions about the data:
- Are the fields that are supposed to contain unique values really unique? Are there fields that are missing values?
- How many distinct categories are there in the categorical fields?
- Are there correlations between some key features?
- If there are two supposedly similar datasets (such as different categories or different time periods), are they really similar?
We also show you how to scale this approach to large-scale datasets, using the same code on an Amazon EMR cluster. This is how you’d likely do your ML training, and later as you move into a production setting.
Starting a PySpark session in a SageMaker notebook
To follow along with this post, open up a SageMaker notebook instance, clone the PyDeequ GitHub repository onto the notebook instance, and run the test_data_quality_at_scale.ipynb notebook from the tutorials directory of the PyDeequ repository.
Let’s install our dependencies first in a terminal window:
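PyDeequ is published on PyPI, so the install is typically a single pip command (the extra pyspark install below is only needed if your environment doesn't already provide Spark):

```bash
# Install PyDeequ; add PySpark if it isn't already available in the environment
pip install pydeequ
pip install pyspark
```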
Next, in a cell of our SageMaker notebook, we need to create a PySpark session:
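A minimal session setup, following the pattern in the PyDeequ README (the `deequ_maven_coord` and `f2j_maven_coord` attributes exposed by the pydeequ package tell Spark which Deequ JAR to fetch and which conflicting transitive dependency to exclude):

```python
from pyspark.sql import SparkSession
import pydeequ

spark = (SparkSession
    .builder
    # Fetch the Deequ JAR that matches this PyDeequ release
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    # Exclude a transitive dependency that conflicts with Spark's own
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())
```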
Loading data
Load the dataset containing reviews for the category Electronics into our Jupyter notebook:
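A sketch of the load; the S3 location shown here is the public Amazon Customer Reviews Parquet path used in the Deequ post and is an assumption on our part:

```python
# Read the Electronics partition of the Amazon Customer Reviews dataset
df = spark.read.parquet(
    "s3a://amazon-reviews-pds/parquet/product_category=Electronics/")
```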
After you load the DataFrame, you can run `df.printSchema()` to view the schema of the dataset.
Data analysis
Before we define checks on the data, we want to calculate some statistics on the dataset; we call them metrics. As with Deequ, PyDeequ supports a rich set of metrics. For more information, see Test data quality at scale with Deequ or the GitHub repo. In the following example, we use the `AnalysisRunner` to capture the metrics we're interested in:
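The following sketch uses the analyzers that produce the metrics in the table below; the class names come from PyDeequ's analyzers module, and the "top star_rating" predicate is our reading of the Compliance row in that table:

```python
from pydeequ.analyzers import (AnalysisRunner, AnalyzerContext, Size, Completeness,
                               ApproxCountDistinct, Mean, Compliance, Correlation)

analysisResult = (AnalysisRunner(spark)
    .onData(df)
    .addAnalyzer(Size())                            # number of rows
    .addAnalyzer(Completeness("review_id"))         # fraction of non-null values
    .addAnalyzer(ApproxCountDistinct("review_id"))  # approximate distinct count
    .addAnalyzer(Mean("star_rating"))
    .addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0"))
    .addAnalyzer(Correlation("total_votes", "star_rating"))
    .addAnalyzer(Correlation("total_votes", "helpful_votes"))
    .run())

# Convert the computed metrics into a Spark DataFrame and display them
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()
```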
The following table summarizes our findings.
| Name | Instance | Value |
| --- | --- | --- |
| ApproxCountDistinct | review_id | 3010972 |
| Completeness | review_id | 1 |
| Compliance | top star_rating | 0.74941 |
| Correlation | helpful_votes,total_votes | 0.99365 |
| Correlation | total_votes,star_rating | -0.03451 |
| Mean | star_rating | 4.03614 |
| Size | * | 3120938 |
From this, we learn the following:

- `review_id` has no missing values and approximately 3,010,972 unique values
- Approximately 75% of reviews have a `star_rating` of 4 or higher
- `total_votes` and `star_rating` are not correlated
- `helpful_votes` and `total_votes` are strongly correlated
- The average `star_rating` is 4.0
- The dataset contains 3,120,938 reviews
Defining and running tests for data
After analyzing and understanding the data, we want to verify that the properties we have derived also hold for new versions of the dataset. By defining assertions on the data distribution as part of a data pipeline, we can ensure that every processed dataset is of high quality, and that any application consuming the data can rely on it.
For writing tests on data, we start with the VerificationSuite and add checks on attributes of the data. In this example, we test for the following properties of our data:
- At least 3 million rows in total
- `review_id` is never NULL
- `review_id` is unique
- `star_rating` has a minimum of 1.0 and maximum of 5.0
- `marketplace` only contains `US`, `UK`, `DE`, `JP`, or `FR`
- `year` does not contain negative values
This is the code that reflects the previous statements. For information about all available checks, see the GitHub repo. You can run this directly in the Spark shell as previously explained:
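A sketch of the verification suite that encodes the properties above (the check description string and the `CheckLevel` are illustrative choices, not taken from the original post):

```python
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult

check = Check(spark, CheckLevel.Error, "Amazon Electronics Review Check")

checkResult = (VerificationSuite(spark)
    .onData(df)
    .addCheck(
        check.hasSize(lambda x: x >= 3000000)            # at least 3 million rows
             .hasMin("star_rating", lambda x: x == 1.0)  # minimum star_rating is 1.0
             .hasMax("star_rating", lambda x: x == 5.0)  # maximum star_rating is 5.0
             .isComplete("review_id")                    # review_id is never NULL
             .isUnique("review_id")                      # review_id is unique
             .isComplete("marketplace")                  # marketplace is never NULL
             .isContainedIn("marketplace", ["US", "UK", "DE", "JP", "FR"])
             .isNonNegative("year"))                     # year has no negative values
    .run())

# Display the outcome of each constraint
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()
```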
After calling `run()`, PyDeequ translates your test description into Deequ, which translates it into a series of Spark jobs that are run to compute metrics on the data. Afterwards, it invokes your assertion functions (for example, `lambda x: x == 1.0` for the minimum star rating check) on these metrics to see if the constraints hold on the data. The following table summarizes our findings.
| Constraint | constraint_status | constraint_message |
| --- | --- | --- |
| SizeConstraint(Size(None)) | Success | |
| MinimumConstraint(Minimum(star_rating,None)) | Success | |
| MaximumConstraint(Maximum(star_rating,None)) | Success | |
| CompletenessConstraint(Completeness(review_id,None)) | Success | |
| UniquenessConstraint(Uniqueness(List(review_id))) | Failure | Value: 0.9926566948782706 does not meet the constraint requirement! |
| CompletenessConstraint(Completeness(marketplace,None)) | Success | |
| ComplianceConstraint(Compliance(marketplace contained in US,UK,DE,JP,FR,marketplace IS NULL OR marketplace IN ('US','UK','DE','JP','FR'),None)) | Success | |
| ComplianceConstraint(Compliance(year is non-negative,COALESCE(year, 0.0) >= 0,None)) | Success | |
Interestingly, the `review_id` column isn't unique, which resulted in a failure of the check on uniqueness. We can also look at all the metrics that Deequ computed for this check by running the following:
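For example (reusing the `checkResult` object from the verification sketch above):

```python
# Retrieve the metrics Deequ computed while evaluating the checks
checkResult_metrics_df = VerificationResult.successMetricsAsDataFrame(spark, checkResult)
checkResult_metrics_df.show()
```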
The following table summarizes our findings.
| Name | Instance | Value |
| --- | --- | --- |
| Completeness | review_id | 1 |
| Completeness | marketplace | 1 |
| Compliance | marketplace contained in US,UK,DE,JP,FR | 1 |
| Compliance | year is non-negative | 1 |
| Maximum | star_rating | 5 |
| Minimum | star_rating | 1 |
| Size | * | 3120938 |
| Uniqueness | review_id | 0.99266 |
Automated constraint suggestion
If you own a large number of datasets or if your dataset has many columns, it may be challenging for you to manually define appropriate constraints. Deequ can automatically suggest useful constraints based on the data distribution. Deequ first runs a data profiling method and then applies a set of rules on the result. For more information about how to run a data profiling method, see the GitHub repo.
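A sketch of the suggestion run; `DEFAULT()` applies the full built-in rule set, and the variable name `result_json` is ours, chosen only to match the print call mentioned below:

```python
import json  # used for the json.dumps call shown below
from pydeequ.suggestions import ConstraintSuggestionRunner, DEFAULT

result_json = (ConstraintSuggestionRunner(spark)
    .onData(df)
    .addConstraintRule(DEFAULT())  # profile the data and apply all suggestion rules
    .run())
```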
The result contains a list of constraints with descriptions and Python code, so that you can directly apply it in your data quality checks. Call `print(json.dumps(result_json))` to inspect the suggested constraints; the following table shows a subset.
| Column | Constraint | Python code |
| --- | --- | --- |
| customer_id | customer_id is not null | `.isComplete("customer_id")` |
| customer_id | customer_id has type Integral | `.hasDataType("customer_id", ConstrainableDataTypes.Integral)` |
| customer_id | customer_id has no negative values | `.isNonNegative("customer_id")` |
| helpful_votes | helpful_votes is not null | `.isComplete("helpful_votes")` |
| helpful_votes | helpful_votes has no negative values | `.isNonNegative("helpful_votes")` |
| marketplace | marketplace has value range "US", "UK", "DE", "JP", "FR" | `.isContainedIn("marketplace", ["US", "UK", "DE", "JP", "FR"])` |
| product_title | product_title is not null | `.isComplete("product_title")` |
| star_rating | star_rating is not null | `.isComplete("star_rating")` |
| star_rating | star_rating has no negative values | `.isNonNegative("star_rating")` |
| vine | vine has value range "N", "Y" | `.isContainedIn("vine", ["N", "Y"])` |
You can explore the other tutorials in the PyDeequ GitHub repo.
Scaling to production
So far, we’ve shown you how to use these capabilities in the context of data exploration using a Jupyter notebook running on a SageMaker notebook instance. As your project matures, you need to use the same capabilities on larger and larger datasets, and in a production environment. With PyDeequ, it’s easy to make that transition. The following diagram illustrates deployment options for local and production purposes on AWS.
Amazon EMR and AWS Glue interface with PyDeequ through the PySpark drivers that PyDeequ uses as its main engine. PyDeequ can run as a PySpark application in both contexts when the Deequ JAR is added to the Spark context. You can run PyDeequ's data validation toolkit after the Spark context and drivers are configured and your data is loaded into a DataFrame. We describe the Amazon EMR configuration options and use cases in this section (configurations 2 and 3 in the diagram).
Data exploration from a SageMaker notebook via an EMR cluster
As shown in configuration 2 in the diagram, you can connect to an EMR cluster from a SageMaker notebook to run PyDeequ. This enables you to explore much larger volumes of data than you can using a single notebook. Your Amazon EMR cluster must be running Spark v2.4.6, available with Amazon EMR version 5.31 or higher, in order to work with PyDeequ. After you have a running cluster that has those components and a SageMaker notebook, you configure a `SparkSession` object using the following template to connect to your cluster. For more information about connecting a SageMaker notebook to Amazon EMR or the necessary IAM permissions, see Submitting User Applications with spark-submit.
In the SageMaker notebook, run the following JSON in a cell before you start your `SparkSession` to configure your EMR cluster:
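A template along these lines (using the sparkmagic `%%configure` cell magic) adds the Deequ JAR to the cluster's Spark configuration; the Deequ version is a placeholder and should match the coordinate pinned by your PyDeequ release (see `pydeequ.deequ_maven_coord`):

```
%%configure -f
{
    "conf": {
        "spark.jars.packages": "com.amazon.deequ:deequ:<DEEQU_VERSION>",
        "spark.jars.excludes": "net.sourceforge.f2j:arpack_combined_all"
    }
}
```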
Start your `SparkSession` object in a cell after the preceding configuration by running `spark`. Then install PyDeequ onto your EMR cluster using the `SparkContext` (named `sc` by default) with the following command:
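One way to do this is EMR's notebook-scoped library mechanism (available on EMR 5.26 and later), which exposes an install method on the Spark context:

```python
# Install PyDeequ as a notebook-scoped library on the attached EMR cluster
sc.install_pypi_package("pydeequ")
```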
Now you can start using PyDeequ from your notebook to run the same statements as before, but with much larger volumes of data.
Running a transient EMR cluster
Another way to leverage the power of an EMR cluster is to treat it as a transient cluster and run it in a headless configuration, as shown in configuration 3 in the diagram. We use `spark-submit` in an EMR `add-steps` call to run PyDeequ on Amazon EMR. For each of the following steps, make sure to replace the values in brackets accordingly.
- Create a bootstrap shell script and upload it to an S3 bucket. The following code is an example of `pydeequ-emr-bootstrap.sh`:
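One plausible version, installing PyDeequ for Python 3 on every node (`--no-deps` avoids pulling in a second copy of PySpark alongside the one EMR already provides):

```bash
#!/bin/bash
# pydeequ-emr-bootstrap.sh: install PyDeequ (and pandas) on each cluster node
sudo python3 -m pip install --no-deps pydeequ
sudo python3 -m pip install pandas
```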
- Create an EMR cluster via the AWS Command Line Interface (AWS CLI):
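A sketch of the cluster creation; the cluster name, application list, instance sizing, and bracketed values are placeholders to adjust for your account:

```bash
aws emr create-cluster \
    --name 'pydeequ-emr-cluster' \
    --release-label emr-5.31.0 \
    --applications Name=Spark Name=Hadoop Name=Livy \
    --use-default-roles \
    --instance-type m5.xlarge \
    --instance-count 2 \
    --bootstrap-actions Path="s3://<S3_BUCKET>/pydeequ-emr-bootstrap.sh",Name='install_pydeequ' \
    --ec2-attributes KeyName=<EC2_KEY_PAIR>
```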
- Create your PySpark PyDeequ run script and upload it to Amazon S3. The following code is our example of `pydeequ-test.py`:
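A minimal sketch of such a script: it creates its own SparkSession, loads the Electronics reviews, runs a few of the checks from earlier, and writes the verification results to S3. The output path and check selection are placeholders; the full tutorial script in the PyDeequ repo is more complete.

```python
from pyspark.sql import SparkSession
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult

if __name__ == "__main__":
    # The Deequ JAR is supplied via --packages in the spark-submit step,
    # so a plain SparkSession is sufficient here.
    spark = SparkSession.builder.appName("pydeequ-emr-test").getOrCreate()

    df = spark.read.parquet(
        "s3a://amazon-reviews-pds/parquet/product_category=Electronics/")

    check = Check(spark, CheckLevel.Error, "Electronics review check")
    checkResult = (VerificationSuite(spark)
        .onData(df)
        .addCheck(check.hasSize(lambda x: x >= 3000000)
                       .isComplete("review_id")
                       .isUnique("review_id"))
        .run())

    # Persist the results so they survive the transient cluster
    (VerificationResult.checkResultsAsDataFrame(spark, checkResult)
        .repartition(1)
        .write.mode("overwrite")
        .csv("s3a://<S3_BUCKET>/pydeequ-test-results/", header=True))
```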
- When your cluster is running and in the `WAITING` state, submit your Spark job to Amazon EMR via the AWS CLI:
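A sketch of the step submission; the cluster ID, bucket, and Deequ version are placeholders, and the `--packages`/`--exclude-packages` arguments mirror the Spark session configuration used earlier:

```bash
aws emr add-steps \
    --cluster-id <CLUSTER_ID> \
    --steps Type=Spark,Name="pydeequ-spark-submit",ActionOnFailure=CONTINUE,Args=[--deploy-mode,cluster,--packages,com.amazon.deequ:deequ:<DEEQU_VERSION>,--exclude-packages,net.sourceforge.f2j:arpack_combined_all,s3://<S3_BUCKET>/pydeequ-test.py]
```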
Congratulations, you have now submitted a PyDeequ PySpark job to Amazon EMR. Give the job a few minutes to run, after which you can view your results at the S3 output path specified on the last line of `pydeequ-test.py`.
Afterwards, remember to clean up your results and spin down the EMR cluster using the following command:
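For example (the bucket path and cluster ID are the placeholders used above):

```bash
# Remove the job output, then terminate the transient cluster
aws s3 rm s3://<S3_BUCKET>/pydeequ-test-results/ --recursive
aws emr terminate-clusters --cluster-ids <CLUSTER_ID>
```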
Now you can use Amazon EMR to process large datasets in batch using PyDeequ to plug into your pipelines and provide scalable tests on your data.
More examples on GitHub
You can find examples of more advanced features on the Deequ GitHub page:
- Deequ provides more than data quality checks with fixed thresholds. Learn how to use anomaly detection on data quality metrics to apply tests on metrics that change over time.
- Deequ offers support for storing and loading metrics. Learn how to use the MetricsRepository for this use case.
- If your dataset grows over time or is partitioned, you can use Deequ's incremental metrics computation. For each partition, Deequ stores a state for each computed metric. To compute metrics for the union of partitions, Deequ can use these states to efficiently derive overall metrics without reloading the data.
Conclusion
This post showed you how to use PyDeequ for calculating data quality metrics, verifying data quality metrics, and profiling data to automate the configuration of data quality checks. PyDeequ is available via `pip install` and on GitHub now for you to build your own data quality management pipeline.
Learn more about the inner workings of Deequ in the VLDB 2018 paper Automating large-scale data quality verification.
Stay tuned for another post demonstrating production workflows on AWS Glue.
About the Authors
Calvin Wang is a Data Scientist at AWS AI/ML. He holds a B.S. in Computer Science from UC Santa Barbara and loves using machine learning to build cool stuff.
Chris Ghyzel is a Data Engineer for AWS Professional Services. Currently, he is working with customers to integrate machine learning solutions on AWS into their production pipelines.
Veronika Megler, PhD, is Principal Data Scientist for Amazon.com Consumer Packaging. Until recently she was the Principal Data Scientist for AWS Professional Services. She enjoys adapting innovative big data, AI, and ML technologies to help companies solve new problems, and to solve old problems more efficiently and effectively. Her work has lately been focused more heavily on economic impacts of ML models and exploring causality.