Part 1: Integrating Debezium Server and Memphis.dev for Streaming Change Data Capture (CDC) Events

avital trifsik - May 18 '23 - - Dev Community

This is part one of a series of blog posts on building a modern event-driven system using Memphis.dev.

Change data capture (CDC) is an architectural pattern which turns databases into sources for event-driven architectures. Frequently, CDC is implemented on top of built-in replication support. Changes to data (e.g., caused by INSERT, UPDATE, and DELETE statements) are recorded as atomic units and appended to a replication log for transmission to replicas. CDC software copies the events from the replica logs to streaming infrastructure for processing by downstream components.

So what do CDC events look like? In this tutorial, we’ll use the example of a table of todo items with the following fields:

Table

A null value for the due date signifies that there is no due date.

If a user creates a todo item to buy milk from the store, the corresponding CDC event would look like:

{
“before” : null,

“after” : {
    “id” : 25,
    “description” : “buy milk”,
    “creation_timestamp” : “2023-05-01T16:32:15”,
    “due_date” : “2023-05-02”,
    “completed”: False }
}
Enter fullscreen mode Exit fullscreen mode

If the user then completes (updates) the todo item, the following CDC event would be generated:

{
“before” : {
    “id” : 25,
    “description” : “buy milk”,
    “creation_timestamp” : “2023-05-01T16:32:15”,
    “due_date” : “2023-05-02”,
    “completed”: False },

“after” : {
    “id” : 25,
    “description” : “buy milk”,
    “creation_timestamp” : “2023-05-01T16:32:15”,
    “due_date” : “2023-05-02”,
    “completed”: True }
}

Enter fullscreen mode Exit fullscreen mode

If the user deletes, the original item, the CDC event will look like so:

{
“before” : {
     “id” : 25,
     “description” : “buy milk”,
     “creation_timestamp” : “2023-05-01T16:32:15”,
     “due_date” : “2023-05-02”,
     “completed”: False },

“after” : null
}

Enter fullscreen mode Exit fullscreen mode

The CDC approach is used to support various data analyses that are run in near real-time:

  • Copying data from one database to another. Modern systems often incorporate multiple storage solutions chosen because of their optimizations for complementary workloads. For example, online transaction processing (OLTP) databases like PostgreSQL are designed to support many concurrent users each performing queries that touch a small amount of data. Online analytical processing (OLAP) databases like Clickhouse are optimized to handle a small number of queries touching a large amount of data. The CDC approach doesn’t require schema changes (e.g., adding update timestamps) and is less resource intensive than approaches like running periodic queries to find new or changed records.

  • Performing real-time data integration. Some tasks require that data be pulled from multiple data sets and integrated. For example, a user’s clickstream (page view) events may be changed with details of the products they’re browsing to feed a machine learning model. Performing these joins in the production OLTP databases reduces application responsiveness. CDC allows computational heavy actions to be run on dedicated subsystems.

  • Performing aggregations or window analyses. An OLTP database may only log events such as commercial transactions. Business analysts may want to see the current sum of sales in a given quarter, however. The events captured through CDC can be aggregated in real-time to update dashboards and other data applications.

  • Performing de-aggregations. For performance reasons, an OLTP database may only store the current state of data like counters. For example, a database may store the number of likes or views of social media posts. Machine learning models often need individual events, however. CDC generates an event for every increase or decrease in the counters, effectively creating a historical time series for downstream analyses.


Implementing the CDC Pattern

Debezium is a popular open-source tool for facilitating CDC. Debezium provides connectors for various open-source and proprietary databases. The connectors listen for data change events and convert the internal representations to common formats such as JSON and Avro. Additionally, some support is provided for filtering and transforming events.

Debezium was originally designed as connectors that run in the Apache Kafka Connect framework. Apache Kafka, unfortunately, has a pretty large deployment footprint for production setups. It is recommended that a minimal production deployment has at least 3 nodes with 64 GB of RAM and 24 cores with storage configured with RAID 10 and at least 3 additional nodes for a separate Apache Zookeeper cluster. Meaning, a minimal production setup of Apache Kafka requires at least 6 nodes. Further, the JVM and operating system need to be tuned significantly to achieve optimal performance.

Many cloud-native systems are divided into microservices that are designed to scale independently. Rather than relying on one large message broker cluster, it’s common for these systems to deploy multiple, small, independent clusters. Memphis.dev is a next-generation, cloud-native message broker with a low resource footprint, minimal operational overhead, and no required performance tuning.

Debezium recently announced the general availability of Debezium Server, a framework for using Debezium connectors without Apache Kafka. Debezium Server runs in a standalone mode. Sink connectors for a wide range of messaging systems are included out of the box.

CDC-PATTERN

In this tutorial, we’ll demonstrate how to implement the CDC pattern for PostgreSQL using Debezium Server and Memphis.dev.


The Collaborative Power of Open Source: Interfacing Debezium Server and Memphis.dev

Memphis.dev and Debezium Server are integrated using REST. The Memphis.dev REST gateway provides endpoints for consuming messages, while Debezium Server provides the HTTP client sink for transmitting messages via REST. In our reference solution, Debezium Server makes a POST request to /station/todo-cdc-events/produce/single for each message. The REST interface accepts messages in JSON, text, and binary Protocol Buffer formats.

Unfortunately, we hit a stumbling block while implementing our CDC solution. The Memphis.dev REST gateway uses JSON Web Tokens (JWT) authentication for security, but Debezium Server’s HTTP client didn’t support it. Thanks to the collaborative power of open source, we were able to work with the Debezium developers to add JWT authentication functionality. The user must specify a username, password, and authentication endpoint URL in the Debezium Server configuration file. The server then tracks its authentication state and makes REST requests to perform an initial authorization and refresh that authorization as needed.

With the JWT authentication now in place, Debezium Server can forward CDC events to Memphis.dev. Further, all Debezium Server users, whether or not they are using Memphis.dev, can benefit from this functionality.


Overview of the Solution

Here, we describe a reference solution for delivering change data capture events with Memphis.dev. Debezium Server’s HTTP client sink is used to send the CDC events from a PostgreSQL database to a Memphis.dev instance using the Memphis REST gateway. Our solution has six components:

  1. Todo Item Generator: Inserts a randomly-generated todo item in the PostgreSQL table every 0.5 seconds. Each todo item contains a description, creation timestamp, optional due date, and completion status.

  2. PostgreSQL: Configured with a single database containing a single table (todo_items).

  3. Debezium Server: Instance of Debezium Server configured with PostgreSQL source and HTTP Client sink connectors.

  4. Memphis.dev REST Gateway: Uses the out-of-the-box configuration.

  5. Memphis.dev: Configured with a single station (todo-cdc-events) and single user (todocdcservice).

  6. Printing Consumer: A script that uses the Memphis.dev Python SDK to consume messages and print them to the console.

Postgress CDC example


Running the Implementation

Code repository: Memphis Example Solutions.
Docker Compose will be needed.

Step 1: Build the Docker images for Debezium Server, the printing consumer, and database setup (table and user creation).

Currently, our implementation depends on a pre-release version of Debezium Server for the JWT authentication support. A Docker image will be built directly from the main branch of the Debezium and Debezium Server repositories. Note that this step can take quite a while (~20 minutes) to run. When Debezium Server 2.3.0 is released, we will switch to using the upstream Docker image.

$ docker compose build --pull --no-cache
Enter fullscreen mode Exit fullscreen mode
[+] Building 0.0s (0/1)
[+] Building 0.2s (2/3)
 => [internal] load build definition from Dockerfile             0.0s
[+] Building 19.0s (5/10)
 => [internal] load build definition from Dockerfile             0.0s
[+] Building 19.2s (5/10)
 => [internal] load build definition from Dockerfile             0.0s
 => => transferring dockerfile: 302B                             0.0s
 => [internal] load .dockerignore                                0.0s
 => => transferring context: 2B                                  0.0s
 => [internal] load metadata for docker.io/library/debian:bullseye-slim                           0.3s
 => CACHED [1/6] FROM docker.io/library/debian:bullseye-slim@sha256:9404b05bd09b57c76eccc0c5505b3c88b5feccac808d9b193a4fbac87bb                              0.0s
[+] Building 31.4s (5/10)
[+] Building 32.2s (5/10)
[+] Building 34.2s (5/10)
 => [internal] load .dockerignore                                0.0s
[+] Building 37.6s (11/11) FINISHED
 => [internal] load build definition from Dockerfile             0.0s
 => => transferring dockerfile: 302B                             0.0s
[+] Building 37.7s (5/10)
 => [internal] load .dockerignore                                0.0s
 => => transferring context: 2B                                  0.0s
 => [internal] load build definition from Dockerfile             0.0s
 => => transferring dockerfile: 300B                             0.0s
[+] Building 37.9s (5/10)
 => [internal] load .dockerignore                                0.0s
[+] Building 38.0s (5/10)
[+] Building 38.2s (5/10)
[+] Building 18.9s (4/14)
 => [internal] load build definition from Dockerfile             0.0s
 => => transferring dockerfile: 613B                             0.0s
[+] Building 20.0s (4/14)
[+] Building 65.8s (11/11) FINISHED
 => [internal] load .dockerignore                                0.0s
 => => transferring context: 2B                                  0.0s
 => [internal] load build definition from Dockerfile             0.0s
[+] Building 1207.0s (15/15) FINISHED
 => [internal] load build definition from Dockerfile             0.0s
 => => transferring dockerfile: 613B                             0.0s
 => [internal] load .dockerignore                                0.0s
 => => transferring context: 2B                                  0.0s
 => [internal] load metadata for docker.io/library/debian:bullseye-slim                           0.2s
 => CACHED [1/6] FROM docker.io/library/debian:bullseye-slim@sha256:9404b05bd09b57c76eccc0c5505b3c88b5feccac808d9b193a4fbac87bb                              0.0s
 => [ 2/13] RUN apt update && apt upgrade -y && apt install -y openjdk-11-jdk-headless wget git curl && rm -rf /var/cache/apt/ 49.5s
 => [ 3/13] RUN git clone https://github.com/debezium/debezium   6.0s
 => [ 4/13] WORKDIR /debezium                                    0.1s
 => [ 5/13] RUN ./mvnw clean install -DskipITs -DskipTests     761.4s
 => [ 6/13] RUN git clone https://github.com/debezium/debezium-server debezium-server-build                                            1.1s
 => [ 7/13] WORKDIR /debezium-server-build                       0.0s
 => [ 8/13] RUN ./mvnw package -DskipITs -DskipTests -Passembly372.1s
 => [ 9/13] RUN tar -xzvf debezium-server-dist/target/debezium-server-dist-*.tar.gz -C /   2.0s
 => [10/13] WORKDIR /debezium-server                             0.0s
 => [11/13] RUN mkdir data                                       0.5s
 => exporting to image                                          14.0s => => exporting layers                                          14.0s
 => => writing image sha256:51d987a3bf905f35be87ce649099e76c13277d75c4ac26972868fc9af2617d14                                                                0.0s
 => => naming to docker.io/library/debezium-server               0.0s
[+] Building 41.8s (11/11) FINISHED
 => [internal] load build definition from Dockerfile             0.0s
 => => transferring dockerfile: 302B                             0.0s
 => [internal] load .dockerignore                                0.0s
 => => transferring context: 2B                                  0.0s
 => [internal] load metadata for docker.io/library/debian:bullseye-slim                           0.3s
 => [internal] load build context                                0.0s
 => => transferring context: 39B                                 0.0s
 => CACHED [1/6] FROM docker.io/library/debian:bullseye-slim@sha256:9404b05bd09b57c76eccc0c5505b3c88b5feccac808d9b193a4fbac87bb                              0.0s
 => [2/6] RUN apt update && apt upgrade -y && apt install -y python3 python3-pip && rm -rf /var/cache/apt/*                          33.5s
 => [3/6] WORKDIR /app                                           0.0s
 => [4/6] COPY todo_generator.py /app/                           0.0s
 => [5/6] RUN pip3 install -U pip wheel                          2.0s
 => [6/6] RUN pip3 install psycopg2-binary                       1.1s
 => exporting to image                                           4.9s
 => => exporting layers                                          4.9s
 => => writing image sha256:6424a08a9dedb77b798610a0b87c1c0a0c5f910039d03d673b3cf47ac54c10de                                                                0.0s
 => => naming to docker.io/library/todo-generator                0.0s
Enter fullscreen mode Exit fullscreen mode

Step 2: Start the Memphis.dev broker and REST gateway. Note that the memphis-rest-gateway service depends on the memphis broker service, so the broker service will be started as well.

$ docker compose up -d memphis-rest-gateway
Enter fullscreen mode Exit fullscreen mode
[+] Running 4/4
 ⠿ Network postgres-debezium-cdc-example_default                   Created                                                       0.1s
 ⠿ Container postgres-debezium-cdc-example-memphis-metadata-1      Healthy                                                       6.1s
 ⠿ Container postgres-debezium-cdc-example-memphis-1               Health...                                                    16.9s
 ⠿ Container postgres-debezium-cdc-example-memphis-rest-gateway-1  Started                                                      17.3s
Enter fullscreen mode Exit fullscreen mode

Step 3: Follow the instructions for configuring Memphis.dev with a new station (todo-cdc-events) and user (todocdcservice) using the web UI.

Point your browser at http://localhost:9000/. Click the “sign in with root” link at the bottom of the page.

sign in

Log in with root (username) and memphis (password).

memphis root

Follow the wizard to create a station named todo-cdc-events.

memphis ui

Create a user named todocdcservice with the same value for the password.

userapp

Click “next” until the wizard is finished:

Image description

Click “Go to station overview” to go to the station overview page.

Image description

Step 4: Start the printing consumer:

$ docker compose up -d printing-consumer
Enter fullscreen mode Exit fullscreen mode
[+] Running 3/3
 ⠿ Container postgres-debezium-cdc-example-memphis-metadata-1  H...                                                              0.6s
 ⠿ Container postgres-debezium-cdc-example-memphis-1           Healthy                                                           1.1s
 ⠿ Container printing-consumer                                 Started           
Enter fullscreen mode Exit fullscreen mode

Start the todo item generator, PostgreSQL database, and Debezium Server:

$ docker compose up -d todo-generator
Enter fullscreen mode Exit fullscreen mode
[+] Running 7/7
 ⠿ Container postgres                                              Healthy                                                       7.9s
 ⠿ Container postgres-debezium-cdc-example-memphis-metadata-1      Healthy                                                       0.7s
 ⠿ Container postgres-debezium-cdc-example-memphis-1               Health...                                                     1.2s
 ⠿ Container postgres-debezium-cdc-example-memphis-rest-gateway-1  Running                                                       0.0s
 ⠿ Container database-setup                                        Exited                                                        6.8s
 ⠿ Container debezium-server                                       Healthy                                                      12.7s
 ⠿ Container todo-generator                                        Started      
Enter fullscreen mode Exit fullscreen mode

Note that the todo item generator depends on the other services and will start them automatically. The database-setup container will run once to create the database, tables, and role in PostgreSQL.

Lastly, confirm the system is working. Check the todo-cdc-events station overview screen in the Memphis.dev web UI to confirm that the producer and consumer are connected and messages are being delivered.

Image description
And, print the logs for the printing-consumer container:

$ docker logs --tail 2 printing-consumer
Enter fullscreen mode Exit fullscreen mode
message:  bytearray(b'{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"item_id"},{"type":"string","optional":false,"field":"description"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"creation_date"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"due_date"},{"type":"boolean","optional":false,"field":"completed"}],"optional":true,"name":"tutorial.public.todo_items.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"item_id"},{"type":"string","optional":false,"field":"description"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"creation_date"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"due_date"},{"type":"boolean","optional":false,"field":"completed"}],"optional":true,"name":"tutorial.public.todo_items.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"tutorial.public.todo_items.Envelope","version":1},"payload":{"before":null,"after":{"item_id":205,"description":"ERJGCHXXOBBGSMOUQSMB","creation_date":1682991115063809,"due_date":null,"completed":false},"source":{"version":"2.3.0-SNAPSHOT","connector":"postgresql","name":"tutorial","ts_ms":1682991115065,"snapshot":"false","db":"todo_application","sequence":"[\\"26715784\\",\\"26715784\\"]","schema":"public","table":"todo_items","txId":945,"lsn":26715784,"xmin":null},"op":"c","ts_ms":1682991115377,"transaction":null}}')
message:  bytearray(b'{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"item_id"},{"type":"string","optional":false,"field":"description"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"creation_date"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"due_date"},{"type":"boolean","optional":false,"field":"completed"}],"optional":true,"name":"tutorial.public.todo_items.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"item_id"},{"type":"string","optional":false,"field":"description"},{"type":"int64","optional":false,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"creation_date"},{"type":"int64","optional":true,"name":"io.debezium.time.MicroTimestamp","version":1,"field":"due_date"},{"type":"boolean","optional":false,"field":"completed"}],"optional":true,"name":"tutorial.public.todo_items.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"tutorial.public.todo_items.Envelope","version":1},"payload":{"before":null,"after":{"item_id":206,"description":"KXWQYXRWCGSKTBJOJFSX","creation_date":1682991115566896,"due_date":1683250315566896,"completed":false},"source":{"version":"2.3.0-SNAPSHOT","connector":"postgresql","name":"tutorial","ts_ms":1682991115568,"snapshot":"false","db":"todo_application","sequence":"[\\"26715992\\",\\"26715992\\"]","schema":"public","table":"todo_items","txId":946,"lsn":26715992,"xmin":null},"op":"c","ts_ms":1682991115885,"transaction":null}}')
Enter fullscreen mode Exit fullscreen mode

Congratulations! You now have a working example of how to capture and transfer data change events from a PostgreSQL database into Memphis.dev using Debezium Server.


check out part 2:Change Data Capture (CDC) for MongoDB with Debezium and Memphis.dev


Join 4500+ others and sign up for our data engineering newsletter.

Originally published at Memphis.dev By RJ Nowling, Developer advocate at Memphis.dev

Follow Us to get the latest updates!
GithubDocsDiscord

. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .