Orchestrate AWS Glue DataBrew jobs using Amazon Managed Workflows for Apache Airflow
As data volumes grow, big data analytics is becoming a common requirement in analytics and machine learning (ML) use cases. Analysts build complex data transformation pipelines that include multiple steps for data preparation and cleansing, but they may want a simpler orchestration mechanism with a graphical user interface that scales to larger data volumes and is easy to maintain. To support these requirements, you can use AWS Glue DataBrew for data preparation and Amazon Managed Workflows for Apache Airflow (Amazon MWAA) for orchestrating workflows. In this post, we discuss configuring this integration.
Glue DataBrew is a new visual data preparation tool that helps you clean and normalize data without writing code. Analysts can choose from over 250 ready-made transformations to automate data preparation tasks, such as filtering anomalies, converting data to standard formats, and correcting invalid values. Glue DataBrew is serverless, which helps analysts focus on exploring and transforming terabytes of raw data without needing to create clusters or manage any infrastructure. This reduces the time it takes to prepare data for analytics and ML by up to 80% compared to traditional approaches to data preparation.
Amazon MWAA is a fully managed service that makes it easy to run the open-source version of Apache Airflow on AWS and build workflows to run extract, transform, and load (ETL) jobs and data pipelines.
Prerequisites
Before you get started, make sure you have the following prerequisites:
- A basic understanding of Amazon Athena to run SQL queries
- A basic understanding of Amazon Simple Storage Service (Amazon S3)
- Permissions to create the DataBrew project and jobs, S3 buckets, Athena tables, and use Amazon MWAA
- An AWS Identity and Access Management (IAM) role that Airflow can assume to access DataBrew, Athena, and Amazon S3 resources (see Adding and removing IAM identity permissions for reference)
Data preparation
To illustrate the orchestration of DataBrew jobs using Amazon MWAA, we use the following publicly available datasets:
- Yellow taxi and green taxi trip records – Trip records that include fields capturing pick-up and drop-off dates and times, pick-up and drop-off locations, trip distances, and payment types
- Taxi zone lookup dataset – This dataset includes location IDs and their zone details
You should download these datasets to use in the following steps.
We create input and output S3 buckets with subfolders to capture the yellow taxi data, green taxi data, and lookup data. Then we upload the input data into the input S3 bucket in their respective folders. The following table outlines the overall structure in Amazon S3.
S3 Input Bucket Structure | S3 Output Bucket Structure | Airflow Bucket Structure |
Solution overview
To implement our solution, we first create a DataBrew project and DataBrew jobs for data preparation and cleansing. Then we integrate the DataBrew jobs with Amazon MWAA and schedule the workflow. We can validate the table data in Athena. The following diagram illustrates the architecture of this solution.
Create a DataBrew dataset for taxi lookup
After you upload the public datasets to the input S3 bucket (input-bucket-name) in their respective folders, you can create a DataBrew dataset for the taxi lookup data.
- On the DataBrew console, choose Datasets.
- Choose Connect new dataset.
- For Dataset Name, enter a name (for this post, ny-taxi-lookup).
- Connect to the taxi_lookup.csv public dataset.
- Choose Create dataset.
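The same dataset could also be registered programmatically. The following is a minimal sketch, assuming the boto3 DataBrew client and its create_dataset call; the S3 key shown in the usage comment is a hypothetical folder layout, not the one used in this post.

```python
def build_dataset_request(name, bucket, key):
    """Build keyword arguments for the DataBrew CreateDataset API call."""
    return {
        "Name": name,
        "Format": "CSV",
        "Input": {"S3InputDefinition": {"Bucket": bucket, "Key": key}},
    }


def create_lookup_dataset(client, bucket, key):
    """Register the taxi zone lookup file as the ny-taxi-lookup dataset.

    `client` is expected to be a boto3 DataBrew client, for example
    boto3.client("databrew").
    """
    return client.create_dataset(**build_dataset_request("ny-taxi-lookup", bucket, key))


# Usage (hypothetical key; adjust to your own folder structure):
#   import boto3
#   create_lookup_dataset(boto3.client("databrew"),
#                         "input-bucket-name", "taxi-lookup/taxi_lookup.csv")
```

Keeping the request builder separate from the API call makes it easy to inspect the payload before anything is created in your account.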
Create a DataBrew project for green taxi data
To illustrate a DataBrew project with simple transformation logic, we create a recipe with the following steps:
- Use a zone lookup to pull location names based on PULocationID.
- Rename the column Zone to pickup_zone.
- Use a zone lookup to pull location names based on DOLocationID.
- Rename the column Zone to dropoff_zone.
- Rename the column lpep_pickup_datetime to pickup_datetime.
- Rename the column lpep_dropoff_datetime to dropoff_datetime.
- Add the new column partition_column using the dateTime function.
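To make the intent of these recipe steps concrete, the following sketch mimics them in plain Python on a single trip record. It is not how DataBrew runs the recipe; the sample zone names and values are made up, and we assume that partition_column holds a date string supplied by the caller, which the post does not specify.

```python
from datetime import date


def apply_recipe(trip, zones, run_date):
    """Apply the green taxi recipe steps to one trip record (a dict)."""
    out = dict(trip)
    # Left join on PULocationID, then rename Zone to pickup_zone.
    # A left join keeps the trip even when no zone matches (value is None).
    out["pickup_zone"] = zones.get(trip["PULocationID"])
    # Left join on DOLocationID, then rename Zone to dropoff_zone.
    out["dropoff_zone"] = zones.get(trip["DOLocationID"])
    # Rename the pickup and drop-off timestamp columns.
    out["pickup_datetime"] = out.pop("lpep_pickup_datetime")
    out["dropoff_datetime"] = out.pop("lpep_dropoff_datetime")
    # Add the partition column (assumption: an ISO date string).
    out["partition_column"] = run_date.isoformat()
    return out


# Illustrative data only; the real lookup file maps LocationID to zone details.
sample_zones = {1: "Zone A", 2: "Zone B"}
sample_trip = {
    "vendorid": 2,
    "lpep_pickup_datetime": "2020-01-01 00:10:00",
    "lpep_dropoff_datetime": "2020-01-01 00:25:00",
    "passenger_count": 1,
    "trip_distance": 2.5,
    "total_amount": 12.8,
    "PULocationID": 1,
    "DOLocationID": 99,  # not in the lookup, so dropoff_zone is None
}
result = apply_recipe(sample_trip, sample_zones, date(2021, 1, 15))
```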
To create a DataBrew project for green taxi data, complete the following steps:
- On the DataBrew console, choose Projects.
- Choose Create project.
- For Project name, enter a name (for this post, green-taxi).
- Under Select a dataset, select New dataset.
- For Enter your source from S3, enter the S3 input bucket path of the green taxi input CSV file.
- Under Permissions, for Role name, choose an IAM role that allows DataBrew to read and write from your input S3 bucket.
You can choose a role if you already created one, or create a new one.
- Choose Create Project.
The dataset can take approximately 2 minutes to load.
- After the dataset is loaded, choose Join.
- For Select Dataset, choose ny-taxi-lookup and choose Next.
- Choose Left join.
- For Table A, choose PULocationID.
- For Table B, choose LocationID.
- Under Column list, choose the following columns: vendorid, lpep_pickup_datetime, lpep_dropoff_datetime, passenger_count, trip_distance, total_amount, PULocationID, DOLocationID, and Zone.
- Choose Finish.
This process adds the first step in the recipe (use zone lookup to pull the location name based on PULocationID).
- Choose Add step to add another transformation.
- Under Column Actions, choose Rename.
- For Source column, choose Zone.
- For New column name, enter pickup_zone.
- Choose Apply.
This process adds the second step in the recipe (rename the column Zone to pickup_zone).
- Add the remaining steps to the recipe:
  - Use zone lookup to pull the location name based on DOLocationID.
  - Under Column list, select a limited list of columns from Table B: vendorid, lpep_pickup_datetime, lpep_dropoff_datetime, passenger_count, trip_distance, total_amount, PULocationID, DOLocationID, and Zone.
  - Rename column Zone to dropoff_zone.
  - Rename column lpep_pickup_datetime to pickup_datetime.
  - Rename column lpep_dropoff_datetime to dropoff_datetime.
We now add a new column named partition_column.
- On the Functions menu, under Date functions, choose DATETIME.
- Create a new column called partition_column to use for partitioning in future steps.
- Provide a description of the recipe version and choose Publish.
Create a DataBrew job for green taxi data
Now that our recipe is ready, we create a DataBrew job for our dataset.
- On the DataBrew console, choose Jobs.
- Choose Create job.
- For Job name, enter a name (for example, green-taxi-job).
- For Job type, select Create a recipe job.
- For Run on, select Project.
- For Select a project, search for and choose your project (green-taxi).
- Under Job output settings, for File type, choose your final storage format, PARQUET (other options are available).
- For S3 location, enter your final S3 output bucket path.
- For Compression, choose Snappy (other options are available).
- Under Additional configurations, for Custom partition by column values, choose partition_column and choose Add.
- For File output storage, select Replace output files for each job run (for our use case, we want to do a full refresh).
- Under Permissions, for Role name, choose your IAM role.
- Choose Create job.
Now the job is ready. You can choose the job on the Jobs page, choose View Details, and view the details on the Data Lineage tab.
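The console settings above map fairly directly onto the DataBrew CreateRecipeJob API. The following is a rough sketch, assuming the boto3 DataBrew client; the role ARN and output prefix in the usage comment are placeholders, and we assume that the Overwrite flag corresponds to the Replace output files option.

```python
def build_recipe_job_request(job_name, project_name, role_arn, bucket, prefix):
    """Build keyword arguments for the DataBrew CreateRecipeJob API call."""
    return {
        "Name": job_name,
        "ProjectName": project_name,
        "RoleArn": role_arn,
        "Outputs": [
            {
                "Location": {"Bucket": bucket, "Key": prefix},
                "Format": "PARQUET",             # File type
                "CompressionFormat": "SNAPPY",   # Compression
                "PartitionColumns": ["partition_column"],
                "Overwrite": True,               # full refresh on every run
            }
        ],
    }


def create_green_taxi_job(client, role_arn, bucket, prefix):
    """Create green-taxi-job on the green-taxi project.

    `client` is expected to be a boto3 DataBrew client.
    """
    request = build_recipe_job_request(
        "green-taxi-job", "green-taxi", role_arn, bucket, prefix
    )
    return client.create_recipe_job(**request)


# Usage (placeholder values):
#   import boto3
#   create_green_taxi_job(boto3.client("databrew"),
#                         "arn:aws:iam::123456789012:role/your-databrew-role",
#                         "output-bucket-name", "green-taxi/")
```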
Repeat the steps from the green taxi sections to create a DataBrew project, job, and the corresponding recipe steps for the yellow taxi data:
- Use zone lookup to pull location names based on PULocationID, keeping the additional columns vendorid, tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, total_amount, PULocationID, DOLocationID, and Zone.
- Rename the column Zone to pickup_zone.
- Use zone lookup to pull location names based on DOLocationID, keeping the additional columns vendorid, tpep_pickup_datetime, tpep_dropoff_datetime, passenger_count, trip_distance, total_amount, PULocationID, DOLocationID, and Zone.
- Rename the column Zone to dropoff_zone.
- Rename the column tpep_pickup_datetime to pickup_datetime.
- Rename the column tpep_dropoff_datetime to dropoff_datetime.
- Add the new column partition_column using the dateTime function.
The following diagram shows the data lineage for the yellow taxi data.
Create Athena tables
Create the following external tables on the Athena console:
- yellow_taxi – Contains the latest snapshot of every day’s yellow taxi data
- green_taxi – Contains the latest snapshot of every day’s green taxi data
- aggregate_summary – Contains the historical snapshot of every day’s aggregated data
Run the following DDL statements to create the yellow_taxi, green_taxi, and aggregate_summary tables:
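As a rough illustration of what such a statement can look like, the following sketch generates a DDL string for the two trip tables from the columns the recipes produce. The column types, the lowercase column names, and the S3 prefix are assumptions on our part; the aggregate_summary table is left out because its schema is not described in this post.

```python
# Assumed column types; adjust them to match your DataBrew job output.
TAXI_COLUMNS = [
    ("vendorid", "string"),
    ("pickup_datetime", "timestamp"),
    ("dropoff_datetime", "timestamp"),
    ("passenger_count", "int"),
    ("trip_distance", "double"),
    ("total_amount", "double"),
    ("pulocationid", "int"),
    ("dolocationid", "int"),
    ("pickup_zone", "string"),
    ("dropoff_zone", "string"),
]


def build_taxi_ddl(table, location):
    """Return an Athena DDL statement for a partitioned Parquet trip table."""
    columns = ",\n  ".join(f"{name} {dtype}" for name, dtype in TAXI_COLUMNS)
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {table} (\n  {columns}\n)\n"
        "PARTITIONED BY (partition_column string)\n"
        "STORED AS PARQUET\n"
        f"LOCATION '{location}'"
    )


# Hypothetical output prefix for the green taxi job.
green_ddl = build_taxi_ddl("green_taxi", "s3://output-bucket-name/green-taxi/")
```

Because the table is partitioned, new partitions typically have to be registered (for example with MSCK REPAIR TABLE) before Athena returns rows from them.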
Create an Airflow DAG
Your next step is to create an Airflow Directed Acyclic Graph (DAG).
- Create a local file called requirements.txt with the following content:
- Upload requirements.txt to the S3 bucket airflow-bucket-name.
. - Create an Amazon MWAA cluster. For instructions, see Introducing Amazon Managed Workflows for Apache Airflow (MWAA).
- Create the local file ny_taxi_brew_trigger.py with the following code and upload it to the S3 bucket airflow-bucket-name/dags (provide the location for the Athena query results and the name of the output bucket, and update the cron entries if you want to change the job run frequency or time):
- Wait until the cron schedule entries from the script start the job.
You can use the Airflow DAGs to check the job run status.
Choose the job to see the visual representation of the DAG.
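At the core of such a DAG is a task that starts a DataBrew job and waits for it to finish. The following is a minimal sketch of a callable that an Airflow task could invoke, assuming the boto3 DataBrew client; it is not the ny_taxi_brew_trigger.py script from this post, and the function name and polling interval are our own choices.

```python
import time

# States after which a DataBrew job run no longer changes.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT"}


def run_databrew_job(client, job_name, poll_seconds=30, sleep=time.sleep):
    """Start a DataBrew job run and block until it reaches a terminal state.

    `client` is expected to be a boto3 DataBrew client. Raises RuntimeError
    if the run does not succeed, so that the calling Airflow task fails too.
    """
    run_id = client.start_job_run(Name=job_name)["RunId"]
    while True:
        state = client.describe_job_run(Name=job_name, RunId=run_id)["State"]
        if state in TERMINAL_STATES:
            break
        sleep(poll_seconds)
    if state != "SUCCEEDED":
        raise RuntimeError(f"DataBrew job {job_name} run {run_id} ended in state {state}")
    return run_id
```

In a DAG, a function like this could be wrapped in a PythonOperator for each DataBrew job, with the cron expression set on the DAG's schedule. Raising an exception on failure lets Airflow mark the task as failed and apply its retry settings.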
Data validation: Query the Athena tables
You can now test your solution by querying your tables in Athena.
- Run the following query on the yellow_taxi table to retrieve 10 sample rows:
The following screenshot shows the query results.
- Run the following query on the green_taxi table to retrieve 10 sample rows:
The following screenshot shows the query results.
- Run the following query on the aggregate_summary table to retrieve 10 sample rows:
The following screenshot shows the query results.
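A sample-rows query for each table can be as simple as a SELECT with a LIMIT clause. The following sketch builds such a query and runs it through the boto3 Athena client; the database name and results location in the usage comment are assumptions, and error handling is kept to a minimum.

```python
import time


def sample_rows_query(table, limit=10):
    """Return a query that fetches a few sample rows from a table."""
    return f"SELECT * FROM {table} LIMIT {limit}"


def run_query(client, sql, database, output_location, poll_seconds=2, sleep=time.sleep):
    """Run a query with a boto3 Athena client and return the result rows."""
    query_id = client.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )["QueryExecutionId"]
    while True:
        execution = client.get_query_execution(QueryExecutionId=query_id)
        state = execution["QueryExecution"]["Status"]["State"]
        if state in {"SUCCEEDED", "FAILED", "CANCELLED"}:
            break
        sleep(poll_seconds)
    if state != "SUCCEEDED":
        raise RuntimeError(f"Athena query {query_id} ended in state {state}")
    return client.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]


# Usage (assumed database name and results location):
#   import boto3
#   rows = run_query(boto3.client("athena"), sample_rows_query("yellow_taxi"),
#                    "default", "s3://your-athena-results-bucket/")
```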
Clean up
Complete the following steps to avoid incurring future charges:
- On the Amazon MWAA console, on the Environments page, select your cluster.
- Choose Delete.
- Choose Delete again to confirm the deletion.
- On the Amazon S3 console, delete the buckets input-bucket-name, output-bucket-name, and airflow-bucket-name.
- On the Athena console, run the following commands to delete the tables you created:
- On the DataBrew console, on the Jobs page, select each job and on the Actions menu, choose Delete.
- On the Projects page, select each project and on the Actions menu, choose Delete.
- Select Delete attached recipe.
- Choose Delete again to confirm each deletion.
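Parts of this cleanup can also be scripted. The following sketch generates DROP TABLE statements for the three Athena tables and deletes DataBrew jobs and projects through the boto3 DataBrew client. The job and project names passed in are whatever you chose earlier; only green-taxi-job and green-taxi are named in this post, and recipes are not removed by this sketch.

```python
TABLES = ["yellow_taxi", "green_taxi", "aggregate_summary"]


def drop_statements(tables=TABLES):
    """Return one DROP TABLE statement per Athena table."""
    return [f"DROP TABLE IF EXISTS {table}" for table in tables]


def delete_databrew_resources(client, job_names, project_names):
    """Delete DataBrew jobs first, then the projects they ran on.

    `client` is expected to be a boto3 DataBrew client.
    """
    for name in job_names:
        client.delete_job(Name=name)
    for name in project_names:
        client.delete_project(Name=name)


# Usage:
#   import boto3
#   delete_databrew_resources(boto3.client("databrew"),
#                             ["green-taxi-job"], ["green-taxi"])
```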
Conclusion
This post walked you through the steps to orchestrate DataBrew jobs with Amazon MWAA and schedule the workflow. To illustrate a simple transformation logic pipeline, we used DataBrew jobs to join two datasets, rename a column, and add a new column. We also used Athena to verify the results.
You can use this solution for your own use cases and orchestrate a data transformation pipeline with DataBrew and Amazon MWAA. If you have comments or feedback, please leave them in the comments section.
About the Authors
Sundeep Kumar is a Data Architect, Data Lake at AWS, helping customers build data lake and analytics platforms and solutions. When not building and designing data lakes, Sundeep enjoys listening to music and playing guitar.
Rahul Sonawane is a Principal Specialty Solutions Architect-Analytics at Amazon Web Services.