Testing Kafka Streams applications

Abhishek Gupta - Mar 16 '20 - Dev Community

The previous blog posts in the Kafka Streams series covered Stateless and Stateful operations in the DSL API. In this blog, we will explore a few examples to demonstrate how to use the testing utilities to validate topologies based on the Kafka Streams DSL API.

Kafka Streams provides testing utilities to execute unit tests for your stream processing pipelines without having to rely on an external or embedded Kafka cluster. In addition to testing, these utilities also serve as a great learning tool to grok various API features.

Let's start with a high-level overview of the testing-related APIs.

The code is available on GitHub. To execute the tests, clone the repo and run mvn test.

Key concepts

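Initially, tests relied on a few helper classes in the org.apache.kafka.streams.test package (such as ConsumerRecordFactory and OutputVerifier). As of Kafka 2.4, these are deprecated in favor of TestInputTopic and TestOutputTopic, which are used together with TopologyTestDriver: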

TestInputTopic

An instance of TestInputTopic represents an input topic, and you can send records to it using the pipeInput method (and its overloaded versions). Create TestInputTopic instances using TopologyTestDriver (explained below) and use custom serializers if needed. You can then send key-value pairs or just values, either one at a time or in a batch using the List-based variants (e.g. pipeKeyValueList and pipeValueList).

TestOutputTopic

TestOutputTopic is the other half of the send-receive equation and complements a TestInputTopic. You can use it to read records from the output topics that your topology writes to. Its methods include reading records (key-value pairs), reading only the value, querying the size (the number of records which have not been consumed yet), etc.

TopologyTestDriver

TopologyTestDriver contains a reference to the Topology as well as the configuration related to your Kafka Streams application. As mentioned earlier, it is used to create instances of TestInputTopic and TestOutputTopic, provide access to state stores, etc.

High level flow

If you're using Maven, you can include the testing utility as a dependency:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams-test-utils</artifactId>
            <version>2.4.0</version>
            <scope>test</scope>
        </dependency>

You will (most likely) also use JUnit and Hamcrest to write the assertions:

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest-core</artifactId>
            <version>1.3</version>
            <scope>test</scope>
        </dependency>

Here is what a test case might look like (similar to how you would unit test any Java code with JUnit etc.)

  • set up global state (if any) using a @BeforeClass annotated method
  • set up state for each test run using a @Before annotated method - this is where you would typically create the TopologyTestDriver etc.
  • @Test methods which validate the Topology
  • @After (and/or @AfterClass) methods for tearing down any state (be it global or otherwise)

Please ensure that you call TopologyTestDriver.close() to clean up the processors in the topology and their associated state. Failure to do so might result in test failures due to inconsistent state.

Now that you have an understanding of the concepts and basic setup, let's look at a few concrete examples. We'll start off with stateless operations.

Testing stateless operations

filter

Here is the Topology which uses the filter method to only allow values which have a length greater than five.

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream(INPUT_TOPIC);

        stream.filter((k, v) -> v.length() > 5).to(OUTPUT_TOPIC);

And here is the corresponding test:

    @Test
    public void shouldIncludeValueWithLengthGreaterThanFive() {

        topology = App.retainWordsLongerThan5Letters();
        td = new TopologyTestDriver(topology, config);

        inputTopic = td.createInputTopic(App.INPUT_TOPIC, Serdes.String().serializer(), Serdes.String().serializer());
        outputTopic = td.createOutputTopic(App.OUTPUT_TOPIC, Serdes.String().deserializer(), Serdes.String().deserializer());

        assertThat(outputTopic.isEmpty(), is(true));

        inputTopic.pipeInput("key1", "barrrrr");
        assertThat(outputTopic.readValue(), equalTo("barrrrr"));
        assertThat(outputTopic.isEmpty(), is(true));

        inputTopic.pipeInput("key2", "bar");
        assertThat(outputTopic.isEmpty(), is(true));
    }

We start by choosing the Topology we want to test and then create the TopologyTestDriver instance along with the TestInputTopic and TestOutputTopic objects.

Next, we confirm that the output topic is empty before sending any data - assertThat(outputTopic.isEmpty(), is(true));

Now records can be sent to the input topic using inputTopic.pipeInput("key1", "barrrrr"); This is a synchronous process and triggers the Topology, which in this case executes the filter operation and pushes the record to the output topic since the value length is greater than five. We confirm this using assertThat(outputTopic.readValue(), equalTo("barrrrr")); and then double check that the output topic is empty again, since the only record has been consumed.

Finally, we send the value bar and confirm that it was not sent to the output topic because its length is not greater than five.
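The boundary of the filter condition can also be checked without any Kafka classes. The following is a minimal sketch, not part of the original repo: LengthFilterSketch and its members are hypothetical names, and java.util.function.BiPredicate is only a stand-in for the predicate lambda passed to filter.

```java
import java.util.function.BiPredicate;

// Hypothetical helper (not from the original code): the same condition that the
// topology passes to filter(), expressed as a plain JDK BiPredicate.
final class LengthFilterSketch {

    // Keeps a record only when its value is strictly longer than five characters.
    static final BiPredicate<String, String> LONGER_THAN_FIVE = (k, v) -> v.length() > 5;

    // Convenience wrapper so the condition can be probed with sample records.
    static boolean passes(String key, String value) {
        return LONGER_THAN_FIVE.test(key, value);
    }
}
```

With this sketch, "barrrrr" passes, "bar" does not, and a value of exactly five characters is also dropped because the comparison is strict.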

flatMap

As explained in part 1 of this series (stateless operations), here is a flatMap operation:

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream(INPUT_TOPIC);

        stream.flatMap(new KeyValueMapper<String, String, Iterable<? extends KeyValue<? extends String, ? extends String>>>() {
            @Override
            public Iterable<? extends KeyValue<? extends String, ? extends String>> apply(String k, String csv) {
                String[] values = csv.split(",");
                return Arrays.asList(values)
                        .stream()
                        .map(value -> new KeyValue<>(k, value))
                        .collect(Collectors.toList());
            }
        }).to(OUTPUT_TOPIC);

In the above example, each record in the stream gets flatMapped such that each CSV (comma separated) value is first split into its constituents and a KeyValue pair is created for each part of the CSV string.

To test this:

        topology = App.flatMap();
        td = new TopologyTestDriver(topology, config);

        inputTopic = td.createInputTopic(App.INPUT_TOPIC, Serdes.String().serializer(), Serdes.String().serializer());
        outputTopic = td.createOutputTopic(App.OUTPUT_TOPIC, Serdes.String().deserializer(), Serdes.String().deserializer());

        inputTopic.pipeInput("random", "foo,bar,baz");
        inputTopic.pipeInput("hello", "world,universe");
        inputTopic.pipeInput("hi", "there");

        assertThat(outputTopic.getQueueSize(), equalTo(6L));

        assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("random", "foo")));
        assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("random", "bar")));
        assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("random", "baz")));

        assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("hello", "world")));
        assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("hello", "universe")));

        assertThat(outputTopic.readKeyValue(), equalTo(new KeyValue<>("hi", "there")));

        assertThat(outputTopic.isEmpty(), is(true));

As usual, we set up the required test utility classes and push records to the input topic. For example, the record with key random and comma-separated value foo,bar,baz is split into individual key-value pairs, i.e. it results in three records being pushed to the output topic. The same applies to the other input records as well.

We confirm the number of records in the output topic using assertThat(outputTopic.getQueueSize(), equalTo(6L)); and validate each key-value pair to confirm the flatMap behavior.
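The split itself can be reasoned about in isolation from the topology. The sketch below is an illustration under stated assumptions rather than code from the repo: CsvFlatMapSketch and split are hypothetical names, and Map.Entry (Java 9+ Map.entry) stands in for Kafka's KeyValue.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper (not from the original code) mirroring the flatMap mapper:
// Map.Entry is used here as a stand-in for Kafka's KeyValue.
final class CsvFlatMapSketch {

    // Produces one (key, part) pair for every comma-separated part of the value.
    static List<Map.Entry<String, String>> split(String key, String csv) {
        return Arrays.stream(csv.split(","))
                .map(part -> Map.entry(key, part))
                .collect(Collectors.toList());
    }
}
```

Applied to the three input records used in the test, this yields 3 + 2 + 1 pairs, which matches the queue size of six asserted against the output topic.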

Stateful operation without State store

Here is an example of a Topology which uses groupByKey followed by count and sends the results to an output topic:

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream(INPUT_TOPIC);

        stream.groupByKey()
                .count()
                .toStream()
                .to(OUTPUT_TOPIC);

Testing a stateful operation is not very different from testing a stateless one.

        topology = App.count();
        td = new TopologyTestDriver(topology, config);

        inputTopic = td.createInputTopic(App.INPUT_TOPIC, Serdes.String().serializer(), Serdes.String().serializer());
        TestOutputTopic<String, Long> ot = td.createOutputTopic(App.OUTPUT_TOPIC, Serdes.String().deserializer(), Serdes.Long().deserializer());

        inputTopic.pipeInput("key1", "value1");
        inputTopic.pipeInput("key1", "value2");
        inputTopic.pipeInput("key2", "value3");
        inputTopic.pipeInput("key3", "value4");
        inputTopic.pipeInput("key2", "value5");

        assertThat(ot.readKeyValue(), equalTo(new KeyValue<String, Long>("key1", 1L)));
        assertThat(ot.readKeyValue(), equalTo(new KeyValue<String, Long>("key1", 2L)));
        assertThat(ot.readKeyValue(), equalTo(new KeyValue<String, Long>("key2", 1L)));
        assertThat(ot.readKeyValue(), equalTo(new KeyValue<String, Long>("key3", 1L)));
        assertThat(ot.readKeyValue(), equalTo(new KeyValue<String, Long>("key2", 2L)));

Individual records are sent to the input topic, and the counts are then validated by reading from the output topic. Each input record produces an updated count for its key, so the final counts for key1, key2 and key3 are 2, 2 and 1 respectively.
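The order of the expected output records follows from how a running count evolves. Here is a minimal plain-Java sketch of that reasoning: RunningCountSketch and countUpdates are hypothetical names, Map.Entry stands in for KeyValue, and the sketch assumes one update is emitted per input record, as observed in the test above (on a real cluster, record caching may collapse some intermediate updates).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not from the original code) that models the stream of
// count updates, assuming every input record emits its key's new count.
final class RunningCountSketch {

    // Returns the (key, updated count) pairs in the order the keys arrive.
    static List<Map.Entry<String, Long>> countUpdates(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        List<Map.Entry<String, Long>> updates = new ArrayList<>();
        for (String key : keys) {
            // merge() stores 1 for a new key, otherwise adds 1 to the existing count
            long updated = counts.merge(key, 1L, Long::sum);
            updates.add(Map.entry(key, updated));
        }
        return updates;
    }
}
```

Feeding it the keys key1, key1, key2, key3, key2 gives five updates in the same order as the five readKeyValue assertions.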

Stateful operation with a State store

Things get interesting when the Topology includes a state store. In this example, instead of sending the counts to an output topic, an intermediate state store is used (this can be queried via Interactive Queries):

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream(INPUT_TOPIC);

        stream.groupByKey().count(Materialized.as("count-store"));

TopologyTestDriver provides access to the state store (KeyValueStore) via getKeyValueStore. The count in the state store is validated after each record is sent to the input topic, e.g. assertThat(countStore.get("key1"), equalTo(1L));

        topology = App.countWithStateStore();
        td = new TopologyTestDriver(topology, config);

        inputTopic = td.createInputTopic(App.INPUT_TOPIC, Serdes.String().serializer(), Serdes.String().serializer());

        KeyValueStore<String, Long> countStore = td.getKeyValueStore("count-store");

        inputTopic.pipeInput("key1", "value1");
        assertThat(countStore.get("key1"), equalTo(1L));

        inputTopic.pipeInput("key1", "value2");
        assertThat(countStore.get("key1"), equalTo(2L));

        inputTopic.pipeInput("key2", "value3");
        assertThat(countStore.get("key2"), equalTo(1L));

        inputTopic.pipeInput("key3", "value4");
        assertThat(countStore.get("key3"), equalTo(1L));

        inputTopic.pipeInput("key2", "value5");
        assertThat(countStore.get("key2"), equalTo(2L));

Note that in our tests, we created the Topology, TopologyTestDriver, TestInputTopic and TestOutputTopic in each of the test methods. This was simply because we were testing different Topologies. If you were testing a single Topology using a bunch of test cases as part of a single JUnit class, you could very easily move this to a setup method annotated with @Before so that it runs automatically before the start of each test case.

That's all for now! This was a short but hopefully useful introduction to testing your Kafka Streams based processing pipelines.
