Introducing message archiving and analytics for Amazon SNS
This blog post is courtesy of Sebastian Caceres (AWS Consultant, DevOps), Otavio Ferreira (Sr. Manager, Amazon SNS), Prachi Sharma and Mary Gao (Software Engineers, Amazon SNS).
Today, we are announcing the release of a message delivery protocol for Amazon SNS based on Amazon Kinesis Data Firehose. This is a new way to integrate SNS with storage and analytics services, without writing custom code.
SNS provides topics for push-based, many-to-many pub/sub messaging to help you decouple distributed systems, microservices, and event-driven serverless applications. As applications grow, so does the need to archive messages to meet compliance goals. These archives can also provide important operational and business insights.
Previously, custom code was required to create data pipelines, using general-purpose SNS subscription endpoints, such as Amazon SQS queues or AWS Lambda functions. You had to manage data transformation, data buffering, data compression, and the upload to data stores.
Overview
With the new native integration between SNS and Kinesis Data Firehose, you can send messages to storage and analytics services, using a purpose-built SNS subscription type.
Once you configure a subscription, messages published to the SNS topic are sent to the subscribed Kinesis Data Firehose delivery stream. The messages are then delivered to the destination endpoint configured in the delivery stream, which can be an Amazon S3 bucket, an Amazon Redshift table, or an Amazon Elasticsearch Service index.
You can also use a third-party service provider as the destination of a delivery stream, including Datadog, New Relic, MongoDB, and Splunk. No custom code is required to bridge the services. For more information, see Fanout to Kinesis Data Firehose streams in the SNS Developer Guide.
The new Kinesis Data Firehose subscription type and its destinations are part of the application-to-application (A2A) messaging offering of SNS. The addition of this subscription type expands the SNS A2A offering to include the following use cases:
- Run analytics on SNS messages, using Amazon Kinesis Data Analytics, Amazon Elasticsearch Service, or Amazon Redshift as a delivery stream destination. You can use this option to gain insights and detect anomalies in workloads.
- Index and search SNS messages, using Amazon Elasticsearch Service as a delivery stream destination. From there, you can create dashboards using Kibana, a data visualization and exploration tool.
- Store SNS messages for backup and auditing purposes, using S3 as a destination of choice. You can then use Amazon Athena to query the S3 bucket for analytics purposes.
- Apply transformation to SNS messages. For example, you may obfuscate personally identifiable information (PII) or protected health information (PHI) using a Lambda function invoked by the delivery stream.
- Feed SNS messages into cloud-based application monitoring and observability tools, using Datadog, New Relic, or Splunk as a destination. You can choose this option to enrich DevOps or marketing workflows.
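As an illustration of the transformation use case in the list above, here is a minimal sketch of a Lambda function that a delivery stream could invoke to mask a PII field. It follows the Kinesis Data Firehose data-transformation contract (base64-encoded records in, records with a `recordId`, `result`, and `data` out). The `passenger_name` field is hypothetical, and the sketch assumes raw message delivery is disabled, so the published payload arrives as a JSON string in the `Message` field of the SNS envelope.

```python
import base64
import json


def mask_passenger_name(message: dict) -> dict:
    """Return a copy of the message with the hypothetical PII field masked."""
    masked = dict(message)
    if "passenger_name" in masked:
        masked["passenger_name"] = "***"
    return masked


def lambda_handler(event, context):
    """Transform each Firehose record, masking PII in the SNS message body."""
    output = []
    for record in event["records"]:
        try:
            # Each record carries the SNS envelope as base64-encoded JSON.
            envelope = json.loads(base64.b64decode(record["data"]))
            # Assumption: the publisher sent a JSON document as the message.
            body = json.loads(envelope["Message"])
            envelope["Message"] = json.dumps(mask_passenger_name(body))
            # Append a newline so archived records stay line-delimited.
            payload = (json.dumps(envelope) + "\n").encode("utf-8")
            output.append({
                "recordId": record["recordId"],
                "result": "Ok",
                "data": base64.b64encode(payload).decode("utf-8"),
            })
        except (ValueError, KeyError, TypeError):
            # Return unparseable records unchanged and flag them as failed.
            output.append({
                "recordId": record["recordId"],
                "result": "ProcessingFailed",
                "data": record["data"],
            })
    return {"records": output}
```

Records flagged as `ProcessingFailed` are treated by the delivery stream as unsuccessfully processed, so a malformed message does not block the rest of the batch.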
As with all supported message delivery protocols, you can filter, monitor, and encrypt messages.
To simplify architecture and further avoid custom code, you can use an SNS subscription filter policy. This enables you to route only the relevant subset of SNS messages to the Kinesis Data Firehose delivery stream. For more information, see SNS message filtering.
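The following is a minimal sketch of attaching a filter policy to the Kinesis Data Firehose subscription with the AWS SDK for Python. The `event_type` attribute and its `ticket_sold` value are hypothetical. The `matches` helper is only a simplified local approximation of exact string matching, included to show the intent of the policy; it is not how SNS evaluates policies internally.

```python
import json

# Hypothetical policy: archive only messages whose "event_type"
# message attribute equals "ticket_sold".
FILTER_POLICY = {"event_type": ["ticket_sold"]}


def matches(policy: dict, attributes: dict) -> bool:
    """Simplified local check covering exact string matching only."""
    return all(
        attributes.get(name) in allowed for name, allowed in policy.items()
    )


def apply_filter_policy(sns_client, subscription_arn: str, policy: dict) -> None:
    """Attach the policy to an existing subscription.

    sns_client is expected to be a boto3 SNS client, for example
    boto3.client("sns").
    """
    sns_client.set_subscription_attributes(
        SubscriptionArn=subscription_arn,
        AttributeName="FilterPolicy",
        AttributeValue=json.dumps(policy),
    )
```

With this policy in place, a message published with `event_type` set to another value, or without that attribute, would not be delivered to the delivery stream.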
To monitor the throughput, you can check the NumberOfMessagesPublished and NumberOfNotificationsDelivered metrics for SNS, and the IncomingBytes, IncomingRecords, DeliveryToS3.Records, and DeliveryToS3.Success metrics for Kinesis Data Firehose. For additional information, see Monitoring SNS topics using CloudWatch and Monitoring Kinesis Data Firehose using CloudWatch.
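As a rough sketch, the metrics named above could be queried with the CloudWatch `get_metric_statistics` API. The helper below only builds the request parameters. The five-minute period, the `Sum` statistic, and the one-hour window are assumptions chosen for illustration, and the topic and stream names passed in are placeholders.

```python
from datetime import datetime, timedelta, timezone


def metric_query(namespace, metric_name, dimension_name, dimension_value,
                 minutes=60):
    """Build get_metric_statistics parameters for one metric."""
    end = datetime.now(timezone.utc)
    return {
        "Namespace": namespace,
        "MetricName": metric_name,
        "Dimensions": [{"Name": dimension_name, "Value": dimension_value}],
        "StartTime": end - timedelta(minutes=minutes),
        "EndTime": end,
        "Period": 300,           # assumption: 5-minute buckets
        "Statistics": ["Sum"],   # assumption: totals per bucket
    }


def throughput_queries(topic_name, stream_name):
    """Return one query per metric mentioned in the text."""
    sns_metrics = ("NumberOfMessagesPublished",
                   "NumberOfNotificationsDelivered")
    firehose_metrics = ("IncomingBytes", "IncomingRecords",
                        "DeliveryToS3.Records", "DeliveryToS3.Success")
    queries = [metric_query("AWS/SNS", m, "TopicName", topic_name)
               for m in sns_metrics]
    queries += [metric_query("AWS/Firehose", m, "DeliveryStreamName",
                             stream_name)
                for m in firehose_metrics]
    return queries
```

Each dictionary can be passed to a boto3 CloudWatch client as `cloudwatch.get_metric_statistics(**query)`.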
For security purposes, you can choose to have data encrypted at rest, using server-side encryption (SSE), in addition to encrypting it in transit using HTTPS. For more information, see SNS SSE, Kinesis Data Firehose SSE, and S3 SSE.
Applying SNS message archiving and analytics in a use case
For example, consider an airline ticketing platform that operates in a regulated environment. The compliance framework requires the company to archive all ticket sales for at least five years.
The platform is based on an event-driven serverless architecture. It has a ticket seller Lambda function that publishes an event to an SNS topic for every ticket sold. The SNS topic fans out the event to subscribed systems that are interested in processing this type of event. In the preceding diagram, two systems are interested: one focused on payment processing, and another on fraud control. Each subscribed system receives events through an SQS queue, which feeds an event-processing Lambda function.
To meet the compliance goal on data retention, the airline company subscribes a Kinesis Data Firehose delivery stream to their existing SNS topic. They use an S3 bucket as the stream destination. After this, all events published to the SNS topic are archived in the S3 bucket.
The company can then use Athena to query the S3 bucket with standard SQL to run analytics and gain insights on ticket sales. For example, they can query for the most popular flight destinations or the most frequent flyers.
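Before defining an Athena table, the same kind of question can be prototyped locally against a downloaded archive object. The sketch below counts the most popular destinations. It assumes each archived record is an SNS envelope whose `Message` field holds a JSON document with a hypothetical `destination` field, and it tolerates records that are either newline-separated or written back to back.

```python
import json
from collections import Counter


def iter_json_objects(text):
    """Yield consecutive JSON objects from text, with or without separators."""
    decoder = json.JSONDecoder()
    pos = 0
    while pos < len(text):
        # Skip whitespace (including newlines) between objects.
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos >= len(text):
            break
        obj, pos = decoder.raw_decode(text, pos)
        yield obj


def top_destinations(archive_text, n=3):
    """Count ticket sales per destination and return the n most common."""
    counts = Counter()
    for envelope in iter_json_objects(archive_text):
        ticket = json.loads(envelope["Message"])
        counts[ticket["destination"]] += 1  # hypothetical field name
    return counts.most_common(n)
```

In Athena, the equivalent analysis would be a SQL aggregation over a table defined on the S3 bucket; the field names used there depend on how the company structures its ticket events.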
Subscribing a Kinesis Data Firehose stream to an SNS topic
You can set up a Kinesis Data Firehose subscription to an SNS topic using the AWS Management Console, the AWS CLI, or the AWS SDKs. You can also use AWS CloudFormation to automate the provisioning of these resources.
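For comparison with the SDK route, here is a minimal sketch of the parameters for the SNS `subscribe` call when the endpoint is a delivery stream. The `firehose` protocol and the `SubscriptionRoleArn` attribute mirror the properties used in this post's CloudFormation template; the ARNs shown in the usage note are placeholders.

```python
def firehose_subscription_params(topic_arn, stream_arn, role_arn):
    """Build the arguments for an SNS subscribe call to a delivery stream."""
    return {
        "TopicArn": topic_arn,
        "Protocol": "firehose",
        "Endpoint": stream_arn,
        # Role that SNS assumes to put records into the delivery stream.
        "Attributes": {"SubscriptionRoleArn": role_arn},
        "ReturnSubscriptionArn": True,
    }
```

With a boto3 SNS client, the call would look like `sns.subscribe(**firehose_subscription_params(topic_arn, stream_arn, role_arn))`, where, for example, `topic_arn` is `arn:aws:sns:us-east-1:123456789012:ticketTopic` (a placeholder account and name).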
We use CloudFormation for this example. The provided CloudFormation template creates the following resources:
- An SNS topic
- An S3 bucket
- A Kinesis Data Firehose delivery stream
- A Kinesis Data Firehose subscription in SNS
- Two SQS subscriptions in SNS
- Two IAM roles with access to deliver messages:
  - From SNS to Kinesis Data Firehose
  - From Kinesis Data Firehose to S3
To provision the infrastructure, use the following template:
---
AWSTemplateFormatVersion: '2010-09-09'
Description: Template for creating an SNS archiving use case
Resources:
  ticketUploadStream:
    DependsOn:
      - ticketUploadStreamRolePolicy
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      S3DestinationConfiguration:
        BucketARN: !Sub 'arn:${AWS::Partition}:s3:::${ticketArchiveBucket}'
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 1
        CompressionFormat: UNCOMPRESSED
        RoleARN: !GetAtt ticketUploadStreamRole.Arn
  ticketArchiveBucket:
    Type: AWS::S3::Bucket
  ticketTopic:
    Type: AWS::SNS::Topic
  ticketPaymentQueue:
    Type: AWS::SQS::Queue
  ticketFraudQueue:
    Type: AWS::SQS::Queue
  ticketQueuePolicy:
    Type: AWS::SQS::QueuePolicy
    Properties:
      PolicyDocument:
        Statement:
          Effect: Allow
          Principal:
            Service: sns.amazonaws.com
          Action:
            - sqs:SendMessage
          Resource: '*'
          Condition:
            ArnEquals:
              aws:SourceArn: !Ref ticketTopic
      Queues:
        - !Ref ticketPaymentQueue
        - !Ref ticketFraudQueue
  ticketUploadStreamSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      TopicArn: !Ref ticketTopic
      Endpoint: !GetAtt ticketUploadStream.Arn
      Protocol: firehose
      SubscriptionRoleArn: !GetAtt ticketUploadStreamSubscriptionRole.Arn
  ticketPaymentQueueSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      TopicArn: !Ref ticketTopic
      Endpoint: !GetAtt ticketPaymentQueue.Arn
      Protocol: sqs
  ticketFraudQueueSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      TopicArn: !Ref ticketTopic
      Endpoint: !GetAtt ticketFraudQueue.Arn
      Protocol: sqs
  ticketUploadStreamRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Sid: ''
            Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
  ticketUploadStreamRolePolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: FirehoseticketUploadStreamRolePolicy
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Action:
              - s3:AbortMultipartUpload
              - s3:GetBucketLocation
              - s3:GetObject
              - s3:ListBucket
              - s3:ListBucketMultipartUploads
              - s3:PutObject
            Resource:
              - !Sub 'arn:aws:s3:::${ticketArchiveBucket}'
              - !Sub 'arn:aws:s3:::${ticketArchiveBucket}/*'
      Roles:
        - !Ref ticketUploadStreamRole
  ticketUploadStreamSubscriptionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - sns.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: SNSKinesisFirehoseAccessPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Action:
                  - firehose:DescribeDeliveryStream
                  - firehose:ListDeliveryStreams
                  - firehose:ListTagsForDeliveryStream
                  - firehose:PutRecord
                  - firehose:PutRecordBatch
                Effect: Allow
                Resource:
                  - !GetAtt ticketUploadStream.Arn
To test, publish a message to the SNS topic. After the delivery stream buffer interval of 60 seconds, the message appears in the destination S3 bucket. For information on message formats, see Amazon SNS message formats in Amazon Kinesis Data Firehose destinations.
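The test step could be scripted roughly as follows. This is a sketch under stated assumptions: the clients are boto3 SNS and S3 clients supplied by the caller, the ticket payload and the `event_type` attribute are hypothetical, and the default wait of 90 seconds is simply the 60-second buffer interval plus a margin.

```python
import json
import time


def ticket_sold_message(topic_arn, ticket):
    """Build publish parameters for a hypothetical ticket-sold event."""
    return {
        "TopicArn": topic_arn,
        "Message": json.dumps(ticket),
        "MessageAttributes": {
            "event_type": {"DataType": "String", "StringValue": "ticket_sold"}
        },
    }


def publish_and_list_archive(sns_client, s3_client, topic_arn, bucket,
                             ticket, wait_seconds=90):
    """Publish one event, wait for the buffer to flush, list archived keys."""
    sns_client.publish(**ticket_sold_message(topic_arn, ticket))
    time.sleep(wait_seconds)
    listing = s3_client.list_objects_v2(Bucket=bucket)
    # "Contents" is absent from the response when the bucket is empty.
    return [obj["Key"] for obj in listing.get("Contents", [])]
```

An empty list after the wait suggests checking the delivery stream metrics and the two IAM roles before retrying.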
Cleaning up
After testing, avoid incurring usage charges by deleting the resources you created during the walkthrough. If you used the CloudFormation template, delete all the objects from the S3 bucket before deleting the stack.
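The cleanup order described above (empty the bucket, then delete the stack) might be scripted like this. It is a sketch that assumes boto3 S3 and CloudFormation clients are passed in, that the bucket is unversioned as in the template, and that you have looked up the generated bucket name and your stack name yourself.

```python
def empty_bucket(s3_client, bucket):
    """Delete every object in an unversioned bucket; return the count."""
    deleted = 0
    kwargs = {"Bucket": bucket}
    while True:
        page = s3_client.list_objects_v2(**kwargs)
        objects = [{"Key": obj["Key"]} for obj in page.get("Contents", [])]
        if objects:
            # A page holds at most 1,000 keys, the delete_objects limit.
            s3_client.delete_objects(Bucket=bucket,
                                     Delete={"Objects": objects})
            deleted += len(objects)
        if not page.get("IsTruncated"):
            return deleted
        kwargs["ContinuationToken"] = page["NextContinuationToken"]


def cleanup(s3_client, cfn_client, bucket, stack_name):
    """Empty the archive bucket first, then request stack deletion."""
    deleted = empty_bucket(s3_client, bucket)
    cfn_client.delete_stack(StackName=stack_name)
    return deleted
```

Stack deletion is asynchronous, so the stack may remain visible for a short time after `delete_stack` returns.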
Conclusion
In this post, we show how SNS delivery to Kinesis Data Firehose enables you to integrate SNS with storage and analytics services. The example shows how to subscribe a Kinesis Data Firehose delivery stream to an SNS topic to store SNS messages in an S3 bucket.
You can adapt this configuration for your needs for storage, encryption, data transformation, and data pipeline architecture. For more information, see Fanout to Kinesis Data Firehose streams in the SNS Developer Guide.
For details on pricing, see SNS pricing and Kinesis Data Firehose pricing. For more serverless learning resources, visit Serverless Land.