Effective data lakes using AWS Lake Formation, Part 2: Creating a governed table for streaming data sources
We announced the preview of AWS Lake Formation transactions, row-level security, and acceleration at AWS re:Invent 2020. In Part 1 of this series, we explained how to set up a governed table and add objects to it. In this post, we expand on this example, and demonstrate how to ingest streaming data into governed tables using Lake Formation transactions.
In typical streaming use cases, new data is continuously ingested into a data lake. This creates many small files that may impact query performance. In addition, a common requirement is to isolate queries from updates without requiring downtime or having to wait a long time for the updates to settle. Finally, some use cases such as periodic reports and iterative machine learning training require the ability to go back in time to a specific timestamp and query against it.
Lake Formation transactions provide a simple mechanism to add streaming datasets to your data lake while isolating in-process queries, so ingestion and queries can run concurrently without blocking each other. Lake Formation acceleration automatically compacts small files to improve query performance.
In this post, we demonstrate how to ingest streaming data into your data lake using Lake Formation transactions in real time.
Architecture overview
AWS CloudTrail is an AWS service that helps you enable governance, compliance, and operational and risk auditing of your AWS account. Actions taken by a user, role, or an AWS service are recorded as events in CloudTrail. We relay these CloudTrail events into Amazon Kinesis Data Streams, process this data stream using an AWS Glue streaming job, and store the data in Amazon Simple Storage Service (Amazon S3) using AWS Lake Formation transactions.
The following diagram shows the solution architecture.
Setting up resources with AWS CloudFormation
This post provides an AWS CloudFormation template to get you started quickly. You can review and customize it to suit your needs. Some of the resources deployed by this stack incur costs when in use. If you prefer to set up these resources manually using the AWS Management Console, see the instructions in the appendix at the end of this post.
The CloudFormation template generates the following resources:
- An AWS Lambda function (for Lambda-backed AWS CloudFormation custom resources)
- An S3 bucket
- AWS Identity and Access Management (IAM) users, roles, and policies
- Lake Formation data lake settings and permissions
- Kinesis Data Streams
- An Amazon EventBridge rule
- An AWS Glue ETL job
When following the steps in this section, use the us-east-1 Region because, as of this writing, this Lake Formation preview feature is available only in us-east-1.
To create these resources, complete the following steps:
- Sign in to the CloudFormation console in the us-east-1 Region.
- Choose Launch Stack:
- Choose Next.
- For DatalakeAdminUserName and DatalakeAdminUserPassword, enter the IAM user name and password for your data lake admin user.
- For DataAnalystUserName and DataAnalystUserPassword, enter the IAM user name and password for your data analyst user.
- For DataLakeBucketName, enter the name of your data lake bucket.
- For DatabaseName and TableName, leave as the default.
- Choose Next.
- On the next page, choose Next.
- Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
- Choose Create.
Stack creation should take approximately 3 minutes.
Setting up a governed table
Now you can create and configure your governed table in Lake Formation.
Creating a governed table
To create your governed table, complete the following steps:
- Sign in to the Lake Formation console in the us-east-1 Region using the DatalakeAdmin2 user.
- Choose Tables.
- Choose Create table.
- For Name, enter cloudtrail_governed.
- For Database, enter lakeformation_tutorial_cloudtrail.
- Select Enable governed data access and management.
- Select Enable row based permissions.
- For Data is located in, choose Specified path in my account.
- Enter the path s3://<datalake-bucket>/data/cloudtrail_governed/.
- For Classification, choose PARQUET.
- Choose Upload Schema.
- Enter the following JSON array into the text box (a sample array follows this list):
- Choose Upload.
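If you need a starting point for the schema array, the following is a sketch based on the columns the AWS Glue job script later in this post writes to the governed table. The eventsource and eventname columns are omitted here because they're added as partition columns in the next steps, and the Name/Type key format is an assumption that may need adjusting for your console version:

```json
[
  { "Name": "eventversion", "Type": "string" },
  { "Name": "eventtime", "Type": "string" },
  { "Name": "awsregion", "Type": "string" },
  { "Name": "sourceipaddress", "Type": "string" },
  { "Name": "useragent", "Type": "string" },
  { "Name": "errorcode", "Type": "string" },
  { "Name": "errormessage", "Type": "string" },
  { "Name": "requestid", "Type": "string" },
  { "Name": "eventid", "Type": "string" },
  { "Name": "eventtype", "Type": "string" },
  { "Name": "apiversion", "Type": "string" },
  { "Name": "readonly", "Type": "string" },
  { "Name": "recipientaccountid", "Type": "string" },
  { "Name": "sharedeventid", "Type": "string" },
  { "Name": "vpcendpointid", "Type": "string" }
]
```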
We now add two partition columns: eventsource and eventname.
- Choose Add column.
- For Column name, enter eventsource.
- For Data type, choose String.
- Select Partition Key.
- Choose Add.
- Choose Add column.
- For Column name, enter eventname.
- For Data type, choose String.
- Select Partition Key.
- Choose Add.
- Choose Submit.
Configuring Lake Formation permissions
You need to grant Lake Formation permissions for your governed table. Complete the following steps:
Table-level permissions
- Sign in to the Lake Formation console in the us-east-1 Region using the DatalakeAdmin2 user.
- Under Permissions, choose Data permissions.
- Under Data permission, choose Grant.
- For Database, choose lakeformation_tutorial_cloudtrail.
- For Table, choose cloudtrail_governed.
- For IAM users and roles, choose the roles GlueETLServiceRole-<CloudFormation stack name> and LFRegisterLocationServiceRole-<CloudFormation stack name>, and the user DatalakeAdmin2.
- Select Table permissions.
- Under Table permissions, select Alter, Insert, Drop, Delete, Select, and Describe.
- Choose Grant.
- Under Data permission, choose Grant.
- For Database, choose lakeformation_tutorial_cloudtrail.
- For Table, choose cloudtrail_governed.
- For IAM users and roles, choose the user DataAnalyst2.
- Under Table permissions, select Select and Describe.
- Choose Grant.
Row-level permissions
Previously, we enabled row-based permissions on the table cloudtrail_governed. You need to grant row-level permissions to the IAM roles and users so that your AWS Glue job and Lake Formation compaction can process the table, and your IAM users can read the table in Athena queries.
- Under Permissions, choose Data permissions.
- Under Data permission, choose Grant.
- For Database, choose lakeformation_tutorial_cloudtrail.
- For Table, choose cloudtrail_governed.
- For IAM users and roles, choose the roles GlueETLServiceRole-<CloudFormation stack name> and LFRegisterLocationServiceRole-<CloudFormation stack name>, and the users DatalakeAdmin2 and DataAnalyst2.
- Select Row-based permissions.
- For Filter name, enter allowAll.
- For Choose filter type, select Allow access to all rows.
- Choose Grant.
Data location permissions
- Under Permissions, choose Data locations.
- Under Data locations, choose Grant.
- For IAM users and roles, choose the role GlueETLServiceRole-<CloudFormation stack name>.
- For Storage location, enter s3://<datalake-bucket>/.
- Choose Grant.
Adding table objects into the governed table
If you want to copy data from your existing tables (such as JDBC or Amazon S3) into a governed table, it’s a good idea to run a standard AWS Glue job to load the data. On the other hand, if you want to stream data from your streaming sources (Kinesis or Kafka) into a governed table, it’s a good idea to run an AWS Glue streaming job to load the data.
Even if you place your data in the table location of the governed table, the data isn't recognized yet. To make the governed table aware of the data, you need to add the files containing your data to the governed table using Lake Formation transactions.
To add S3 objects to a governed table, call the UpdateTableObjects API for those objects. You can call it using the AWS Command Line Interface (AWS CLI) or SDK, and also through the AWS Glue ETL library, which calls the API implicitly. For this post, we use an AWS Glue streaming job to ingest CloudTrail data from Kinesis Data Streams into a governed table; the AWS Glue ETL library puts the objects into Amazon S3 and calls the UpdateTableObjects API for those S3 objects implicitly.
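For reference, a minimal sketch of the same sequence with boto3 is shown below. The call names and parameter shapes follow the generally available Lake Formation transactions API; the preview API may differ, and the object URI, ETag, size, and partition values are placeholders:

```python
import boto3

# The Lake Formation client exposes the transaction and UpdateTableObjects APIs
# (names shown here follow the GA API; the preview may differ).
lakeformation = boto3.client("lakeformation", region_name="us-east-1")

# Start a read/write transaction.
tx_id = lakeformation.start_transaction(TransactionType="READ_AND_WRITE")["TransactionId"]

try:
    # Register an object that was already written to the governed table's location.
    # The URI, ETag, size, and partition values below are placeholders.
    lakeformation.update_table_objects(
        DatabaseName="lakeformation_tutorial_cloudtrail",
        TableName="cloudtrail_governed",
        TransactionId=tx_id,
        WriteOperations=[
            {
                "AddObject": {
                    "Uri": "s3://<datalake-bucket>/data/cloudtrail_governed/eventsource=s3.amazonaws.com/eventname=GetObject/part-00000.snappy.parquet",
                    "ETag": "<object-etag>",
                    "Size": 1024,
                    "PartitionValues": ["s3.amazonaws.com", "GetObject"],
                }
            }
        ],
    )
    lakeformation.commit_transaction(TransactionId=tx_id)
except Exception:
    # Cancel the transaction so the partially added objects never become visible.
    lakeformation.cancel_transaction(TransactionId=tx_id)
    raise
```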
Let's run the AWS Glue job cloudtrail_ingestion-<CloudFormation stack name>, which is created automatically by the CloudFormation template.
- On the AWS Glue console, choose Jobs.
- Select the job named cloudtrail_ingestion-<CloudFormation stack name>.
- On the Actions menu, choose Run job.
This job reads from the data stream and writes into the governed table cloudtrail_governed at 100-second intervals. After a few minutes, you can see new files being written into the data lake bucket.
Querying a governed table using Amazon Athena
Now everything is ready! Let’s start querying the governed table using Amazon Athena.
If this is your first time running queries on Athena, you need to configure a query result location. For more information, see Specifying a Query Result Location.
To use Lake Formation preview features, you need to create a special workgroup named AmazonAthenaLakeFormationPreview and join it. For more information, see Managing Workgroups.
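If you prefer the AWS CLI, a command along these lines creates the workgroup (you still need to switch to it on the Athena console before running queries):

```bash
aws athena create-work-group --name AmazonAthenaLakeFormationPreview --region us-east-1
```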
Running a simple query
Sign in to the Athena console in the us-east-1 Region using the DataAnalyst2 user. First, let's preview the table by querying 10 records stored in the governed table:
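A query along these lines (assuming the database and table names used earlier in this post) previews 10 records:

```sql
SELECT *
FROM "lakeformation_tutorial_cloudtrail"."cloudtrail_governed"
LIMIT 10
```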
The following screenshot shows the query result.
Running an analytic query
Next, let’s run an analytic query with aggregation to simulate real-world use cases:
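For example, an aggregation shaped like the following counts API calls per event source and event name. The TotalEvents alias matches the result column described below, but treat the query itself as a sketch:

```sql
SELECT eventsource, eventname, count(*) AS TotalEvents
FROM "lakeformation_tutorial_cloudtrail"."cloudtrail_governed"
GROUP BY eventsource, eventname
ORDER BY TotalEvents DESC
```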
The following screenshot shows the results. This query returns the total API calls per eventsource and eventname.
Running an analytic query with time travel
With governed tables, you can go back in time to a specific timestamp and query against it. You can run Athena queries against governed tables that include a timestamp to target the state of the data at a particular date and time.
To submit a time travel query in Athena, add a WHERE clause that sets the column __asOfDate to the epoch time (long integer) representation of the required date and time.
To retrieve the epoch time for 5 minutes ago, you can run one of the following commands.
The command for Linux (GNU date command):
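For example (a sketch; the %s%3N format prints the epoch time in milliseconds):

```bash
date -d '5 minutes ago' +%s%3N
```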
The command for OSX (BSD date command):
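For example (a sketch; BSD date has no millisecond format specifier, so multiply the seconds by 1,000):

```bash
echo $(($(date -v -5M +%s) * 1000))
```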
Let's run the time travel query (replace <epoch-milliseconds> with the timestamp you retrieved):
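A sketch of such a query, adding the __asOfDate predicate to the aggregation above:

```sql
SELECT eventsource, eventname, count(*) AS TotalEvents
FROM "lakeformation_tutorial_cloudtrail"."cloudtrail_governed"
WHERE __asOfDate = <epoch-milliseconds>
GROUP BY eventsource, eventname
ORDER BY TotalEvents DESC
```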
The following screenshot shows the query result. The number of TotalEvents is lower than in the previous result because this query points to the older timestamp.
Cleaning up
Now to the final step, cleaning up the resources.
- Empty the Amazon S3 data lake bucket, then delete the bucket.
- Delete the CloudFormation stack. The governed table you created is also deleted automatically when the stack is deleted.
- Delete the Athena workgroup AmazonAthenaLakeFormationPreview.
Conclusion
In this post, we explained how to create a Lake Formation governed table with CloudTrail data coming from EventBridge via Kinesis Data Streams. In addition, we explained how to query governed tables and how to run time travel queries against them. With Lake Formation governed tables, you can achieve transactions, row-level security, and query acceleration. In Part 3 of this series, we will show you how to use ACID transactions on governed tables to perform multiple operations atomically and enforce isolation between operations.
Lake Formation transactions, row-level security, and acceleration are currently available for preview in the US East (N. Virginia) AWS Region. To get early access to these capabilities, sign up for the preview. You need to be approved for the preview to gain access to these features.
Appendix 1: Setting up resources via the console
Configuring IAM roles and IAM users
First, you need to set up two IAM roles: one for AWS Glue ETL jobs and another for the Lake Formation data lake location.
IAM policies
To create the IAM policies for your roles, complete the following steps:
- On the IAM console, create a new policy for Amazon S3.
- Save the IAM policy as S3DataLakePolicy as follows (replace <datalake-bucket> with your bucket name):
- Create a new IAM policy named LFTransactionPolicy with the following statements:
- Create a new IAM policy named LFLocationPolicy with the following statements:
- Create a new IAM policy named LFQueryPolicy with the following statements:
IAM role for AWS Glue ETL job
Create a new IAM role to run AWS Glue ETL jobs.
- On the IAM console, create the role GlueETLServiceRole with the following trust relationship for AWS Glue (see the sample policy after this list):
- Attach the following AWS managed policies:
  - AWSGlueServiceRole
  - AWSLakeFormationDataAdmin
  - AmazonKinesisReadOnlyAccess
- Attach the following customer managed policies:
  - S3DataLakePolicy
  - LFTransactionPolicy
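A minimal sketch of such a trust relationship, using the standard glue.amazonaws.com service principal:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "glue.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```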
IAM role for AWS Lake Formation data lake location
To create your IAM role for Lake Formation, complete the following steps:
- Create a new IAM role named LFRegisterLocationServiceRole with the following trust relationship for Lake Formation (see the sample policy after this list):
- Attach the following customer managed policies:
  - S3DataLakePolicy
  - LFLocationPolicy
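A minimal sketch of such a trust relationship, assuming the lakeformation.amazonaws.com service principal:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lakeformation.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
```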
This role is used to register locations with Lake Formation, which then performs credential vending for Athena at query time.
IAM users
To create your IAM users, complete the following steps:
- Create an IAM user named DatalakeAdmin2.
- Attach the following AWS managed policies:
  - AWSLakeFormationDataAdmin
  - AmazonAthenaFullAccess
  - IAMReadOnlyAccess
- Attach the customer managed policy LFQueryPolicy.
- Create an IAM user named DataAnalyst2 that can use Athena to query data.
- Attach the AWS managed policy AmazonAthenaFullAccess.
- Attach the customer managed policy LFQueryPolicy.
Configuring Kinesis Data Streams
To create your data stream, complete the following steps:
- On the Kinesis Data Streams console, choose Create data stream.
- For Data stream name, enter cloudtrail.
- For Number of open shards, enter 2.
- Choose Create data stream.
Configuring EventBridge
For this post, we set up an EventBridge rule to capture CloudTrail API logs and relay them to Kinesis Data Streams.
- On the EventBridge console, choose Rules.
- Choose Create rule.
- For Name, enter cloudtrail-to-kinesis.
- Select Event pattern.
- For Event matching pattern, choose Custom pattern.
- For Event pattern, enter the following JSON, replacing <account-id> with your AWS account ID (see the sample pattern after this list):
- For Target, choose Kinesis stream.
- For Stream, choose cloudtrail.
- For Configure input, choose Part of the matched event.
- Enter $.detail.
- Choose Create.
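A pattern along these lines matches API calls recorded by CloudTrail in your account. The detail-type value is the standard one CloudTrail uses for events delivered to EventBridge; the account filter is an assumption based on the <account-id> placeholder above:

```json
{
  "account": ["<account-id>"],
  "detail-type": ["AWS API Call via CloudTrail"]
}
```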
Creating a table for the data stream in the Glue Data Catalog
In this step, you create a table for your data stream in the AWS Glue Data Catalog.
- On the AWS Glue console, choose Tables.
- Choose Add tables.
- Select Add table manually.
- For Table name, enter cloudtrail_kinesis.
- For Database, enter lakeformation_tutorial_cloudtrail.
- Choose Next.
- For Select the type of source, choose Kinesis.
- For Stream name, enter cloudtrail.
- For Kinesis source URL, enter https://kinesis.us-east-1.amazonaws.com.
- Choose Next.
- For Classification, choose JSON.
- Choose Next.
- Choose Add column.
- For Column name, enter eventversion.
- For Column type, choose string.
- Choose Add.
- Repeat the previous four steps for each of the following columns:
- Column name: useridentity, Column type: struct, StructSchema: STRUCT<type:STRING,principalid:STRING,arn:STRING,accountid:STRING,invokedby:STRING,accesskeyid:STRING,userName:STRING,sessioncontext:STRUCT<attributes:STRUCT<mfaauthenticated:STRING,creationdate:STRING>,sessionissuer:STRUCT<type:STRING,principalId:STRING,arn:STRING,accountId:STRING,userName:STRING>>>
- Column name: eventtime, Column type: string
- Column name: eventsource, Column type: string
- Column name: eventname, Column type: string
- Column name: sourceipaddress, Column type: string
- Column name: useragent, Column type: string
- Column name: errorcode, Column type: string
- Column name: errormessage, Column type: string
- Column name: requestparameters, Column type: string
- Column name: responseelements, Column type: string
- Column name: additionaleventdata, Column type: string
- Column name: requestid, Column type: string
- Column name: eventid, Column type: string
- Column name: resources, Column type: array, ArraySchema: ARRAY<STRUCT<ARN:STRING,accountId:STRING,type:STRING>>
- Column name: eventtype, Column type: string
- Column name: apiversion, Column type: string
- Column name: readonly, Column type: boolean
- Column name: recipientaccountid, Column type: string
- Column name: serviceeventdetails, Column type: string
- Column name: sharedeventid, Column type: string
- Column name: vpcendpointid, Column type: string
- Column name: awsregion, Column type: string
- Choose Next.
- Choose Finish.
Configuring Lake Formation
Follow these steps to configure Lake Formation permissions:
Data lake settings
- On the Lake Formation console, under Permissions, choose Admins and database creators.
- In the Data lake administrators section, choose Grant.
- For IAM users and roles, choose your IAM user DatalakeAdmin2.
- Choose Save.
- In the Database creators section, choose Grant.
- For IAM users and roles, choose the LFRegisterLocationServiceRole role.
- Select Create Database.
- Choose Grant.
Data lake locations
- Under Register and ingest, choose Data lake locations.
- Choose Register location.
- For Amazon S3 path, enter s3://<datalake-bucket>/. This needs to be the same bucket you listed in LFLocationPolicy. Lake Formation uses this role to vend temporary Amazon S3 credentials to query services that need read/write access to the bucket and all prefixes under it.
- For IAM role, choose LFRegisterLocationServiceRole.
- Choose Register location.
Data catalog settings
- Under Data catalog, choose Settings.
- Make sure that both check boxes for Use only IAM access control for new databases and Use only IAM access control for new tables in new databases are deselected.
- Under Data catalog, choose Databases.
- Choose Create database.
- Select Database.
- For Name, enter lakeformation_tutorial_cloudtrail.
- Choose Create database.
Database-level permissions
- Under Permissions, choose Data permissions.
- Under Data permission, choose Grant.
- For Database, choose lakeformation_tutorial_cloudtrail.
- For IAM users and roles, choose the role GlueETLServiceRole.
- Select Database permissions.
- Under Database permissions, select Create Table, Alter, Drop, and Describe.
- Choose Grant.
This grants the role GlueETLServiceRole permission to create and alter tables in the database, which it needs to create a governed table.
Table-level permissions
- Under Permissions, choose Data permissions.
- Under Data permission, choose Grant.
- For Database, choose lakeformation_tutorial_cloudtrail.
- For Table, choose cloudtrail_kinesis.
- For IAM users and roles, choose the role GlueETLServiceRole.
- Select Table permissions.
- Under Table permissions, select Select and Describe.
- Choose Grant.
Creating an AWS Glue streaming job
To create your AWS Glue streaming job, complete the following steps:
- On the AWS Glue console, choose Jobs.
- Choose Add job.
- For Name, enter cloudtrail_ingestion.
- For IAM role, choose GlueETLServiceRole.
- For Type, choose Spark Streaming.
- For Glue version, choose Spark 2.4, Python 3 with improved job startup times (Glue Version 2.0).
- For This job runs, choose A new script authored by you.
- For S3 path where the script is stored, enter s3://aws-glue-scripts-<account-id>-us-east-1/lakeformation_tutorial_cloudtrail.
- For Temporary directory, enter s3://aws-glue-temporary-<account-id>-us-east-1/lakeformation_tutorial_cloudtrail.
- Under Security configuration, script libraries, and job parameters (optional), for Job parameters, provide the following:
  - --datalake_bucket_name: <datalake-bucket>
  - --database_name: lakeformation_tutorial_cloudtrail
  - --src_table_name: cloudtrail_kinesis
  - --dst_table_name: cloudtrail_governed
- Choose Next.
- Choose Save job and edit script (don't choose any connections here).
- Enter the following code:
```python
# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
import sys
from awsglue.transforms import ApplyMapping
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrame, Row
from awsglue.dynamicframe import DynamicFrameCollection
from awsglue.dynamicframe import DynamicFrame

"""
Job Parameters
----------
datalake_bucket_name : string
    S3 bucket name for storing the governed table
database_name : string
    A Lake Formation database that stores the governed table and the source Kinesis table
src_table_name : string
    A Glue table name for the Kinesis data stream
dst_table_name : string
    A Lake Formation governed table name
"""

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'datalake_bucket_name', 'database_name', 'src_table_name', 'dst_table_name'])
datalake_bucket_name = args["datalake_bucket_name"]
database_name = args["database_name"]
src_table_name = args["src_table_name"]
dst_table_name = args["dst_table_name"]

sc = SparkContext()
glue_context = GlueContext(sc)
spark = glue_context.spark_session
job = Job(glue_context)
job.init(args['JOB_NAME'], args)


def transaction_write(context, dfc) -> DynamicFrameCollection:
    """Write a DynamicFrame into the governed table using a Lake Formation transaction.

    This is called from processBatch once per batch, so it makes one transaction per batch.
    """
    dynamic_frame = dfc.select(list(dfc.keys())[0])
    tx_id = context.begin_transaction(read_only=False)
    sink = context.getSink(
        connection_type="s3",
        path=f"s3://{datalake_bucket_name}/data/{dst_table_name}/",
        enableUpdateCatalog=True,
        updateBehavior="UPDATE_IN_DATABASE",
        partitionKeys=["eventsource", "eventname"],
        transactionId=tx_id
    )
    sink.setFormat("glueparquet")
    sink.setCatalogInfo(catalogDatabase=database_name, catalogTableName=dst_table_name)
    try:
        sink.writeFrame(dynamic_frame)
        context.commit_transaction(tx_id)
    except Exception:
        context.abort_transaction(tx_id)
        raise
    return dfc


# Create a Spark DataFrame from the source Kinesis table
data_frame_kinesis = glue_context.create_data_frame.from_catalog(
    database=database_name,
    table_name=src_table_name,
    transformation_ctx="data_frame_kinesis",
    additional_options={
        "startingPosition": "TRIM_HORIZON",
        "inferSchema": "true"
    }
)

# Apply a mapping that drops the useridentity and resources columns because struct and array types are not supported yet
cloudtrail_mappings = [
    ("eventversion", "string", "eventversion", "string"),
    ("eventtime", "string", "eventtime", "string"),
    ("eventsource", "string", "eventsource", "string"),
    ("eventname", "string", "eventname", "string"),
    ("awsregion", "string", "awsregion", "string"),
    ("sourceipaddress", "string", "sourceipaddress", "string"),
    ("useragent", "string", "useragent", "string"),
    ("errorcode", "string", "errorcode", "string"),
    ("errormessage", "string", "errormessage", "string"),
    ("requestid", "string", "requestid", "string"),
    ("eventid", "string", "eventid", "string"),
    ("eventtype", "string", "eventtype", "string"),
    ("apiversion", "string", "apiversion", "string"),
    ("readonly", "boolean", "readonly", "string"),
    ("recipientaccountid", "string", "recipientaccountid", "string"),
    ("sharedeventid", "string", "sharedeventid", "string"),
    ("vpcendpointid", "string", "vpcendpointid", "string")
]


def processBatch(data_frame, batchId):
    """Process each batch triggered by Spark Structured Streaming.

    This is called once per batch to convert the DataFrame, apply the mapping, and call transaction_write.
    """
    if data_frame.count() > 0:
        dynamic_frame = DynamicFrame.fromDF(data_frame, glue_context, "from_data_frame")
        dynamic_frame_apply_mapping = ApplyMapping.apply(
            frame=dynamic_frame,
            mappings=cloudtrail_mappings,
            transformation_ctx="apply_mapping"
        )
        dynamic_frame_collection = transaction_write(
            glue_context,
            DynamicFrameCollection({"dynamic_frame": dynamic_frame_apply_mapping}, glue_context)
        )


# Read from the DataFrame coming via Kinesis, and run processBatch on batches every 100 seconds
glue_context.forEachBatch(
    frame=data_frame_kinesis,
    batch_function=processBatch,
    options={
        "windowSize": "100 seconds",
        "checkpointLocation": f"{args['TempDir']}/checkpoint/{database_name}/{src_table_name}/"
    }
)
job.commit()
```
- Choose Save.
- Choose Run.
Appendix 2: Setting up more CloudTrail data to simulate a larger data lake
You can choose from two different approaches to generate more events in CloudTrail to simulate a larger data lake:
- Enable an S3 data event in CloudTrail
- Set up an EventBridge event bus with other accounts where there are lots of events
About the Author
Noritaka Sekiyama is a Senior Big Data Architect at AWS Glue & Lake Formation. His passion is implementing software artifacts including libraries, utilities, connectors, tools, docs, and samples that help customers to build their data lakes easily and efficiently. Outside of work, he enjoys learning new technologies, watching anime, and playing with his children.