Kafka is a distributed event streaming platform that works seamlessly with Spring WebFlux to build reactive, non-blocking microservices. In this guide, we'll walk through step-by-step how to integrate Kafka with Spring WebFlux for reactive messaging.
1. Why Kafka with Spring WebFlux?
Using Kafka with WebFlux allows us to:
✅ Process messages asynchronously and reactively.
✅ Handle high-throughput event streams efficiently.
✅ Scale consumers using Kafka’s consumer groups.
✅ Avoid blocking threads, improving resource efficiency.
2. Setting Up Kafka in Spring Boot
Step 1: Add Dependencies
Add the following dependencies in pom.xml:
<dependencies>
    <!-- Spring Boot WebFlux -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <!-- Spring Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Reactive Kafka -->
    <dependency>
        <groupId>io.projectreactor.kafka</groupId>
        <artifactId>reactor-kafka</artifactId>
        <version>1.3.19</version>
    </dependency>
    <!-- Lombok for reducing boilerplate code -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <scope>provided</scope>
    </dependency>
</dependencies>
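If you build with Gradle instead of Maven, the equivalent declarations are a straightforward translation (a sketch — pin the reactor-kafka version to whatever your Spring Boot BOM recommends):

```groovy
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation 'io.projectreactor.kafka:reactor-kafka:1.3.19'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
}
```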
Step 2: Configure Kafka in application.yml
Create a configuration file for Kafka:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: reactive-group
      auto-offset-reset: earliest
    producer:
      retries: 3
      acks: all
Note: Make sure Kafka is running on your system (and ZooKeeper too, unless your Kafka version runs in KRaft mode without it).
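If you'd rather not install Kafka locally, a single-node setup via Docker Compose is a common shortcut. Below is a sketch using the Confluent images — the image tags and environment variables are assumptions based on typical single-broker setups, so adjust them to a current release:

```yaml
version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```

Run it with docker compose up -d, and the application.yml above will connect to localhost:9092 unchanged.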
3. Creating a Reactive Kafka Producer
Step 1: Define a Model Class
Create a simple DTO (Data Transfer Object) for Kafka messages. (The examples below send plain strings; to send this DTO as JSON you would switch the serializer/deserializer to a JSON variant.)
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class MessageDTO {
    private String id;
    private String message;
}
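If you're on Java 16+ and prefer to avoid Lombok, a record gives you the same boilerplate for free. A minimal alternative sketch:

```java
// Java 16+ record: generates the constructor, accessors, equals/hashCode, and toString.
// Declared package-private here; in a real project it would be public in its own file.
record MessageDTO(String id, String message) {}
```

Note the accessor naming difference: records expose id() and message() rather than getId()/getMessage(), and instances are immutable, which suits message payloads well.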
Step 2: Implement Kafka Producer Service
Create a non-blocking producer using KafkaSender from Reactor Kafka:
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import java.util.HashMap;
import java.util.Map;

@Service
public class ReactiveKafkaProducer {

    private final KafkaSender<String, String> sender;

    public ReactiveKafkaProducer() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        this.sender = KafkaSender.create(SenderOptions.<String, String>create(props));
    }

    public Mono<String> sendMessage(String topic, String message) {
        // SenderRecord carries correlation metadata so each result can be matched to its record
        return sender.send(Mono.just(SenderRecord.create(new ProducerRecord<>(topic, message), message)))
                .next()
                .map(result -> "Message sent to " + result.recordMetadata().topic());
    }
}
Step 3: Create a Reactive REST API for Sending Messages
Expose an endpoint in ReactiveKafkaController.java:
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("/kafka")
public class ReactiveKafkaController {

    private final ReactiveKafkaProducer kafkaProducer;

    public ReactiveKafkaController(ReactiveKafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    @PostMapping("/send")
    public Mono<String> sendMessage(@RequestParam String topic, @RequestParam String message) {
        return kafkaProducer.sendMessage(topic, message);
    }
}
✅ Test it using cURL or Postman:
curl -X POST "http://localhost:8080/kafka/send?topic=reactive-topic&message=Hello%20Kafka"
4. Creating a Reactive Kafka Consumer
Step 1: Implement Kafka Consumer Service
Use KafkaReceiver from Reactor Kafka for non-blocking message consumption:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

@Service
public class ReactiveKafkaConsumer {

    public Flux<String> consumeMessages(String topic) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // Note: this creates a new consumer per call. Because all of them share
        // one group id, concurrent subscribers split the partitions between them,
        // so each message is delivered to only one subscriber.
        ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create(props)
                .subscription(Collections.singleton(topic));

        return KafkaReceiver.create(receiverOptions)
                .receive()
                .map(record -> "Received: " + record.value());
    }
}
Step 2: Expose a Reactive API for Consuming Messages
Create an endpoint in ReactiveKafkaConsumerController.java. (It needs a class name distinct from the producer's ReactiveKafkaController so both can live in the same package; sharing the /kafka path prefix is fine because the method mappings differ.)

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;

@RestController
@RequestMapping("/kafka")
public class ReactiveKafkaConsumerController {

    private final ReactiveKafkaConsumer kafkaConsumer;

    public ReactiveKafkaConsumerController(ReactiveKafkaConsumer kafkaConsumer) {
        this.kafkaConsumer = kafkaConsumer;
    }

    // Stream as Server-Sent Events so each message is pushed as soon as it arrives,
    // instead of the response waiting for the (never-ending) Flux to complete
    @GetMapping(value = "/consume", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> consumeMessages(@RequestParam String topic) {
        return kafkaConsumer.consumeMessages(topic);
    }
}
✅ Test the consumer using cURL:
curl -X GET "http://localhost:8080/kafka/consume?topic=reactive-topic"
5. Testing the Full Reactive Kafka Workflow
- Start Kafka and Zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
- Create a Kafka topic:
bin/kafka-topics.sh --create --topic reactive-topic --bootstrap-server localhost:9092
- Send a message:
curl -X POST "http://localhost:8080/kafka/send?topic=reactive-topic&message=Hello%20Kafka"
- Start consuming messages:
curl -X GET "http://localhost:8080/kafka/consume?topic=reactive-topic"
6. Key Takeaways
✅ Fully non-blocking Kafka producer and consumer using Spring WebFlux.
✅ Backpressure handled via Reactor Kafka.
✅ Scalable event-driven architecture for microservices.
Next Steps
Now that we’ve covered Kafka + Spring WebFlux, I'll create a similar guide for RabbitMQ + Spring WebFlux next. Let me know if you want any additional details!