Building a Reactive System with Kafka and Spring WebFlux

DevCorner - Feb 11 - - Dev Community

Kafka is a distributed event streaming platform that works seamlessly with Spring WebFlux to build reactive, non-blocking microservices. In this guide, we'll walk through step-by-step how to integrate Kafka with Spring WebFlux for reactive messaging.


1. Why Kafka with Spring WebFlux?

Using Kafka with WebFlux allows us to:

✅ Process messages asynchronously and reactively.

✅ Handle high-throughput event streams efficiently.

✅ Scale consumers using Kafka’s consumer groups.

✅ Avoid blocking threads, improving resource efficiency.


2. Setting Up Kafka in Spring Boot

Step 1: Add Dependencies

Add the following dependencies in pom.xml:

<dependencies>
    <!-- Spring Boot WebFlux -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

    <!-- Spring Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <!-- Reactive Kafka -->
    <dependency>
        <groupId>io.projectreactor.kafka</groupId>
        <artifactId>reactor-kafka</artifactId>
        <version>1.3.19</version>
    </dependency>

    <!-- Lombok for reducing boilerplate code -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <scope>provided</scope>
    </dependency>
</dependencies>
Enter fullscreen mode Exit fullscreen mode

Step 2: Configure Kafka in application.yml

Create a configuration file for Kafka:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: reactive-group
      auto-offset-reset: earliest
    producer:
      retries: 3
      acks: all
Enter fullscreen mode Exit fullscreen mode

Note: Make sure Kafka and Zookeeper are running on your system.


3. Creating a Reactive Kafka Producer

Step 1: Define a Model Class

Create a simple DTO (Data Transfer Object) for Kafka messages:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class MessageDTO {
    private String id;
    private String message;
}
Enter fullscreen mode Exit fullscreen mode

Step 2: Implement Kafka Producer Service

Create a non-blocking producer using KafkaSender from Reactor Kafka:

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

@Service
public class ReactiveKafkaProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public ReactiveKafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public Mono<String> sendMessage(String topic, String message) {
        return Mono.fromFuture(() -> kafkaTemplate.send(topic, message).completable())
                   .map(result -> "Message sent to " + result.getRecordMetadata().topic());
    }
}
Enter fullscreen mode Exit fullscreen mode

Step 3: Create a Reactive REST API for Sending Messages

Expose an endpoint in ReactiveKafkaController.java:

import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("/kafka")
public class ReactiveKafkaController {
    private final ReactiveKafkaProducer kafkaProducer;

    public ReactiveKafkaController(ReactiveKafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    @PostMapping("/send")
    public Mono<String> sendMessage(@RequestParam String topic, @RequestParam String message) {
        return kafkaProducer.sendMessage(topic, message);
    }
}
Enter fullscreen mode Exit fullscreen mode

Test it using cURL or Postman:

curl -X POST "http://localhost:8080/kafka/send?topic=reactive-topic&message=Hello Kafka"
Enter fullscreen mode Exit fullscreen mode

4. Creating a Reactive Kafka Consumer

Step 1: Implement Kafka Consumer Service

Use KafkaReceiver from Reactor Kafka for non-blocking message consumption:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

@Service
public class ReactiveKafkaConsumer {

    public Flux<String> consumeMessages(String topic) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(props)
                .subscription(Collections.singleton(topic));

        return KafkaReceiver.create(receiverOptions)
                            .receive()
                            .map(record -> "Received: " + record.value());
    }
}
Enter fullscreen mode Exit fullscreen mode

Step 2: Expose a Reactive API for Consuming Messages

Create an endpoint in ReactiveKafkaController.java:

import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;

@RestController
@RequestMapping("/kafka")
public class ReactiveKafkaController {
    private final ReactiveKafkaConsumer kafkaConsumer;

    public ReactiveKafkaController(ReactiveKafkaConsumer kafkaConsumer) {
        this.kafkaConsumer = kafkaConsumer;
    }

    @GetMapping("/consume")
    public Flux<String> consumeMessages(@RequestParam String topic) {
        return kafkaConsumer.consumeMessages(topic);
    }
}
Enter fullscreen mode Exit fullscreen mode

Test the consumer using cURL:

curl -X GET "http://localhost:8080/kafka/consume?topic=reactive-topic"
Enter fullscreen mode Exit fullscreen mode

5. Testing the Full Reactive Kafka Workflow

  1. Start Kafka and Zookeeper:
   bin/zookeeper-server-start.sh config/zookeeper.properties
   bin/kafka-server-start.sh config/server.properties
Enter fullscreen mode Exit fullscreen mode
  1. Create a Kafka topic:
   bin/kafka-topics.sh --create --topic reactive-topic --bootstrap-server localhost:9092
Enter fullscreen mode Exit fullscreen mode
  1. Send a message:
   curl -X POST "http://localhost:8080/kafka/send?topic=reactive-topic&message=Hello Kafka"
Enter fullscreen mode Exit fullscreen mode
  1. Start consuming messages:
   curl -X GET "http://localhost:8080/kafka/consume?topic=reactive-topic"
Enter fullscreen mode Exit fullscreen mode

6. Key Takeaways

Fully non-blocking Kafka producer and consumer using Spring WebFlux.

Backpressure handled via Reactor Kafka.

Scalable event-driven architecture for microservices.


Next Steps

Now that we’ve covered Kafka + Spring WebFlux, I'll create a similar guide for RabbitMQ + Spring WebFlux next. Let me know if you want any additional details!

. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .