From CockroachDB to AWS SNS via AWS API Gateway

Fabio Ghirardello - Oct 9 - Dev Community

This is a brief write-up on how to send rows inserted into a CockroachDB table as messages to an SNS topic.

CockroachDB Change Data Capture (CDC) supports several sinks (see the CockroachDB docs for the full list); however, AWS SNS is not yet one of them.

We can work around this limitation by combining the webhook sink, which publishes to an HTTP endpoint, with API Gateway, which provides such an endpoint and is tightly integrated with other AWS services, including SNS.

At a high level, the pipeline works as follows:

  1. A changefeed is created on the CockroachDB table.
  2. The changefeed watches the table for INSERTs (to be precise, any write operation) and emits messages to the API Gateway endpoint.
  3. API Gateway receives the message and publishes the payload to the SNS topic via an internal AWS integration.
  4. Subscribers of the SNS topic get a notification.
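To make the four steps concrete, here is a minimal in-memory Python sketch of the flow. It makes no AWS or CockroachDB calls: `SnsTopic`, `changefeed_emit` and `api_gateway_handler` are hypothetical stand-ins, and the `{"payload": [...], "length": N}` body is modeled on the envelope the CockroachDB docs describe for the webhook sink.

```python
import json
from typing import Callable, List


class SnsTopic:
    """Hypothetical stand-in for an SNS topic: fans each message out to subscribers."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.subscribers: List[Callable[[str], None]] = []

    def subscribe(self, callback: Callable[[str], None]) -> None:
        self.subscribers.append(callback)

    def publish(self, message: str) -> None:
        # Step 4: every subscriber receives the message.
        for notify in self.subscribers:
            notify(message)


def changefeed_emit(rows: List[dict]) -> str:
    """Steps 1-2: wrap changed rows in a JSON body, like a webhook sink request."""
    return json.dumps({"payload": rows, "length": len(rows)})


def api_gateway_handler(body: str, topic: SnsTopic) -> bool:
    """Step 3: extract "payload" from the HTTP body and publish it to the topic.

    Bodies without a payload (e.g. resolved timestamps) are skipped in this sketch.
    """
    payload = json.loads(body).get("payload")
    if payload is None:
        return False
    topic.publish(payload if isinstance(payload, str) else json.dumps(payload))
    return True


inbox: List[str] = []
events = SnsTopic("events")
events.subscribe(inbox.append)
published = api_gateway_handler(changefeed_emit([{"after": {"my_key": "my_value"}}]), events)
print(published, inbox)
```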

We will build this pipeline backwards, starting with the SNS topic and finishing with the changefeed.

Create SNS topic

This is the easiest step of all, thanks to the AWS Console.

  1. Open the AWS Console and head to Simple Notification Service.
  2. You'll land on a page that prompts you to create a new topic.
  3. Create a topic named "events". Keep all the default settings: for this test we won't bother with certificates and keys, since we first want to make sure the pipeline works.
  4. Create a subscriber. In my example, I chose "Email" as the "Protocol" and entered my email address as the "Endpoint".
  5. Finally, and very importantly, test it directly from the Console using the Publish message button.
  6. Check your inbox: you should have received a request to confirm your subscription. SNS only delivers to confirmed subscriptions, so confirm it, then publish again if your test message didn't arrive.

Jot down your topic ARN; in my case it's arn:aws:sns:ca-central-1:3xxx8:events


Good, you have created the topic and verified that it works.
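The topic ARN gets reused later: its region goes into the API Gateway integration, and the full ARN goes into the mapping template. A minimal sketch of splitting an ARN into its fields, assuming the standard `arn:partition:service:region:account:resource` layout; `parse_arn` and `Arn` are hypothetical helper names.

```python
from typing import NamedTuple


class Arn(NamedTuple):
    partition: str
    service: str
    region: str
    account: str
    resource: str


def parse_arn(arn: str) -> Arn:
    """Split 'arn:partition:service:region:account:resource' into its fields."""
    parts = arn.split(":", 5)  # maxsplit=5 keeps any ':' inside the resource intact
    if len(parts) != 6 or parts[0] != "arn":
        raise ValueError(f"not an ARN: {arn!r}")
    return Arn(*parts[1:])


topic = parse_arn("arn:aws:sns:ca-central-1:3xxx8:events")
print(topic.region, topic.resource)  # ca-central-1 events
```

The same helper works on the role ARN from the next step, where the region field is empty because IAM is a global service.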

Create AWS Role

A prerequisite for the API Gateway integration is an AWS IAM role with privileges to publish to the SNS topic we just created.

  1. Go to AWS IAM.
  2. Go to Roles and click "Create role". Select "AWS service" as the "Trusted entity type" and "API Gateway" as the "Use case".
  3. Continue to Step 3, name the role "events_role", and create it.
  4. Your role is now created, but it doesn't have the permissions it needs yet. Find it in the list of roles and select it.
  5. In the Permissions section, click "Add permissions", attach "AmazonSNSFullAccess" (for simplicity), then save.


Jot down the role ARN; in my case it's arn:aws:iam::3xxx8:role/events_role
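Choosing the "API Gateway" use case gives the role a trust policy that lets `apigateway.amazonaws.com` assume it. If you'd rather not attach AmazonSNSFullAccess, a narrower policy could allow only `sns:Publish` on this one topic. The sketch below just builds both documents as Python dicts (the function names are hypothetical); you would still paste or attach them through IAM yourself.

```python
import json


def trust_policy() -> dict:
    """Trust policy allowing API Gateway to assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "apigateway.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }],
    }


def publish_only_policy(topic_arn: str) -> dict:
    """Permissions policy allowing only sns:Publish on a single topic."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": "sns:Publish",
            "Resource": topic_arn,
        }],
    }


# Print the policy JSON, ready to paste into the IAM console's JSON policy editor.
print(json.dumps(publish_only_policy("arn:aws:sns:ca-central-1:3xxx8:events"), indent=2))
```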

Create API Gateway endpoint

  1. From the AWS Console, head to API Gateway.
  2. Click the Create API button and select a 'REST API'.
  3. Name it "events", and set 'API Endpoint type' to "Regional".
  4. With the "events" API selected, click the Create Resource button, and enter / as the Resource Path and events as the Resource Name. There's no need to enable CORS or the Proxy Resource.
  5. Once on the Resource Details page for the /events path, choose Create Method.
  6. Choose:
    Method type = POST
    Integration type = AWS service
    AWS Region = ca-central-1, which is the region I've used to deploy the SNS topic
    AWS Service = Simple Notification Service
    AWS Subdomain = keep it blank
    HTTP method = POST
    Action Type = Use path override
    Path override = arn:aws:apigateway:ca-central-1:sns:path//   (substitute your region)
    Execution role = the role ARN you jotted down previously

    Leave the rest as default, then click "Create method".
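The integration URI follows the pattern `arn:aws:apigateway:<region>:sns:path/<path override>`, so a path override of `/` gives the `path//` ending shown above. A small sketch deriving it from the topic ARN, assuming the standard `aws` partition and a `/` override; `sns_integration_uri` is a hypothetical helper.

```python
def sns_integration_uri(topic_arn: str, path_override: str = "/") -> str:
    """Build the API Gateway integration URI for SNS in the topic's region.

    Assumes the standard "aws" partition; the region is the 4th ARN field.
    """
    region = topic_arn.split(":")[3]
    return f"arn:aws:apigateway:{region}:sns:path/{path_override}"


print(sns_integration_uri("arn:aws:sns:ca-central-1:3xxx8:events"))
# arn:aws:apigateway:ca-central-1:sns:path//
```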

You should now have something that looks like this:

[Screenshot: the new POST method in the API Gateway console]

Now we need to configure the mapping that takes the incoming HTTP payload and publishes it to the SNS topic.

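  1. Click the "Integration Request" tab, then Edit.
  2. Scroll to the "URL request headers parameters" section and add the following header (the single quotes make the value a static string rather than a mapping expression):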

    Name = Content-Type
    Mapped from = 'application/x-www-form-urlencoded'

  3. Go to "Mapping templates" and set:

    Content type = application/json
    Template body =

    #set($topic="arn:aws:sns:ca-central-1:3xxx8:events")
    #set($msg=$input.path('$.payload'))
    Action=Publish&TopicArn=$util.urlEncode($topic)&Message=$util.urlEncode($msg)
    

    Replace my topic ARN with yours.

  4. Save. You should then see the following:

[Screenshot: the saved Integration Request settings]
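To see what the mapping template hands to SNS, here is a rough Python emulation: it reads `payload` from the JSON body and form-encodes the `Publish` action with its `TopicArn` and `Message` parameters. This is an approximation, not API Gateway's template engine: `urlencode` (which quotes with `quote_plus`) stands in for `$util.urlEncode`, and JSON-serializing non-string payloads is an assumption, since `$input.path` may stringify objects differently. `render_publish_body` is a hypothetical name.

```python
import json
from urllib.parse import urlencode


def render_publish_body(request_body: str, topic_arn: str) -> str:
    """Approximate the mapping template: read $.payload and form-encode an SNS Publish call."""
    payload = json.loads(request_body)["payload"]
    # Assumption: non-string payloads are serialized as JSON; API Gateway's
    # $input.path may stringify objects differently.
    message = payload if isinstance(payload, str) else json.dumps(payload)
    # urlencode() quotes with quote_plus, close to (not guaranteed identical to) $util.urlEncode.
    return urlencode([("Action", "Publish"), ("TopicArn", topic_arn), ("Message", message)])


print(render_publish_body('{"payload":"test_event_api"}',
                          "arn:aws:sns:ca-central-1:3xxx8:events"))
# Action=Publish&TopicArn=arn%3Aaws%3Asns%3Aca-central-1%3A3xxx8%3Aevents&Message=test_event_api
```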

Finally, click the Test tab and enter the following in the Request body:

{"payload":"test_event_api"}

You should get a confirmation that the message was published, and shortly afterwards you should receive an SNS email with the same text.


Wow, it's working. We're getting there!

Now we need to deploy this API and create a usage plan.

Deploy API

  1. On the main page for this API, click "Deploy API".
  2. At the prompt, choose New stage and name it DEV.
  3. Click "Deploy".

You'll be redirected to the Stages section.
Jot down the Invoke URL; in my case it's https://72eebc6w0k.execute-api.ca-central-1.amazonaws.com/DEV

Create usage plan

  1. On the main page for this API, click "Usage Plans".
  2. Create a new usage plan. I used these sample values for this functional test:

    Name = events_plan
    Rate = 10 (requests per second)
    Burst = 10 (requests)
    Requests = 100 per day

  3. Click "Create usage plan".

  4. Re-select the newly created events_plan usage plan and add a stage.

  5. At the prompt, choose:

    API = events
    Stage = DEV

  6. Save your changes.
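AWS documents that API Gateway throttles requests with a token bucket algorithm: Rate is the refill rate in requests per second and Burst the bucket capacity, while the daily Requests value is a separate quota. The sketch below is a simplified model for reasoning about these three numbers (hypothetical `TokenBucket` and `DailyQuota` classes, driven by explicit timestamps), not API Gateway's actual implementation.

```python
class TokenBucket:
    """Simplified token bucket: refills `rate` tokens per second up to `burst`."""

    def __init__(self, rate: float, burst: int) -> None:
        self.rate = rate
        self.burst = burst
        self.tokens = float(burst)  # the bucket starts full
        self.last = 0.0             # time of the previous check, in seconds

    def allow(self, now: float) -> bool:
        # Refill for the elapsed time, never exceeding the burst capacity.
        self.tokens = min(self.burst, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


class DailyQuota:
    """Counts accepted requests against a fixed daily limit."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.used = 0

    def allow(self) -> bool:
        if self.used < self.limit:
            self.used += 1
            return True
        return False


def accept(bucket: TokenBucket, quota: DailyQuota, now: float) -> bool:
    # Simplification: a throttled request does not count against the quota here.
    return bucket.allow(now) and quota.allow()


# events_plan values: Rate = 10, Burst = 10, Requests = 100 per day.
bucket, quota = TokenBucket(rate=10, burst=10), DailyQuota(limit=100)
print(sum(accept(bucket, quota, 0.0) for _ in range(15)))  # 10: the burst is used up
print(sum(accept(bucket, quota, 0.5) for _ in range(8)))   # 5: half a second refills 5 tokens
```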

Test locally

Now that the API is live and has a usage plan attached, we are ready to test whether it can be invoked by an external client.

From your laptop, invoke it using curl:

$ curl -X POST https://72eebc6w0k.execute-api.ca-central-1.amazonaws.com/DEV/events --json '{"payload":"hello_from_curl"}' | jq
{
  "PublishResponse": {
    "PublishResult": {
      "MessageId": "7a3e9555-6031-5d2b-b72a-cad5c8ddf3b4",
      "SequenceNumber": null
    },
    "ResponseMetadata": {
      "RequestId": "a63ca446-743e-520b-b689-fbc6dcd2bc94"
    }
  }
}

And sure enough, I got my email with the same text.
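curl's `--json` flag sends a POST with `Content-Type` and `Accept` set to `application/json`. If you'd rather script this check, here is a standard-library Python sketch that builds the same request and extracts the `MessageId` from a response like the one above; `build_publish_request` and `message_id` are hypothetical names, and the actual network call is left commented out.

```python
import json
import urllib.request


def build_publish_request(invoke_url: str, payload: str,
                          resource: str = "events") -> urllib.request.Request:
    """Build the same POST as `curl --json '{"payload": ...}'` against the stage URL."""
    body = json.dumps({"payload": payload}).encode("utf-8")
    return urllib.request.Request(
        f"{invoke_url.rstrip('/')}/{resource}",
        data=body,
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


def message_id(response_text: str) -> str:
    """Extract the SNS MessageId from the PublishResponse JSON returned by the API."""
    return json.loads(response_text)["PublishResponse"]["PublishResult"]["MessageId"]


req = build_publish_request(
    "https://72eebc6w0k.execute-api.ca-central-1.amazonaws.com/DEV", "hello_from_python")
# Uncomment to actually call the endpoint:
# with urllib.request.urlopen(req) as resp:
#     print(message_id(resp.read().decode("utf-8")))
```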

We are now ready to have CockroachDB send data to our endpoint.

Create test table and configure the changefeed

Create a simple events table, such as:

CREATE TABLE events (
    id UUID NOT NULL DEFAULT gen_random_uuid(),
    payload JSONB NULL,
    ts TIMESTAMPTZ NULL DEFAULT now():::TIMESTAMPTZ,
    CONSTRAINT pk PRIMARY KEY (id ASC)
);

Create the changefeed on the newly created events table.
You will need to update the endpoint hostname with yours.

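-- changefeeds require rangefeeds to be enabled
SET CLUSTER SETTING kv.rangefeed.enabled = true;

-- stream every write on events to the API Gateway endpoint through the webhook sink
CREATE CHANGEFEED FOR TABLE events INTO 'webhook-https://72eebc6w0k.execute-api.ca-central-1.amazonaws.com/DEV/events?insecure_tls_skip_verify=true'
WITH updated,                        -- add the commit timestamp of each change
     resolved='20s',                 -- emit resolved timestamps at most every 20s
     diff,                           -- add the row's previous value as "before"
     schema_change_policy=backfill,  -- re-emit rows after column backfills
     initial_scan=no,                -- skip rows that already exist
     -- flush every 100 messages or 5s; retry failed requests up to 4 times
     webhook_sink_config = '{ "Flush": {"Messages": 100, "Frequency": "5s"}, "Retry": { "Max": 4, "Backoff": "10s"} }'
;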
--         job_id
-- -----------------------
--   1010627173653250049
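The sink URI is just the invoke URL plus the resource path, prefixed with `webhook-`, and `webhook_sink_config` is a JSON string nested inside a SQL string literal, which is easy to mistype. A Python sketch that assembles both, plus a statement equivalent to the one above; the helper names are hypothetical and the defaults simply mirror the values used here.

```python
import json
from urllib.parse import urlsplit


def webhook_sink_uri(invoke_url: str, resource: str = "events",
                     skip_verify: bool = False) -> str:
    """Turn an API Gateway invoke URL into a CockroachDB webhook sink URI."""
    if urlsplit(invoke_url).scheme != "https":
        # This sketch only builds webhook-https:// URIs, as used above.
        raise ValueError("expected an https:// invoke URL")
    uri = f"webhook-{invoke_url.rstrip('/')}/{resource}"
    if skip_verify:
        uri += "?insecure_tls_skip_verify=true"
    return uri


def webhook_sink_config(messages: int = 100, frequency: str = "5s",
                        max_retries: int = 4, backoff: str = "10s") -> str:
    """Serialize the Flush/Retry settings as JSON for webhook_sink_config."""
    return json.dumps({
        "Flush": {"Messages": messages, "Frequency": frequency},
        "Retry": {"Max": max_retries, "Backoff": backoff},
    })


def create_changefeed_sql(table: str, invoke_url: str) -> str:
    """Assemble a CREATE CHANGEFEED statement like the one above (hypothetical helper)."""
    return (
        f"CREATE CHANGEFEED FOR TABLE {table} "
        f"INTO '{webhook_sink_uri(invoke_url, skip_verify=True)}' "
        "WITH updated, resolved='20s', diff, schema_change_policy=backfill, "
        f"initial_scan=no, webhook_sink_config = '{webhook_sink_config()}';"
    )


print(create_changefeed_sql("events", "https://72eebc6w0k.execute-api.ca-central-1.amazonaws.com/DEV"))
```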

Check that the changefeed is running and reporting a resolved timestamp:

SELECT running_status FROM [SHOW CHANGEFEED JOBS] WHERE job_id = 1010627173653250049;
--                running_status
-- --------------------------------------------
--   running: resolved=1728489668.321914000,0
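The `resolved=` value in `running_status` is a CockroachDB HLC timestamp: wall-clock seconds and nanoseconds since the Unix epoch, followed by a logical counter after the comma. A small sketch converting it to a UTC datetime, so you can see how far the changefeed trails the current time; `parse_resolved` is a hypothetical helper that assumes the nine-digit nanosecond field shown above.

```python
import re
from datetime import datetime, timedelta, timezone


def parse_resolved(running_status: str) -> datetime:
    """Convert 'running: resolved=<seconds>.<nanos>,<logical>' to a UTC datetime.

    Python datetimes hold microseconds, so the last three nanosecond digits are dropped.
    """
    match = re.search(r"resolved=(\d+)\.(\d{9}),\d+", running_status)
    if match is None:
        raise ValueError(f"no resolved timestamp in {running_status!r}")
    seconds, nanos = int(match.group(1)), int(match.group(2))
    return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(microseconds=nanos // 1000)


resolved_at = parse_resolved("running: resolved=1728489668.321914000,0")
print(resolved_at.isoformat())  # 2024-10-09T16:01:08.321914+00:00
lag = datetime.now(timezone.utc) - resolved_at  # how far the resolved timestamp trails now
```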

Good, it's running, so we're finally ready to test by inserting a row into the table:

insert into events (payload) values ('{"my_key" : "my_value"}');  
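Because the mapping template publishes `$.payload`, the SNS message is derived from the whole batch array the changefeed sends, not from a single row. According to the CockroachDB webhook sink docs, each request body wraps a batch as `{"payload": [...], "length": N}`; with the `updated` and `diff` options each entry also carries `updated` and `before` next to `after`, `key` and `topic`, and resolved timestamps arrive as separate bodies with a `resolved` field. A subscriber that wants individual rows could unpack that array; here is a minimal Python sketch with a hypothetical `rows_from_webhook_body` helper and a made-up sample body.

```python
import json
from typing import List


def rows_from_webhook_body(body: str) -> List[dict]:
    """Return the new row values ("after") from one webhook sink request body.

    Bodies with only a resolved timestamp have no "payload" key and yield no rows;
    deletes carry "after": null and are skipped as well.
    """
    doc = json.loads(body)
    return [entry["after"] for entry in doc.get("payload", []) if entry.get("after") is not None]


# Illustrative body for the INSERT above; all values are placeholders, not real output.
sample_body = json.dumps({
    "payload": [{
        "after": {"id": "placeholder-uuid", "payload": {"my_key": "my_value"}, "ts": "placeholder-timestamp"},
        "before": None,
        "key": ["placeholder-uuid"],
        "topic": "events",
        "updated": "placeholder-hlc-timestamp",
    }],
    "length": 1,
})
print(rows_from_webhook_body(sample_body))
```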

Check your inbox:

[Screenshot: the SNS notification email]
