Hello everyone,
In this blog, I'll walk you through building an event-driven pipeline that converts the contents of DynamoDB into embeddings, making them searchable via OpenSearch for vector search. The goal of this pipeline is to automatically handle the entire process whenever new content is added or existing content is modified in DynamoDB.
This event-driven architecture triggers each step in the process seamlessly, converting newly added or updated items into embeddings and storing them in OpenSearch.
One of my key design goals is to minimize the amount of code needed to connect services and reduce the reliance on Lambda functions. Instead, I focused on leveraging AWS services and EventBridge to connect and automate the workflow.
Here is how the workflow looks:
Before diving into the implementation, let's prepare the scripts for inserting data, converting content into embeddings, and inserting the embeddings into OpenSearch.
For this POC, I am using this dataset from Kaggle: https://www.kaggle.com/datasets/fernandogarciah24/top-1000-imdb-dataset
Code for inserting data into DynamoDB:
import csv

import boto3

# DynamoDB table name
DYNAMODB_TABLE_NAME = "content"

# Initialize DynamoDB resource
dynamodb = boto3.resource('dynamodb', region_name="us-east-1")
table = dynamodb.Table(DYNAMODB_TABLE_NAME)

def process_csv_and_insert_to_dynamodb(csv_file_path):
    try:
        # Open and read the CSV file
        with open(csv_file_path, mode='r', encoding='utf-8') as file:
            csv_reader = csv.DictReader(file)
            content_id = 0  # Start content_id from 0

            # Iterate over each row in the CSV
            for row in csv_reader:
                # Prepare item for DynamoDB
                item = {
                    'content_id': content_id,
                    'content_title': row['Series_Title'],
                    'genre': row['Genre'],
                    'overview': row['Overview']
                }
                # Insert item into DynamoDB
                table.put_item(Item=item)
                print(f"Inserted: {item}")
                content_id += 1  # Increment content_id
                if content_id == 65:  # Stop at the first 65 records for testing
                    break
    except Exception as e:
        print(f"Error: {e}")

# Provide the path to your CSV file
csv_file_path = "movies.csv"

# Call the function
process_csv_and_insert_to_dynamodb(csv_file_path)
Code for converting content into embeddings and storing them in OpenSearch:
import json

import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

credentials = boto3.Session().get_credentials()
aws_auth = AWS4Auth(
    credentials.access_key,
    credentials.secret_key,
    'us-east-1',
    'aoss',  # Service name for OpenSearch Serverless
    session_token=credentials.token
)

QUEUE_URL = "queue url"
OPENSEARCH_ENDPOINT = "open search serverless endpoint"
INDEX_NAME = "contents"
AWS_REGION = 'us-east-1'

# AWS clients
sqs = boto3.client('sqs', region_name=AWS_REGION)
bedrock_runtime = boto3.client('bedrock-runtime', region_name=AWS_REGION)

# OpenSearch client
def get_opensearch_client():
    return OpenSearch(
        hosts=[{'host': OPENSEARCH_ENDPOINT, 'port': 443}],
        http_auth=aws_auth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection
    )

# Poll messages from SQS
def poll_sqs_messages():
    response = sqs.receive_message(
        QueueUrl=QUEUE_URL,
        MaxNumberOfMessages=10,  # Fetch up to 10 messages
        WaitTimeSeconds=10
    )
    return response.get('Messages', [])

# Call Amazon Titan for embedding generation
def generate_embeddings(text):
    payload = {
        "inputText": text
    }
    response = bedrock_runtime.invoke_model(
        modelId="amazon.titan-embed-text-v1",
        contentType="application/json",
        accept="application/json",
        body=json.dumps(payload)
    )
    response_body = json.loads(response['body'].read())
    return response_body.get('embedding')

# Store embeddings in OpenSearch
def store_embeddings_in_opensearch(content_id, embedding, content_title, genre):
    client = get_opensearch_client()
    document = {
        "title": content_title,
        "overview": embedding,
        "genre": genre,
        "content_id": content_id
    }
    response = client.index(
        index=INDEX_NAME,
        body=document
    )
    return response

# Main processing function
def main():
    print("Starting Batch Job to process SQS messages...")
    messages = poll_sqs_messages()
    if not messages:
        print("No messages found in the queue. Exiting.")
        return
    for message in messages:
        try:
            body = json.loads(message['Body'])
            db_record = body['dynamodb']['NewImage']
            content_title = db_record['content_title']['S']
            overview = db_record['overview']['S']
            content_id = db_record['content_id']['N']
            genre = db_record['genre']['S']

            # Generate embedding
            embedding = generate_embeddings(overview)
            print(f"Generated embedding for content: {content_title}")

            # Store in OpenSearch
            store_embeddings_in_opensearch(content_id, embedding, content_title, genre)

            # Delete message from SQS after successful processing
            # sqs.delete_message(
            #     QueueUrl=QUEUE_URL,
            #     ReceiptHandle=message['ReceiptHandle']
            # )
            # print(f"Deleted message for content {content_id} from SQS.")
        except Exception as e:
            print(f"Error processing message: {str(e)}")
            continue
    print("Batch Job completed successfully.")

if __name__ == "__main__":
    main()
- Create a Dockerfile and build a Docker image to push to ECR
FROM python:3.9-slim
USER root
# Install dependencies
RUN pip install boto3 opensearch-py requests requests-aws4auth
# Copy the script into the container
COPY process_embeddings_batch.py /app/process_embeddings_batch.py
# Default command
CMD ["python", "/app/process_embeddings_batch.py"]
Visit the ECR console and click Create repository. Provide a name for the repository and, once it is created, click the View push commands button.
Use the push commands to build the Docker image and push it to the repository.
Step-by-step implementation:
Create a DynamoDB table and enable streams
Create an SQS queue to hold the DynamoDB records
Create an EventBridge Pipe to connect DynamoDB and SQS
Create a CloudWatch alarm that fires when the queue holds more than 50 messages
Create an AWS Batch job definition to run jobs
Create a state machine to submit a job to AWS Batch
Create an EventBridge rule to listen for the alarm and trigger the Step Functions state machine
Create an OpenSearch Serverless collection and index
Create a DynamoDB table and enable streams:
- Visit the DynamoDB service, click the Create table button, provide a name for the table, set content_id as the partition key, and set its type to Number
- Visit the Exports and streams tab and enable DynamoDB Streams
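If you prefer to set the table up programmatically, here is a minimal boto3 sketch; the on-demand billing mode is an assumption, and the table name matches the insert script above.
import boto3

dynamodb = boto3.client("dynamodb", region_name="us-east-1")

dynamodb.create_table(
    TableName="content",
    AttributeDefinitions=[{"AttributeName": "content_id", "AttributeType": "N"}],
    KeySchema=[{"AttributeName": "content_id", "KeyType": "HASH"}],
    BillingMode="PAY_PER_REQUEST",  # assumption: on-demand capacity
    # Stream the new item images so the pipe can forward them to SQS
    StreamSpecification={"StreamEnabled": True, "StreamViewType": "NEW_AND_OLD_IMAGES"},
)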
Create an SQS queue to hold the DynamoDB records:
- Visit the SQS service in the AWS console and click Create queue. Message ordering is not needed, so a standard queue is fine. Provide a name for the queue and click Create queue
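A minimal boto3 sketch for the same step; the queue name and visibility timeout are placeholders you can adjust.
import boto3

sqs = boto3.client("sqs", region_name="us-east-1")

# Standard queue; a longer visibility timeout gives the batch job time to
# finish before a message becomes visible again
response = sqs.create_queue(
    QueueName="content-embeddings-queue",  # placeholder name
    Attributes={"VisibilityTimeout": "300"},
)
print(response["QueueUrl"])  # use this as QUEUE_URL in the batch script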
Create an EventBridge Pipe to connect DynamoDB and SQS:
Visit the EventBridge service and click Create pipe
Select DynamoDB as the source and SQS as the target
- Click Create pipe. This will push the DynamoDB stream records to SQS
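The same pipe can be created with boto3; the pipe name, stream ARN, queue ARN, and role ARN below are placeholders, and the pipe's role must be allowed to read the DynamoDB stream and send messages to the queue.
import boto3

pipes = boto3.client("pipes", region_name="us-east-1")

pipes.create_pipe(
    Name="content-to-sqs",  # placeholder name
    RoleArn="arn:aws:iam::123456789012:role/content-pipe-role",
    # Source is the DynamoDB stream ARN from the Exports and streams tab
    Source="arn:aws:dynamodb:us-east-1:123456789012:table/content/stream/2024-01-01T00:00:00.000",
    SourceParameters={
        "DynamoDBStreamParameters": {"StartingPosition": "LATEST", "BatchSize": 10}
    },
    Target="arn:aws:sqs:us-east-1:123456789012:content-embeddings-queue",
)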
Create a CloudWatch alarm that fires when the queue holds more than 50 messages:
Visit the CloudWatch service in the AWS console. From the side panel, click All alarms and then Create alarm
Select the SQS ApproximateNumberOfMessagesVisible metric for your queue
- Set the condition to greater than 50 messages
- Skip the actions, provide a name for the alarm (the EventBridge rule later in this post matches on sqs-threshold-alarm), and click Create alarm
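The equivalent alarm in boto3 might look like this; the queue name is a placeholder, and the alarm name matches the one used in the EventBridge rule later in this post.
import boto3

cloudwatch = boto3.client("cloudwatch", region_name="us-east-1")

cloudwatch.put_metric_alarm(
    AlarmName="sqs-threshold-alarm",  # referenced by the EventBridge rule below
    Namespace="AWS/SQS",
    MetricName="ApproximateNumberOfMessagesVisible",
    Dimensions=[{"Name": "QueueName", "Value": "content-embeddings-queue"}],
    Statistic="Maximum",
    Period=60,
    EvaluationPeriods=1,
    Threshold=50,
    ComparisonOperator="GreaterThanThreshold",  # fires when more than 50 messages are visible
)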
Create an AWS Batch job definition to run jobs:
- Click Create job definition, select Fargate as the orchestration type, set the ephemeral storage to 21 GB, keep the rest of the fields as they are, and move to the next section
- Paste the URI of the ECR image we created at the start of the blog and keep the rest of the fields as they are
- Keep the user as root and select awslogs for logging
Review everything and create the job definition
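If you want to script this step instead, here is a rough boto3 sketch; the job definition name, image URI, and role ARNs are placeholders, and the vCPU/memory values are assumptions.
import boto3

batch = boto3.client("batch", region_name="us-east-1")

batch.register_job_definition(
    jobDefinitionName="process-embeddings-job",  # placeholder name
    type="container",
    platformCapabilities=["FARGATE"],
    containerProperties={
        "image": "123456789012.dkr.ecr.us-east-1.amazonaws.com/process-embeddings:latest",
        "resourceRequirements": [
            {"type": "VCPU", "value": "1"},       # assumption
            {"type": "MEMORY", "value": "2048"},  # assumption
        ],
        "ephemeralStorage": {"sizeInGiB": 21},    # the 21 GB mentioned above
        "executionRoleArn": "arn:aws:iam::123456789012:role/batch-execution-role",
        "jobRoleArn": "arn:aws:iam::123456789012:role/batch-job-role",
        "networkConfiguration": {"assignPublicIp": "ENABLED"},
        "logConfiguration": {"logDriver": "awslogs"},
        "user": "root",
    },
)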
We also need a job queue and a compute environment for the jobs to run in. Provide the basic details to create a Fargate compute environment.
- Visit the job queues section and create a job queue that uses the compute environment we just created, as in the sketch below
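A rough boto3 sketch for the compute environment and job queue; the subnets, security group, and names are placeholders.
import boto3

batch = boto3.client("batch", region_name="us-east-1")

ce = batch.create_compute_environment(
    computeEnvironmentName="embeddings-fargate-env",  # placeholder name
    type="MANAGED",
    state="ENABLED",
    computeResources={
        "type": "FARGATE",
        "maxvCpus": 4,  # assumption
        "subnets": ["subnet-0123456789abcdef0"],
        "securityGroupIds": ["sg-0123456789abcdef0"],
    },
)

# Wait until the compute environment is VALID before attaching it to a queue
batch.create_job_queue(
    jobQueueName="embeddings-job-queue",  # placeholder name
    state="ENABLED",
    priority=1,
    computeEnvironmentOrder=[
        {"order": 1, "computeEnvironment": ce["computeEnvironmentArn"]}
    ],
)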
- Now we have a job definition and a job queue. Let's create a state machine to trigger jobs
Create a state machine to submit a job to AWS Batch:
Visit the Step Functions service and click Create state machine.
Select the Batch SubmitJob state and pick the job definition and job queue we created above
For this sample I am using only the one required state; keep adding more states based on your needs
- Click on the SubmitJob state and select the job queue and job definition from the right-side panel
- Now save the state machine, and let's connect SQS and this state machine based on the CloudWatch alarm we created above
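For reference, here is roughly what the state machine looks like when created with boto3 and a single Batch SubmitJob task; the names and role ARN are placeholders.
import json

import boto3

sfn = boto3.client("stepfunctions", region_name="us-east-1")

# The .sync suffix makes the state wait for the Batch job to finish
definition = {
    "StartAt": "SubmitEmbeddingJob",
    "States": {
        "SubmitEmbeddingJob": {
            "Type": "Task",
            "Resource": "arn:aws:states:::batch:submitJob.sync",
            "Parameters": {
                "JobName": "process-embeddings",            # placeholder
                "JobDefinition": "process-embeddings-job",  # from the sketch above
                "JobQueue": "embeddings-job-queue",         # from the sketch above
            },
            "End": True,
        }
    },
}

sfn.create_state_machine(
    name="submit-embedding-batch-job",  # placeholder name
    definition=json.dumps(definition),
    roleArn="arn:aws:iam::123456789012:role/stepfunctions-batch-role",
)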
Create an EventBridge rule to listen for the alarm and trigger the Step Functions state machine:
Visit the EventBridge service and select Rules from the side menu
Provide a name for the rule and choose a rule with an event pattern
- In the event pattern, paste the following JSON
{
  "source": ["aws.cloudwatch"],
  "detail-type": ["CloudWatch Alarm State Change"],
  "detail": {
    "state": {
      "value": ["ALARM"]
    },
    "alarmName": ["sqs-threshold-alarm"]
  }
}
- Select the state machine we created as the target and click Create rule
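The same rule can be created with boto3 using the pattern above; the rule name, state machine ARN, and role ARN are placeholders, and the role must allow EventBridge to start executions of the state machine.
import json

import boto3

events = boto3.client("events", region_name="us-east-1")

pattern = {
    "source": ["aws.cloudwatch"],
    "detail-type": ["CloudWatch Alarm State Change"],
    "detail": {
        "state": {"value": ["ALARM"]},
        "alarmName": ["sqs-threshold-alarm"],
    },
}

events.put_rule(
    Name="trigger-embedding-state-machine",  # placeholder name
    EventPattern=json.dumps(pattern),
)

events.put_targets(
    Rule="trigger-embedding-state-machine",
    Targets=[
        {
            "Id": "state-machine-target",
            "Arn": "arn:aws:states:us-east-1:123456789012:stateMachine:submit-embedding-batch-job",
            "RoleArn": "arn:aws:iam::123456789012:role/eventbridge-stepfunctions-role",
        }
    ],
)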
Create an OpenSearch Serverless collection and index:
- Visit the Serverless section of the OpenSearch Service, click Create collection, provide the collection details, and click Create collection
- Once the collection is created, click Create index and configure it with a vector field for the embeddings, as in the sketch below
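Here is a minimal opensearch-py sketch of an equivalent index. It assumes the field names used by the batch script and a 1536-dimensional vector (the output size of amazon.titan-embed-text-v1); the collection endpoint is a placeholder, and the HNSW/FAISS settings are assumptions.
import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

credentials = boto3.Session().get_credentials()
aws_auth = AWS4Auth(credentials.access_key, credentials.secret_key,
                    "us-east-1", "aoss", session_token=credentials.token)

client = OpenSearch(
    hosts=[{"host": "your-collection-endpoint", "port": 443}],  # endpoint without https://
    http_auth=aws_auth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)

index_body = {
    "settings": {"index.knn": True},
    "mappings": {
        "properties": {
            # The batch script stores the embedding in the "overview" field
            "overview": {
                "type": "knn_vector",
                "dimension": 1536,
                "method": {"name": "hnsw", "engine": "faiss", "space_type": "l2"},
            },
            "title": {"type": "text"},
            "genre": {"type": "text"},
            "content_id": {"type": "keyword"},
        }
    },
}

client.indices.create(index="contents", body=index_body)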
That's it. Everything is ready and the services are connected. Let's run the insert Python script to load data into DynamoDB and check whether everything is triggered correctly.
Note: once the threshold is reached, it can take some time for the state machine to be triggered.
These are the jobs that were triggered whenever I inserted data into DynamoDB.
Verify whether the records were inserted into the index. You can check the record count or use the dashboard to see the records.
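Reusing the OpenSearch client from the index sketch above, a quick way to eyeball the documents:
# Assumes the "contents" index and the client built in the previous sketch
resp = client.search(index="contents", body={"query": {"match_all": {}}, "size": 3})
print(resp["hits"]["total"], [hit["_source"]["title"] for hit in resp["hits"]["hits"]])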
Once the data is ready, you can build a vector search engine by following my previous article here:
Building a vector search Engine
Please let me know through the comments section if you are stuck anywhere and need any assistance. Thanks. Have a great day!