Learn stream processing with Kafka Streams: Stateless operations

Abhishek Gupta - Mar 5 '20 - Dev Community

Kafka Streams is a Java library for developing stream processing applications on top of Apache Kafka. This is the first in a series of blog posts on Kafka Streams and its APIs.

This is not a "theoretical guide" about Kafka Streams (although I have covered some of those aspects in the past).

In this part, we will cover stateless operations in the Kafka Streams DSL API - specifically, the functions available in KStream such as filter, map, groupBy etc. The DSL API in Kafka Streams offers a powerful, functional style programming model to define stream processing topologies. Please note that the KTable API also offers stateless functions and what's covered in this post will be applicable in that case as well (more or less)

The APIs (KStream etc.) referenced in this post can be found in the Kafka Streams javadocs

The setup

To start things, you need to create a KafkaStreams instance. It needs a Topology and related configuration (in the form of a java.util.Properties)

Set the required configuration for your Kafka streams app:

Properties config = new Properties();

config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, App.APP_ID);
config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

We can then build a Topology which defines the processing pipeline (the rest of this blog post will focus on the stateless parts of a topology)
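The topology itself is not shown above, so here is a minimal sketch of what building one could look like. The class name TopologySketch and the topic names words and uppercase-words are placeholders chosen for illustration, and the code assumes the kafka-streams library is on the classpath.

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

class TopologySketch {

    // Builds a tiny stateless pipeline: read, upper-case the value, write.
    // Topic names are illustrative placeholders, not from the original post.
    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Source: relies on the default Serdes set in the Properties
        KStream<String, String> stream = builder.stream("words");

        // Stateless transformation followed by a sink topic
        stream.mapValues(value -> value.toUpperCase())
              .to("uppercase-words");

        // build() turns the DSL definition into a Topology
        return builder.build();
    }
}
```

The returned Topology is what gets passed to the KafkaStreams constructor. Calling describe() on it lists the source, processor and sink nodes, which can help when checking the wiring.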

You can create the KafkaStreams instance and start processing

KafkaStreams app = new KafkaStreams(topology, config);
app.start();
new CountDownLatch(1).await(); // block the main thread forever

Stateless operations using KStream

I generally like categorizing things into buckets - helps me "divide and conquer". I have tried the same in this case by dividing various KStream operations into filter, map etc.

Let's dig in!

filter

You can use filter to omit or include records based on a criterion. For example, if the values sent to a topic are words, you may want to keep only the ones which are longer than a specified length. You can define this criterion using a Predicate and pass it to the filter method - this will create a new KStream instance with the filtered records

KStream<String, String> stream = builder.stream("words");
stream.filter(new Predicate<String, String>() {
    @Override
    public boolean test(String k, String v) {
        // keep only the values longer than 5 characters
        return v.length() > 5;
    }
});

It is also possible to use filterNot if you want to exclude records based on a criterion. Here is a lambda style example:

KStream<String, String> stream = builder.stream("words");
stream.filterNot((key,value) -> value.startsWith("foo"));

map

A commonly used stateless operation is map. In case of Kafka Streams, it can be used to transform each record in the input KStream by applying a mapper function

This is available in multiple flavors - map, mapValues, flatMap, flatMapValues

Simply use the map method if you want to alter both the key and the value. For example, to convert both the key and the value to uppercase:

stream.map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
    @Override
    public KeyValue<String, String> apply(String k, String v) {
        return new KeyValue<>(k.toUpperCase(), v.toUpperCase());
    }
});

Use mapValues if all you want to alter is the value:

stream.mapValues(value -> value.toUpperCase());

flatMap is similar to map, but it allows you to return multiple records (KeyValues) for each input record

stream.flatMap(new KeyValueMapper<String, String, Iterable<? extends KeyValue<? extends String, ? extends String>>>() {
    @Override
    public Iterable<? extends KeyValue<? extends String, ? extends String>> apply(String k, String csv) {
        // split the CSV value and emit one record per part, each with the original key
        String[] values = csv.split(",");
        return Arrays.stream(values)
                     .map(value -> new KeyValue<>(k, value))
                     .collect(Collectors.toList());
    }
});

In the above example, each record in the stream gets flatMapped: its CSV (comma separated) value is split into its constituents and a KeyValue pair is created for each part of the CSV string. For example, if you have these records (foo <-> a,b,c) and (bar <-> d,e) (where foo and bar are keys), the resulting stream will have five entries - (foo,a), (foo,b), (foo,c), (bar,d), (bar,e)

Use flatMapValues if you only want to accept a value from the stream and return a collection of values
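As a rough sketch of the value-only variant, the splitting logic can live in a plain function. CsvSplitter below is a hypothetical helper (it is not part of Kafka Streams) and uses only the JDK so that it can be run on its own; the comment shows how it could be wired into flatMapValues, assuming a KStream<String, String> named stream.

```java
import java.util.Arrays;
import java.util.List;

class CsvSplitter {

    // Splits one CSV value into its parts; the record key is not involved.
    // With Kafka Streams on the classpath this could be passed as:
    //   KStream<String, String> parts = stream.flatMapValues(CsvSplitter::split);
    // so that (foo <-> a,b,c) becomes (foo,a), (foo,b), (foo,c).
    static List<String> split(String csv) {
        return Arrays.asList(csv.split(","));
    }
}
```

For instance, CsvSplitter.split("a,b,c") returns a list with the three elements a, b and c. Since the key cannot change here, flatMapValues keeps the original key on every resulting record.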

group

If you want to perform stateful aggregations on the contents of a KStream, you will first need to group its records by their key to create a KGroupedStream.

We will cover stateful operations on KGroupedStream in subsequent blog posts in this series

Here is an example of how you can do this using groupByKey

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC); 

KGroupedStream<String,String> kgs = stream.groupByKey();

A generalized version of groupByKey is groupBy which gives you the ability to group based on a different key using a KeyValueMapper

stream.groupBy(new KeyValueMapper<String, String, String>() {
    @Override
    public String apply(String k, String v) {
        return k.toUpperCase();
    }
});

In both cases (groupByKey and groupBy), if you need to use a different Serde (Serializer and Deserializer) instead of the default ones, use the overloaded version which accepts a Grouped object

stream.groupByKey(Grouped.with(Serdes.Bytes(), Serdes.Long()));

Terminal operations

A terminal operation in Kafka Streams is a method that returns void instead of an intermediate result such as another KStream or KTable.

You can use the to method to store the records of a KStream to a topic in Kafka.

KStream<String, String> stream = builder.stream("words");

stream.mapValues(value -> value.toUpperCase())
      .to("uppercase-words");

An overloaded version of to allows you to specify a Produced object to customize the Serdes and partitioner

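// the Serdes passed to Produced must match the key and value types being written (String here)
stream.mapValues(value -> value.toUpperCase())
      .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));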

Instead of specifying a static topic name, you can make use of a TopicNameExtractor and include any custom logic to choose a specific topic in a dynamic fashion

stream.mapValues(value -> value.toUpperCase())
    .to(new TopicNameExtractor<String, String>() {
        @Override
        public String extract(String k, String v, RecordContext rc) {
            return rc.topic()+"_uppercase";
        }
    });

In this example, we make use of the RecordContext which contains the metadata of the record, to get the topic and append _uppercase to it

In all the above cases, the sink topic should pre-exist in Kafka

If you want to log the KStream records (for debugging purposes), use the print method. It accepts an instance of Printed to configure the behavior.

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);
stream.mapValues(v -> v.toUpperCase()).print(Printed.toSysOut());

This will print out the records. For example, if you pass (foo, bar) and (john, doe) to the input topic, the values will get converted to uppercase and logged as such:

[KSTREAM-MAPVALUES-0000000001]: foo, BAR
[KSTREAM-MAPVALUES-0000000001]: john, DOE

You can also use Printed.toFile (instead of toSysOut) to target a specific file

The foreach method is similar to print and peek, i.e.

  • it is also a terminal operation (like print)
  • and it accepts a ForeachAction (like peek)
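To make the side-effect nature of foreach a bit more concrete, here is a small sketch that uses only the JDK. RecordSink and its write method are hypothetical names for illustration; the comment shows how the method could be handed to foreach, assuming a KStream<String, String> named stream.

```java
import java.util.ArrayList;
import java.util.List;

class RecordSink {

    // Not thread-safe: a sketch only, good enough for a single stream thread
    private final List<String> lines = new ArrayList<>();

    // Side effect applied to each record. Nothing is returned, which is
    // why foreach ends the pipeline. With Kafka Streams on the classpath:
    //   RecordSink sink = new RecordSink();
    //   stream.foreach(sink::write);
    void write(String key, String value) {
        lines.add(key + "=" + value);
    }

    List<String> lines() {
        return lines;
    }
}
```

In a real application the body of write would typically call out to a logger or an external system rather than collect records in memory.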

Miscellaneous operations

Since print is a terminal operation, you have the option of using peek instead when you want to keep processing: it returns a KStream containing the same records. It accepts a ForeachAction which you can use to specify what you want to do for each record, e.g. log the key and value

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);

stream.mapValues(v -> v.toUpperCase())
      .peek((k,v) -> System.out.println("key="+k+", value="+v))
      .to(OUTPUT_TOPIC);

In the above example, you will be able to see the key and values being logged and they will also be materialized to the output topic (unlike the print operation)

branch is a method which I have not used (to be honest!), but it looks quite interesting. It gives you the ability to evaluate every record in a KStream against multiple criteria (each represented by a Predicate) and output multiple KStreams (as an array). The key here is that you can use multiple Predicates instead of a single one, as is the case with filter and filterNot.
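Here is a hedged sketch of the routing rule behind branch, using only the JDK. As documented for KStream#branch, the predicates are evaluated in order, a record goes to the first branch whose predicate matches, and it is dropped if none match. BranchSketch, firstMatch and examplePredicates are hypothetical names for illustration, and BiPredicate stands in for the Kafka Streams Predicate interface. Note that Kafka Streams 2.8 and later deprecate branch in favor of split.

```java
import java.util.List;
import java.util.function.BiPredicate;

class BranchSketch {

    // Returns the index of the first predicate that accepts the record,
    // or -1 if no predicate matches (branch drops such records).
    static int firstMatch(List<BiPredicate<String, String>> predicates,
                          String key, String value) {
        for (int i = 0; i < predicates.size(); i++) {
            if (predicates.get(i).test(key, value)) {
                return i;
            }
        }
        return -1;
    }

    // Example predicates: long words first, then words starting with "foo".
    // The equivalent DSL call, assuming a KStream<String, String> stream:
    //   KStream<String, String>[] branches = stream.branch(
    //       (k, v) -> v.length() > 5,
    //       (k, v) -> v.startsWith("foo"));
    static List<BiPredicate<String, String>> examplePredicates() {
        return List.of(
            (k, v) -> v.length() > 5,
            (k, v) -> v.startsWith("foo"));
    }
}
```

With these predicates, the value foobar lands in the first branch (it matches both, but the first match wins), foo lands in the second, and bar is dropped. A catch-all predicate such as (k, v) -> true placed last is a common way to avoid losing records.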

You can merge two KStreams together into a single one.

StreamsBuilder builder = new StreamsBuilder(); 

KStream<String, String> stream1 = builder.stream("topic1");
KStream<String, String> stream2 = builder.stream("topic2");

stream1.merge(stream2).to("output-topic");

Please note that there is no ordering guarantee between the records of the two input streams in the merged stream (the relative order within each input stream is preserved)

If you want to derive a new key (it can have a different type as well) for each record in your KStream, use the selectKey method which accepts a KeyValueMapper. selectKey is similar to map but the difference is that map restricts the return type to a KeyValue object

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);

stream.selectKey(new KeyValueMapper<String, String, String>() {
    @Override
    public String apply(String k, String v) {
        // the returned value becomes the new key of the record
        return k.toUpperCase();
    }
});

While developing your processing pipelines with Kafka Streams DSL, you will find yourself pushing resulting stream records to an output topic using to and then creating a new stream from that (output) topic i.e.

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream1 = builder.stream(INPUT_TOPIC);
stream1.mapValues(v -> v.toUpperCase()).to(OUTPUT_TOPIC);

//output topic now becomes the input source
KStream<String, String> stream2 = builder.stream(OUTPUT_TOPIC);

//continue processing with stream2
stream2.filter((k,v) -> v.length() > 5).to(LENGTHY_WORDS_TOPIC);

This can be simplified by using the through method. So you can rewrite the above as follows:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT_TOPIC);

stream.mapValues(v -> v.toUpperCase())
      .through(OUTPUT_TOPIC)
      .filter((k,v) -> v.length() > 5)
      .to(LENGTHY_WORDS_TOPIC);

Here, we materialize the records (with upper case values) to an intermediate topic and continue processing (using filter in this case) and finally store post-filtration results in another topic

That's it for now. Stay tuned for upcoming posts in this series!

