Amazon S3 is easily my top pick from the vast catalog of AWS services. Considering its features and consistency guarantees, it really is an engineering marvel.
Since Amazon S3 is often used as a starting point for various workloads, one might need to integrate S3 events with a workflow orchestration service - like AWS Step Functions.
This blog post will discuss ways developers might wire Amazon S3 and AWS Step Functions together to achieve highly scalable workflows.
Let us dive in.
All the code used in this blog post can be found in this GitHub repository.
Using S3 Notifications and an AWS Lambda function
Depending on your use case, this might be the most straightforward option available to you. The integration is well documented and taught in most AWS-related courses.
AWS CDK makes setting up this architecture a breeze.
const machine = new sfn.StateMachine(this, "machine", {
  definition: new sfn.Pass(this, "pass")
});

const bucket = new s3.Bucket(this, "bucket", {
  removalPolicy: cdk.RemovalPolicy.DESTROY
});

/**
 * Handler is responsible for forwarding the S3 event to the state machine.
 */
const notificationHandler = new nodeJsLambda.NodejsFunction(
  this,
  "notificationHandler",
  {
    handler: "handler",
    entry: join(__dirname, "notification-handler.ts"),
    environment: {
      STATE_MACHINE_ARN: machine.stateMachineArn
    }
  }
);

notificationHandler.addToRolePolicy(
  new iam.PolicyStatement({
    actions: ["states:StartExecution"],
    resources: [machine.stateMachineArn],
    effect: iam.Effect.ALLOW
  })
);

bucket.addEventNotification(
  s3.EventType.OBJECT_CREATED_PUT,
  new s3Notifications.LambdaDestination(notificationHandler),
  { prefix: "uploads/" }
);
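For reference, here is a minimal sketch of what notification-handler.ts could look like. The entry file name and the STATE_MACHINE_ARN environment variable come from the CDK snippet above; the rest is illustrative and uses the AWS SDK v2 bundled with the Node.js Lambda runtime.

import { S3Handler } from "aws-lambda";
import { StepFunctions } from "aws-sdk";

const sfn = new StepFunctions();

export const handler: S3Handler = async (event) => {
  // A single S3 notification may carry multiple records.
  await Promise.all(
    event.Records.map((record) =>
      sfn
        .startExecution({
          stateMachineArn: process.env.STATE_MACHINE_ARN as string,
          input: JSON.stringify(record)
        })
        .promise()
    )
  );
};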
Other prolific authors from the AWS community have written many articles regarding this pattern, so I will not repeat their words here. However, I will remind you about some limitations and gotchas that you might encounter while working with S3 Notifications.
Make sure you do not create an infinite event loop. This often happens when the target AWS Lambda function itself produces an S3 event that triggers it again - for example, by writing back to the same prefix it listens on.
Keep in mind the limitations of the S3 Notifications event filtering rules. Here is a link to the relevant AWS documentation page.
Keep in mind the delivery guarantees. This point is worth considering regardless of the service. I've read countless StackOverflow questions asking about duplicate events. Consult this documentation piece regarding event delivery guarantees; a sketch of one mitigation follows below.
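Regarding the duplicate events: one pragmatic mitigation (a sketch, not the only option) is to derive the Step Functions execution name from the event itself. Execution names must be unique within a state machine for 90 days, so a redelivered event makes StartExecution fail with an ExecutionAlreadyExists error instead of spawning a second execution. You would replace the startExecution call in the handler sketch above with something like the following - the sequencer field comes straight from the S3 event record, and hashing keeps the name within the allowed length and character set.

import { createHash } from "crypto";

// `record` and `sfn` are the same as in the handler sketch above.
const name = createHash("sha256")
  .update(`${record.s3.object.key}:${record.s3.object.sequencer}`)
  .digest("hex");

await sfn
  .startExecution({
    stateMachineArn: process.env.STATE_MACHINE_ARN as string,
    name,
    input: JSON.stringify(record)
  })
  .promise();

You would then treat an ExecutionAlreadyExists error as a benign duplicate and swallow it.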
Applying the storage-first pattern
In the AWS Serverless space, more and more engineers are talking about the storage-first pattern, where the data/event is first persisted and then reacted upon. This "store first and react later" approach makes it easy to handle heavy loads (through event batching) and ensures resiliency.
Luckily for us, Amazon S3 integrates with Amazon SQS, making it straightforward to augment our current infrastructure.
const s3EventsQueue = ...
const notificationHandler = ...

notificationHandler.addEventSource(
  new lambdaEventSources.SqsEventSource(s3EventsQueue, {
    /**
     * Increase this number as you please.
     */
    batchSize: 1,
    enabled: true
  })
);

bucket.addEventNotification(
  s3.EventType.OBJECT_CREATED_PUT,
  // Previously we were targeting the Lambda directly.
  new s3Notifications.SqsDestination(s3EventsQueue),
  { prefix: "uploads/" }
);
It is essential to handle errors correctly in the poller AWS Lambda function. Remember that if your Lambda function throws an error, the whole batch of messages will be re-queued. I have yet to discover a better way of handling this than manually deleting messages when they are processed.
If you are interested in the implementation, here is mine.
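For illustration, here is a minimal sketch of the manual-deletion approach. The QUEUE_URL environment variable and the processMessage function are assumptions of this sketch - you would expose the queue URL to the function the same way we exposed STATE_MACHINE_ARN earlier.

import { SQSHandler, SQSRecord } from "aws-lambda";
import { SQS } from "aws-sdk";

const sqs = new SQS();

// Placeholder for your actual business logic.
declare function processMessage(record: SQSRecord): Promise<void>;

export const handler: SQSHandler = async (event) => {
  for (const record of event.Records) {
    await processMessage(record);
    // Delete each message as soon as it is processed. If a later record
    // throws, the invocation fails, but only the messages that were not
    // deleted yet will be re-queued and retried.
    await sqs
      .deleteMessage({
        // QUEUE_URL is assumed to be passed in via the function's environment.
        QueueUrl: process.env.QUEUE_URL as string,
        ReceiptHandle: record.receiptHandle
      })
      .promise();
  }
};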
Using AWS CloudTrail and Amazon EventBridge
For the longest time, I was confident that the only way of integrating Amazon S3 with AWS Step Functions was through S3 Notifications utilizing an intermediate AWS Lambda function.
I rejoiced when exploring AWS CloudTrail, where I learned about EventSelectors and AdvancedEventSelectors. These would allow me to wire Amazon S3 and Amazon EventBridge together to achieve a fully "lambda-less" architecture.
The following sections touch on the event selectors rather than the AWS CloudTrail service itself. You should be familiar with AWS CloudTrail before proceeding.
What are AWS CloudTrail event selectors
AWS CloudTrail event selectors are nothing more than filtering rules one might apply to configure which events get captured by a given trail. It is vital to note that the event selectors apply only to data events. To the best of my knowledge, you cannot filter the management events (only turn them on and off).
Update 24.11.2021
As my colleague @rosswilliams pointed out, before embarking on the CloudTrail implementation journey, keep in mind that AWS CloudTrail might deliver your events with up to a 15-minute delay. In my personal experience, the delay is relatively insignificant, and events are delivered almost instantly. Still, please keep this fact in mind.
For reference, head to the AWS CloudTrail FAQ, specifically the "Event payload, Timelines, and Delivery Frequency" section.
Utilizing the EventSelectors
The "regular" AWS CloudTrail event selectors allow you to track read and/or write events originating from a specific bucket. The list of all possible events is rather extensive.
The following is an example of how one might create an AWS CloudTrail event selector for an Amazon S3 bucket using AWS CDK.
const bucket = new s3.Bucket(this, "bucket", {
  removalPolicy: cdk.RemovalPolicy.DESTROY
});

const trail = new cloudTrail.Trail(this, "trail", {
  includeGlobalServiceEvents: false,
  isMultiRegionTrail: false
});

trail.addS3EventSelector([{ bucket, objectPrefix: "uploads" }], {
  includeManagementEvents: false,
  readWriteType: cloudTrail.ReadWriteType.WRITE_ONLY
});
Just like in the case of S3 Notifications, we can specify the prefix parameter. Unlike S3 Notifications, we cannot specify the suffix parameter. Combined with the broad scope of WRITE_ONLY events, this setup forces us to do much of our filtering on the Amazon EventBridge side of things. Luckily for us, the service we are sending events to is designed with rich filtering capabilities in mind.
The following code creates an Amazon EventBridge rule matching the Amazon S3 events delivered via AWS CloudTrail.
const bucket = ...
const machine = ...

const assetUploadedRule = new events.Rule(this, "assetUploadedRule", {
  enabled: true,
  eventPattern: {
    source: ["aws.s3"],
    detail: {
      eventSource: ["s3.amazonaws.com"],
      eventName: ["PutObject"],
      requestParameters: {
        bucketName: [bucket.bucketName]
      }
    }
  },
  targets: [new eventsTargets.SfnStateMachine(machine)]
});
And that's it! All the PutObject events originating from the bucket you have applied the EventSelector on will be pushed to Amazon EventBridge and trigger a state machine execution.
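For reference, these events arrive wrapped in the "AWS API Call via CloudTrail" envelope. Below is a trimmed, illustrative shape of what the state machine receives - the values are placeholders, and many CloudTrail fields are omitted.

// Trimmed, illustrative event - values are placeholders.
const exampleEvent = {
  "detail-type": "AWS API Call via CloudTrail",
  source: "aws.s3",
  detail: {
    eventSource: "s3.amazonaws.com",
    eventName: "PutObject",
    requestParameters: {
      bucketName: "my-bucket",
      key: "uploads/asset.png"
    }
  }
};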
Utilizing the AdvancedEventSelectors
The "advanced" AWS CloudTrail event selectors offer more sophisticated event filtering capabilities. This is a good thing because every data event incurs AWS CloudTrail cost. Utilizing the advanced event selectors is a excellent way to optimize your CloudTrail costs.
The bad news is that the advanced event selectors are not supported via AWS CloudFormation, making the AWS CDK code a bit more involved.
The following code creates an AdvancedEventSelector using custom resources.
const trail = new cloudTrail.Trail(this, "trail", {
  includeGlobalServiceEvents: false,
  isMultiRegionTrail: false
});

const cfnTrail = trail.node.defaultChild as cloudTrail.CfnTrail;

const advancedEventSelectorResource = new customResources.AwsCustomResource(
  this,
  "advancedEventSelectorResource",
  {
    onCreate: {
      action: "putEventSelectors",
      service: "CloudTrail",
      parameters: {
        AdvancedEventSelectors: [
          {
            FieldSelectors: [
              { Field: "eventCategory", Equals: ["Data"] },
              { Field: "resources.type", Equals: ["AWS::S3::Object"] },
              { Field: "eventName", Equals: ["PutObject"] },
              {
                Field: "resources.ARN",
                StartsWith: [bucket.arnForObjects("uploads/")]
              }
            ],
            Name: "Listens to the PutObject only"
          }
        ],
        TrailName: cfnTrail.ref
      },
      physicalResourceId:
        customResources.PhysicalResourceId.fromResponse("TrailARN")
    },
    onDelete: {
      action: "putEventSelectors",
      service: "CloudTrail",
      parameters: {
        AdvancedEventSelectors: [],
        TrailName: cfnTrail.ref
      }
    },
    policy: customResources.AwsCustomResourcePolicy.fromSdkCalls({
      resources: [trail.trailArn]
    })
  }
);
The Amazon EventBridge rule declaration has not changed.
const bucket = ...
const machine = ...

const assetUploadedRule = new events.Rule(this, "assetUploadedRule", {
  enabled: true,
  eventPattern: {
    source: ["aws.s3"],
    detail: {
      eventSource: ["s3.amazonaws.com"],
      eventName: ["PutObject"],
      requestParameters: {
        bucketName: [bucket.bucketName]
      }
    }
  },
  targets: [new eventsTargets.SfnStateMachine(machine)]
});
If I were to choose between the "regular" and "advanced" event selectors, I would personally be inclined to pick the latter - mainly due to cost savings.
Closing words
There you have it. These were a few ways one might integrate Amazon S3 events with an AWS Step Functions state machine. Of course, in the world of AWS, there are many more ways to do this. I wanted to point you towards the ones I find most useful.
As always, thank you for your valuable time.
You can find me on Twitter - @wm_matuszewski