AWS Kinesis and Lambda are a great combo for processing large amounts of data in real time. However, there’s a common oversight that many developers make when integrating these two services.
There are established best practices for configuring Lambda’s EventSourceMapping [1] for Kinesis:
- Configure an OnFailure destination for failed records.
- Enable BisectBatchOnFunctionError.
- Override MaximumRetryAttempts. Choose a value that gives failed messages a reasonable number of retries before sending them to the OnFailure destination.
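As a rough sketch, the three settings above map onto the parameters of Lambda's CreateEventSourceMapping API. The helper name, the batch size, the starting position and the retry count of 3 are illustrative assumptions, not values taken from this post.

```python
from __future__ import annotations


def build_event_source_mapping_params(
    stream_arn: str,
    function_name: str,
    on_failure_arn: str,
    max_retries: int = 3,  # assumption: pick a value that suits your workload
) -> dict:
    """Build keyword arguments for Lambda's CreateEventSourceMapping API."""
    return {
        "EventSourceArn": stream_arn,
        "FunctionName": function_name,
        "StartingPosition": "LATEST",  # assumption for this sketch
        "BatchSize": 100,  # assumption for this sketch
        # Split a failing batch in two and retry each half separately,
        # which helps isolate a poison message.
        "BisectBatchOnFunctionError": True,
        # For Kinesis the default is -1 (retry until the record expires),
        # so override it explicitly.
        "MaximumRetryAttempts": max_retries,
        # Where the failure record goes (an SQS queue or SNS topic ARN).
        "DestinationConfig": {"OnFailure": {"Destination": on_failure_arn}},
    }


# With boto3 installed and credentials configured, the result could be used as:
#   boto3.client("lambda").create_event_source_mapping(**params)
```

The same properties are available in CloudFormation and most deployment frameworks, so in practice you would likely set them there rather than through an API call.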
This is great advice for building a resilient data processing pipeline and guarding against poison messages. But one crucial detail is missing: the message captured in the OnFailure destination does not contain the event payload.
This is what will be captured in the OnFailure destination:
{
"requestContext": {
"requestId": "cf6fa2a6-48e2-49a6-bc23-7ed2b154b176",
"functionArn": "arn:aws:lambda:us-east-1:xxx:function:kinesis-processor",
"condition": "RetryAttemptsExhausted",
"approximateInvokeCount": 4
},
"responseContext": {
"statusCode": 200,
"executedVersion": "$LATEST",
"functionError": "Unhandled"
},
"version": "1.0",
"timestamp": "2023-12-10T22:06:23.446Z",
"KinesisBatchInfo": {
"shardId": "shardId-000000000000",
"startSequenceNumber": "49647238774018758449143790847474952390342044683891376130",
"endSequenceNumber": "49647238774018758449143790941112301748593148785147772930",
"approximateArrivalOfFirstRecord": "2023-12-10T22:05:59.280Z",
"approximateArrivalOfLastRecord": "2023-12-10T22:06:19.196Z",
"batchSize": 3,
"streamArn": "arn:aws:kinesis:us-east-1:xxx:stream/MyKinesisStream"
}
}
The KinesisBatchInfo contains the shard ID, sequence numbers and batch size. The actual data payload – the heart of the event that you need for processing and analysis – is not included.
This omission can lead to significant issues:
- Difficulty in debugging: Without the actual event data, pinpointing the cause of the failure becomes a complex task.
- Data loss: If you are not able to fetch the record before it expires from the stream, then the data is lost forever. Default retention for a Kinesis stream is 24 hours. If the error occurs on a Saturday, your developers likely won’t see it until they are back in the office on Monday. By then, the record is no longer available in the stream.
- Increased complexity: Developers need to implement additional mechanisms to retrieve and store the original event data.
Most people don’t realise this and many have been caught off-guard when they encounter a failed message for the first time!
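To make the data loss risk concrete, here is a minimal sketch that estimates how long you have left to fetch a failed record, based on the approximateArrivalOfFirstRecord field in KinesisBatchInfo. The function names are hypothetical, and the 24-hour default is only correct if you have not changed the stream's retention period.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def parse_timestamp(value: str) -> datetime:
    """Parse timestamps such as 2023-12-10T22:05:59.280Z as UTC."""
    parsed = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ")
    return parsed.replace(tzinfo=timezone.utc)


def hours_until_expiry(
    batch_info: dict,
    now: datetime,
    retention_hours: int = 24,  # assumption: the stream uses default retention
) -> float:
    """Hours left before the oldest record in the failed batch expires."""
    arrived = parse_timestamp(batch_info["approximateArrivalOfFirstRecord"])
    expires_at = arrived + timedelta(hours=retention_hours)
    return (expires_at - now).total_seconds() / 3600


batch_info = {"approximateArrivalOfFirstRecord": "2023-12-10T22:05:59.280Z"}
checked_at = datetime(2023, 12, 11, 10, 5, 59, 280000, tzinfo=timezone.utc)
print(hours_until_expiry(batch_info, checked_at))
```

A negative result means the oldest record has most likely already aged out of the stream.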
Hydrating event data
To handle this situation, you need to hydrate the event data yourself.
Instead of using an SQS queue as the OnFailure destination directly, let’s add a layer of indirection:
- For the Kinesis function, use an SNS topic as the OnFailure destination.
- Subscribe another Lambda function to this topic. This hydrate function retrieves the record from the Kinesis stream.
- Because the hydrate function is invoked asynchronously by SNS, we can configure an OnSuccess destination to write its return value to the original SQS DLQ. This way, we don’t have to maintain the code to write to SQS. However, the downside is that the message captured in the SQS queue is more verbose (see below). All we care about is the responsePayload where we enriched the original OnFailure message with the KinesisBatch array.
{
"version": "1.0",
"timestamp": "2023-12-10T22:43:49.863Z",
"requestContext": {
"requestId": "2944cfad-d4c4-4b28-ad45-bfdd7f4526ef",
"functionArn": "arn:aws:lambda:us-east-1:374852340823:function:hydrate-kinesis-dlq-dev-hydrate:$LATEST",
"condition": "Success",
"approximateInvokeCount": 1
},
"requestPayload": {
"Records": [
{
"EventSource": "aws:sns",
"EventVersion": "1.0",
"EventSubscriptionArn": "arn:aws:sns:us-east-1:374852340823:hydrate-kinesis-dlq-dev-dlq-topic:aee08643-65ae-4caa-b117-de4c20cda524",
"Sns": {
"Type": "Notification",
"MessageId": "002d4204-3a47-5737-b1b5-1ff47b22a32b",
"TopicArn": "arn:aws:sns:us-east-1:374852340823:hydrate-kinesis-dlq-dev-dlq-topic",
"Subject": null,
"Message": "{\"requestContext\":{\"requestId\":\"5577e27b-a674-48cc-9fcb-aa6cf675e939\",\"functionArn\":\"arn:aws:lambda:us-east-1:374852340823:function:hydrate-kinesis-dlq-dev-kinesis\",\"condition\":\"RetryAttemptsExhausted\",\"approximateInvokeCount\":4},\"responseContext\":{\"statusCode\":200,\"executedVersion\":\"$LATEST\",\"functionError\":\"Unhandled\"},\"version\":\"1.0\",\"timestamp\":\"2023-12-10T22:43:49.459Z\",\"KinesisBatchInfo\":{\"shardId\":\"shardId-000000000000\",\"startSequenceNumber\":\"49647238774018758449143791361301066723689190929215782914\",\"endSequenceNumber\":\"49647238774018758449143791361304693501148036740885250050\",\"approximateArrivalOfFirstRecord\":\"2023-12-10T22:42:50.398Z\",\"approximateArrivalOfLastRecord\":\"2023-12-10T22:43:20.392Z\",\"batchSize\":4,\"streamArn\":\"arn:aws:kinesis:us-east-1:374852340823:stream/hydrate-kinesis-dlq-dev-KinesisStream-nrH7Hk48mKe2\"}}",
"Timestamp": "2023-12-10T22:43:49.506Z",
"SignatureVersion": "1",
"Signature": "MauQRhErmYexthVBYzNAsLk2LSkJLWBGdRZj1ubK4py1Mss7XSGM4dTlI3NUMct9xivRqNDt1n/iGRr1QVsF8zYCV5YKJUnYQeXZKkoL81RrTTtq2xg35IoU5VQz3gvvl6nwc5C765K+62W1/mPCGgy5byyepnr/FsFyXesl2wbQZ/nje26JYqT/4xaVm9BrExI83tXXhVMxcYymWs46/Qcq1MEWAhjPiTG4TmAdAB3vwX99ZAikqoIm7zL8TxG838Dy52B09T44GHunfyEcL5uIGHAOr1J7YHc+OVNsgSFxNa3yB+nQFEyHJso9pTeA+w2F7bYGeEVyhMSPjmuFsg==",
"SigningCertUrl": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-01d088a6f77103d0fe307c0069e40ed6.pem",
"UnsubscribeUrl": "https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:374852340823:hydrate-kinesis-dlq-dev-dlq-topic:aee08643-65ae-4caa-b117-de4c20cda524",
"MessageAttributes": {}
}
}
]
},
"responseContext": {
"statusCode": 200,
"executedVersion": "$LATEST"
},
"responsePayload": {
"requestContext": {
"requestId": "5577e27b-a674-48cc-9fcb-aa6cf675e939",
"functionArn": "arn:aws:lambda:us-east-1:374852340823:function:hydrate-kinesis-dlq-dev-kinesis",
"condition": "RetryAttemptsExhausted",
"approximateInvokeCount": 4
},
"responseContext": {
"statusCode": 200,
"executedVersion": "$LATEST",
"functionError": "Unhandled"
},
"version": "1.0",
"timestamp": "2023-12-10T22:43:49.459Z",
"KinesisBatchInfo": {
"shardId": "shardId-000000000000",
"startSequenceNumber": "49647238774018758449143791361301066723689190929215782914",
"endSequenceNumber": "49647238774018758449143791361304693501148036740885250050",
"approximateArrivalOfFirstRecord": "2023-12-10T22:42:50.398Z",
"approximateArrivalOfLastRecord": "2023-12-10T22:43:20.392Z",
"batchSize": 4,
"streamArn": "arn:aws:kinesis:us-east-1:374852340823:stream/hydrate-kinesis-dlq-dev-KinesisStream-nrH7Hk48mKe2"
},
"KinesisBatch": [
"Hello, theburningmonk!",
"Hello, theburningmonk!",
"Hello, theburningmonk!",
"Hello, theburningmonk!"
]
}
}
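Since only the responsePayload matters, a consumer of the DLQ can unwrap it from the destination envelope. The sketch below assumes the SQS message body is the JSON document shown above; the function names are hypothetical.

```python
from __future__ import annotations

import json


def extract_hydrated_failure(sqs_body: str) -> dict:
    """Return the responsePayload from a Lambda destination record."""
    envelope = json.loads(sqs_body)
    return envelope["responsePayload"]


def summarize_failure(sqs_body: str) -> dict:
    """Pull out the fields most useful when debugging a failed batch."""
    payload = extract_hydrated_failure(sqs_body)
    batch_info = payload["KinesisBatchInfo"]
    return {
        "condition": payload["requestContext"]["condition"],
        "shardId": batch_info["shardId"],
        "startSequenceNumber": batch_info["startSequenceNumber"],
        "endSequenceNumber": batch_info["endSequenceNumber"],
        # KinesisBatch is the array added by the hydrate function.
        "records": payload.get("KinesisBatch", []),
    }
```

This drops the requestPayload section, which only repeats the SNS notification that triggered the hydrate function.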
If you’re interested in trying this out yourself, then please take a look at this GitHub repo [2]. In particular, look at the hydrate function to see how to retrieve the records from the Kinesis stream.
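For illustration, here is a minimal Python sketch of what such a hydrate function might look like. It is not the code from the repo. It assumes the SNS notification carries the OnFailure message shown earlier, that records are UTF-8 text, and that sequence numbers can be compared as integers. The helper names and the max_calls cap are hypothetical; the Kinesis client is passed in so the logic can be exercised without AWS access.

```python
from __future__ import annotations

import json
from typing import Any


def stream_name_from_arn(stream_arn: str) -> str:
    """Extract the name from arn:aws:kinesis:<region>:<account>:stream/<name>."""
    return stream_arn.split(":stream/", 1)[1]


def fetch_failed_batch(kinesis: Any, batch_info: dict, max_calls: int = 5) -> list[bytes]:
    """Re-read the records described by KinesisBatchInfo from the stream."""
    iterator = kinesis.get_shard_iterator(
        StreamName=stream_name_from_arn(batch_info["streamArn"]),
        ShardId=batch_info["shardId"],
        ShardIteratorType="AT_SEQUENCE_NUMBER",
        StartingSequenceNumber=batch_info["startSequenceNumber"],
    )["ShardIterator"]

    wanted = batch_info["batchSize"]
    end_sequence = int(batch_info["endSequenceNumber"])
    records: list[bytes] = []
    # GetRecords may return fewer records than requested, so poll a few times.
    for _ in range(max_calls):
        if len(records) >= wanted or not iterator:
            break
        response = kinesis.get_records(
            ShardIterator=iterator, Limit=wanted - len(records)
        )
        for record in response["Records"]:
            if int(record["SequenceNumber"]) > end_sequence:
                return records  # past the failed batch, stop reading
            records.append(record["Data"])
        iterator = response.get("NextShardIterator")
    return records


def handler(event: dict, context: Any, kinesis: Any = None) -> dict:
    """Enrich the OnFailure message with the original record data."""
    if kinesis is None:
        import boto3  # available by default in the Lambda Python runtime

        kinesis = boto3.client("kinesis")
    failure = json.loads(event["Records"][0]["Sns"]["Message"])
    batch = fetch_failed_batch(kinesis, failure["KinesisBatchInfo"])
    # Assumption: records are UTF-8 text; binary payloads would need base64.
    failure["KinesisBatch"] = [data.decode("utf-8") for data in batch]
    return failure
```

Returning the enriched message, rather than writing it to SQS in code, is what lets the OnSuccess destination deliver it to the DLQ.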
It’s worth noting that this approach also works for DynamoDB streams. In the GitHub repo linked above, you will find working examples for both Kinesis and DynamoDB streams.
Next steps
I hope this post helps you avoid a very common mistake people make when working with Kinesis and Lambda.
To learn more about building production-ready serverless applications, check out my upcoming workshop [3]. I’m working on some new lessons to incorporate the latest changes from re:Invent 2023. And I’m adding support for CDK as well, due to popular demand!
Hope to see you there.
Links
[1] EventSourceMapping documentation
[2] GitHub repo for a working demo
[3] Production-Ready Serverless workshop
The post The one mistake everyone makes when using Kinesis with Lambda appeared first on theburningmonk.com.