Ingesting data from Kafka to CockroachDB via Kafka Connect

Fabio Ghirardello - Dec 21 '22 - Dev Community

In this brief write-up, I demonstrate how to build a working pipeline to ingest Kafka records into CockroachDB via Kafka Connect's JDBC Sink Connector, locally, using Docker.

This serves as a functional test: a way to understand the concepts and all the moving parts before moving on to performance testing on real, production-grade clusters.
Stay tuned for a follow-up blog where we will use this content to build and run a performance test.
UPDATE: performance test is here!

Setup

Save below as file kafka2crdb.yaml.

# kafka2crdb.yaml
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - 2181:2181
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:7.3.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
      - 9101:9101
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - 8081:8081
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  kafka-connect:
    image: cnfldemos/cp-server-connect-datagen:0.6.0-7.3.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.3.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    command: 
      - bash 
      - -c 
      - |
        # Installing connector plugin
        confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest
        # Downloading newer Postgres JDBC driver
        cd /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib
        rm -rf postgresql*
        wget https://jdbc.postgresql.org/download/postgresql-42.5.0.jar 
        # Launching Kafka Connect worker
        /etc/confluent/docker/run &
        sleep infinity

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.3.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - kafka-connect
    ports:
      - 9021:9021
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  cockroach:
    image: cockroachdb/cockroach:latest
    container_name: cockroach-1
    command: start-single-node --insecure
    ports:
      - 26257:26257
      - 8080:8080

Notice in the kafka-connect definition how we install the latest version of the JDBC Sink Connector and update the PostgreSQL JDBC driver.
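
Once the stack is up (next step), you can quickly verify that the plugin was installed and the driver replaced; a minimal check, using the paths from the Compose file above:

# confirm the newer PostgreSQL driver JAR is in the connector's lib directory
docker exec connect \
  ls /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib | grep -i postgresql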

Bring the Docker Compose stack up.

Note:
You might have to tweak your Docker env to allow for more resources.
I use Docker Desktop for macOS configured with 12 CPUs and 22GB Memory.

docker-compose -f kafka2crdb.yaml up -d

Make sure everything is up and running

$ docker-compose -f kafka2crdb.yaml ps
NAME                COMMAND                  SERVICE             STATUS              PORTS
broker              "/etc/confluent/dock…"   broker              running             0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp
cockroach-1         "/cockroach/cockroac…"   cockroach           running             0.0.0.0:8080->8080/tcp, 0.0.0.0:26257->26257/tcp
connect             "bash -c '# Installi…"   kafka-connect       running             0.0.0.0:8083->8083/tcp, 9092/tcp
control-center      "/etc/confluent/dock…"   control-center      running             0.0.0.0:9021->9021/tcp
schema-registry     "/etc/confluent/dock…"   schema-registry     running             0.0.0.0:8081->8081/tcp
zookeeper           "/etc/confluent/dock…"   zookeeper           running             2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp

Open the Control Center at http://localhost:9021 and wait until the broker shows up as healthy.
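
The Kafka Connect worker takes a minute or two to download and install the JDBC connector plugin before it starts accepting requests. A quick way to check that it is ready, and that the plugin was picked up, is to query its REST API (ports as defined in the Compose file):

# list the installed connector plugins; the JDBC sink should show up once the worker is ready
curl -s http://localhost:8083/connector-plugins | jq '.[].class'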

Open another tab for the CockroachDB Console at http://localhost:8080.

Demo

We will build the following pipeline:

datagen(source) --> Kafka Topic --> Kafka JDBC Sink Connector --> CockroachDB(target)

You can perform most of the tasks below from within the Control Center.

Create Kafka topic

Connect to the broker container and create a topic named transactions with 4 partitions.

docker exec -it broker /bin/bash

Once in the broker shell

# create topic with 4 partitions
kafka-topics --bootstrap-server broker:9092 --create --topic transactions --partitions 4

# describe it
$ kafka-topics --bootstrap-server broker:9092 --topic transactions --describe
Topic: transactions     TopicId: F6NF00Z8Ry2xIWnbYVF9vg PartitionCount: 4       ReplicationFactor: 1    Configs: 
        Topic: transactions     Partition: 0    Leader: 1       Replicas: 1     Isr: 1  Offline: 
        Topic: transactions     Partition: 1    Leader: 1       Replicas: 1     Isr: 1  Offline: 
        Topic: transactions     Partition: 2    Leader: 1       Replicas: 1     Isr: 1  Offline: 
        Topic: transactions     Partition: 3    Leader: 1       Replicas: 1     Isr: 1  Offline: 

With the topic split into 4 partitions, we can have an equal number of consumer processes, or tasks, consuming from the topic and ingesting into CockroachDB in parallel.

All set, now we are ready to ingest data into this topic.

Configure the Source Connector

Our source will be a built-in data generator.

Open a new terminal, and create the Datagen Connector.

# the Kafka Connect worker (container connect) listens on port 8083
curl -s -X POST http://localhost:8083/connectors/ \
    -H "Content-Type: application/json" -d '{
        "name": "datagen-transactions",
        "config": {
            "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "kafka.topic": "transactions",
            "tasks.max": "64",
            "max.interval": "100",
            "quickstart": "transactions"
        }
    }' | jq '.'
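Optionally, check via the Connect REST API that the connector and its tasks are in the RUNNING state:

# check the source connector and task status
curl -s http://localhost:8083/connectors/datagen-transactions/status | jq '.'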

Confirm in the Control Center that messages are being generated by visiting the Topics section.

[screenshot: Topics section in Control Center]

Also check the Messages and Schema tabs to see what the records look like.

[screenshot: topic messages and schema]
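
If you prefer the command line, you can also peek at a few Avro records directly; a quick sketch, assuming the Avro console consumer bundled with the schema-registry image:

# read 3 Avro-encoded records from the topic, decoding them via the Schema Registry
docker exec -it schema-registry kafka-avro-console-consumer \
  --bootstrap-server broker:29092 \
  --topic transactions \
  --from-beginning \
  --max-messages 3 \
  --property schema.registry.url=http://schema-registry:8081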

Configure the Sink Connector

We're now ready to set up the JDBC Sink Connector to ingest data into CockroachDB.

Note how we set batch.size to the same value as consumer.override.max.poll.records, to make sure each transaction includes exactly 128 records.

Because we've set reWriteBatchedInserts=true, the PostgreSQL JDBC driver will rewrite the 128 individual INSERT statements into a single, multi-row INSERT statement.
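
Conceptually, the driver turns the batch of single-row statements into one multi-row statement; a simplified sketch with 3 rows instead of 128, using the columns of the transactions quickstart:

-- what the connector emits: one INSERT per record, all in the same transaction
INSERT INTO transactions (transaction_id, card_id, user_id, purchase_id, store_id) VALUES (1, 7, 'User_6', 0, 1);
INSERT INTO transactions (transaction_id, card_id, user_id, purchase_id, store_id) VALUES (1, 19, 'User_7', 0, 6);
INSERT INTO transactions (transaction_id, card_id, user_id, purchase_id, store_id) VALUES (1, 18, 'User_5', 0, 6);

-- what reWriteBatchedInserts=true sends to CockroachDB instead: a single multi-row INSERT
INSERT INTO transactions (transaction_id, card_id, user_id, purchase_id, store_id)
VALUES (1, 7, 'User_6', 0, 1), (1, 19, 'User_7', 0, 6), (1, 18, 'User_5', 0, 6);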

Note, however, that this single-statement transaction is still executed as an explicit transaction.

Note:
I've rebuilt the connector with autocommit=true to leverage implicit transactions.
I have therefore replaced the original kafka-connect-jdbc-10.6.1.jar file in /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/ with my custom-built JAR.
The screenshots below show the results of using this custom build.

We also set tasks.max=4 to have 4 consumer processes reading from the 4 topic partitions and ingesting data into CockroachDB in parallel. Each task creates its own database connection and reads from a specific topic partition.

# register the connector 
curl -s -X POST http://localhost:8083/connectors/ \
     -H "Content-Type: application/json" -d '{
        "name": "sink-crdb",
            "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "connection.url": "jdbc:postgresql://cockroach-1:26257/defaultdb?reWriteBatchedInserts=true&ApplicationName=txns",
            "topics": "transactions",
            "tasks.max": "4",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter.schema.registry.url": "http://schema-registry:8081",
            "connection.user": "root",
            "connection.password": "",
            "auto.create": true,
            "auto.evolve": true,
            "insert.mode": "insert",
            "pk.mode": "none",
            "pk.fields": "none",
            "batch.size": 128,
            "consumer.override.max.poll.records": 128
            }
    }' | jq '.'
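As with the source connector, you can verify that the sink and its 4 tasks are running, and that each task owns one of the 4 topic partitions; a quick sketch, assuming the consumer group name Kafka Connect derives for sink connectors (connect-&lt;connector name&gt;):

# sink connector and task status
curl -s http://localhost:8083/connectors/sink-crdb/status | jq '.tasks[].state'

# partition assignment and lag for the sink's consumer group
docker exec -it broker kafka-consumer-groups \
  --bootstrap-server broker:9092 \
  --describe --group connect-sink-crdb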

You should now see 4 active SQL connections established to CockroachDB, and activity on the SQL Statements chart.

[screenshot: SQL Statements chart]
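
You can also confirm the 4 connections from SQL; a quick check, relying on the ApplicationName=txns parameter we added to the JDBC URL:

# count open SQL sessions per application name
docker exec -it cockroach-1 cockroach sql --insecure \
  -e "SELECT application_name, count(*) FROM [SHOW SESSIONS] GROUP BY 1;"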

Here, in the CockroachDB Console on the SQL Activity > Transactions page, we can confirm that each transaction writes 128 rows.

[screenshot: Transactions page]

Drilling down into the transaction, we can see how many statements it is composed of. In this case, as expected, it is just 1 INSERT statement.

[screenshot: transaction details]

Drilling further into the statement itself, we can see that this is a multi-row INSERT statement with 128 records, executed as an implicit transaction.

[screenshot: statement details]

Query data in CockroachDB

Open a SQL prompt

docker exec -it cockroach-1 cockroach sql --insecure
-- the table was automatically created by the Kafka JDBC Sink Connector 
-- this behavior is of course configurable
> SHOW TABLES;
  schema_name |  table_name  | type  | owner | estimated_row_count | locality
--------------+--------------+-------+-------+---------------------+-----------
  public      | transactions | table | root  |                   0 | NULL

-- check the schema of the created table 
-- note how CockroachDB created its own Primary Key
-- this is also configurable
> SHOW CREATE transactions;
   table_name  |                      create_statement
---------------+--------------------------------------------------------------
  transactions | CREATE TABLE public.transactions (
               |     transaction_id INT8 NOT NULL,
               |     card_id INT8 NOT NULL,
               |     user_id STRING NOT NULL,
               |     purchase_id INT8 NOT NULL,
               |     store_id INT8 NOT NULL,
               |     rowid INT8 NOT VISIBLE NOT NULL DEFAULT unique_rowid(),
               |     CONSTRAINT transactions_pkey PRIMARY KEY (rowid ASC)
               | )


-- inspect a few rows
> SELECT * FROM transactions LIMIT 5;
  transaction_id | card_id | user_id | purchase_id | store_id
-----------------+---------+---------+-------------+-----------
               1 |       7 | User_6  |           0 |        1
               1 |      19 | User_7  |           0 |        6
               1 |      18 | User_5  |           0 |        6
               1 |       6 | User_   |           0 |        3
               1 |      13 | User_   |           0 |        3

-- avoid contention by using AS OF SYSTEM TIME when running large aggregate queries
> SELECT count(*) FROM transactions AS OF SYSTEM TIME '-5s';
  count
----------
  594633

-- wait a few seconds, then query again to see the count increasing...
> SELECT count(*) FROM transactions AS OF SYSTEM TIME '-5s';
  count
----------
  594992

You can view more metrics and statement statistics on the DB Console, at http://localhost:8080.

Clean up
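
If you only want to stop the data flow while keeping the stack running, you can first delete the two connectors:

# remove the source and sink connectors from the Connect worker
curl -s -X DELETE http://localhost:8083/connectors/datagen-transactions
curl -s -X DELETE http://localhost:8083/connectors/sink-crdb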

Bring down and remove all containers with

docker-compose -f kafka2crdb.yaml down
