Apache Spark Structured Streaming :: Cab Aggregator Use-case

SNEHASISH DUTTA - Jun 30 - Dev Community

Building helps you retain more knowledge.
But teaching helps you retain even more. Teaching is another modality that locks in the experience you gain from building. -- Dan Koe

Objective

Imagine a very simple system that can automatically warn cab companies whenever a driver rejects a bunch of rides in a short time. This system would use Kafka to send ride information (accepted, rejected) and Spark Structured Streaming to analyze it in real-time. If a driver rejects too many rides, the system would trigger an alert so the cab company can investigate.

What is Spark Structured Streaming?


Spark Structured Streaming is a powerful tool for processing data streams in real-time. It's built on top of Apache Spark SQL, which means it leverages the familiar DataFrame and Dataset APIs you might already use for batch data processing in Spark. This offers several advantages:

Unified Programming Model: You can use the same set of operations for both streaming and batch data, making it easier to develop and maintain code.

Declarative API: Spark Structured Streaming lets you describe what you want to achieve with your data processing, rather than writing complex low-level code to handle the streaming aspects.

Fault Tolerance: Spark Structured Streaming ensures your processing jobs can recover from failures without losing data. It achieves this through techniques like checkpointing and write-ahead logs.
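
To make the unified model concrete, here is a small sketch (the file path, schema handling, and column name are illustrative, not taken from the use-case below): the same transformation runs unchanged whether the source is read as a batch or as a stream.

import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("unified-api-sketch").master("local[2]").getOrCreate()

// The transformation is written once against the DataFrame API...
def rejectedOnly(df: DataFrame): DataFrame = df.where("tour_status = 'rejected'")

// ...and works on a batch DataFrame read from files
val batchDF = rejectedOnly(spark.read.json("/tmp/rides"))

// ...as well as on a streaming DataFrame reading the same directory, with the same code
val streamDF = rejectedOnly(spark.readStream.schema(batchDF.schema).json("/tmp/rides"))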

Here's a breakdown of how Spark Structured Streaming works:

Streaming Data Source: Your data comes from a streaming source such as Kafka, files landing in a directory, or a socket that produces a continuous stream of data.

Micro-Batching: Spark Structured Streaming breaks down the continuous stream into small chunks of data called micro-batches.

Structured Processing: Each micro-batch is processed like a regular DataFrame or Dataset using Spark SQL operations. This allows you to perform transformations, aggregations, and other data manipulations on the streaming data.

Updated Results: As new micro-batches arrive, the processing continues, and the results are constantly updated, reflecting the latest data in the stream.

Sinks: The final output can be written to various destinations like databases, dashboards, or other streaming systems for further analysis or action.
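
Putting those pieces together, a minimal Structured Streaming pipeline looks roughly like the sketch below; the topic name and the console sink are illustrative, not part of the use-case built later.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("minimal-pipeline-sketch").master("local[2]").getOrCreate()

// Streaming source: a continuous stream of records from Kafka
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()

// Each micro-batch is processed with ordinary DataFrame operations
val counts = events
  .selectExpr("CAST(value AS STRING) AS value")
  .groupBy(col("value"))
  .count()

// Results are continuously updated and written to a sink (console here)
counts.writeStream
  .format("console")
  .outputMode("update")
  .start()
  .awaitTermination()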

Benefits of Spark Structured Streaming:

Real-time Insights: Analyze data as it arrives, enabling quicker decision-making and proactive responses to events.

Scalability: Handles large volumes of streaming data efficiently by leveraging Spark's distributed processing capabilities.

Ease of Use: The familiar DataFrame/Dataset API makes it easier to develop and maintain streaming applications.

In essence, Spark Structured Streaming bridges the gap between batch processing and real-time analytics, allowing you to analyze data as it's generated and gain valuable insights from continuous data streams.

Project Architecture


Extract from: Apache Kafka
Transform using: Apache Spark
Load into: Apache Kafka

Producer and Infrastructure

Repository : https://github.com/snepar/cab-producer-infra

It is a simple application that ingests data into Kafka.
It produces random ride events, each marked as either accepted or rejected.

Sample Event

{
   "id": 3949106,
   "event_date": 1719749696532,
   "tour_value": 29.75265579847153,
   "id_driver": 3,
   "id_passenger": 11,
   "tour_status": "rejected"
}

Start the Infrastructure

docker compose up

Random Events Generator

val statuses = List("accepted", "rejected")
val topic = "ride"
val r = scala.util.Random

while (true) {
  // Pick a fresh random status per event so the stream mixes accepted and rejected rides
  val status = statuses(r.nextInt(statuses.length))
  val id = r.nextInt(10000000)
  val tour_value = r.nextDouble() * 100
  val id_driver = r.nextInt(10)
  val id_passenger = r.nextInt(100)
  val event_date = System.currentTimeMillis
  val payload =
    s"""{"id":$id,"event_date":$event_date,"tour_value":$tour_value,"id_driver":$id_driver,"id_passenger":$id_passenger,"tour_status":"$status"}"""

  EventProducer.send(topic, payload)
  Thread.sleep(1000)
}

Send Random Events via the Producer

// `producer` (a KafkaProducer[String, String]) and `key` are fields of the enclosing EventProducer object
def send(topic: String, payload: String): Unit = {
    val record = new ProducerRecord[String, String](topic, key, payload)
    producer.send(record)
  }
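
For context, here is a minimal sketch of how the producer and key referenced above could be set up; the property values and the static key are assumptions for illustration, not copied from the repository.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object EventProducer {
  // Hypothetical configuration; the actual repository may differ
  private val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  private val producer = new KafkaProducer[String, String](props)
  private val key = "ride-event" // assumed static key

  def send(topic: String, payload: String): Unit = {
    val record = new ProducerRecord[String, String](topic, key, payload)
    producer.send(record)
  }
}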

See the produced events from the topic named ride in the Docker terminal:

kafka-console-consumer --topic ride --bootstrap-server broker:9092


Spark Structured Streaming Application

Repository : https://github.com/snepar/spark-streaming-cab

Create a Spark session to execute the application locally:

val spark = SparkSession.builder()
      .appName("Integrating Kafka")
      .master("local[2]")
      .getOrCreate()

spark.sparkContext.setLogLevel("WARN")
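
These snippets assume the usual Spark SQL imports at the top of the file, roughly:

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._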

Configure the Reader and Writer Kafka Topics

    val kafkahost = "localhost:9092"
    val inputTopic = "ride"
    val outputTopic = "rejectalert"
    val props = new Properties()
    props.put("host", kafkahost)
    props.put("input_topic",inputTopic)
    props.put("output_host", kafkahost)
    props.put("output_topic",outputTopic)
    props.put("checkpointLocation","/tmp")

Define Schema for the Events

val schema = StructType(Seq(
      StructField("id", IntegerType, nullable = true), 
      StructField("event_date", LongType, nullable = false), 
      StructField("tour_value", DoubleType, nullable = true), 
      StructField("id_driver", StringType, nullable = false), 
      StructField("id_passenger", IntegerType, nullable = false), 
      StructField("tour_status", StringType, nullable = false) 
    ))

Read from the Kafka Topic and Create the Streaming DataFrame

val df = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers","localhost:9092")
      .option("failOnDataLoss","false")
      .option("startingOffsets", "latest")
      .option("subscribe", "ride").load()
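
Each record coming off the Kafka source carries Kafka metadata alongside the payload; the value column holds the raw bytes that are parsed in the next step. A quick way to confirm this during development:

// Columns exposed by the Kafka source: key, value, topic, partition, offset, timestamp, timestampType
df.printSchema()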

Parse the DataFrame with the schema and filter out only the events marked as rejected.
A rejected event signifies that a driver has rejected a ride.

val parsedDF = df.selectExpr("cast(value as string) as value")
      .select(from_json(col("value"), schema).as("data"))
      .select("data.*").where("tour_status='rejected'")
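
While developing, the parsed stream can be written to the console sink to sanity-check the schema and the filter (run this instead of the Kafka sink; it is only a debugging aid, not part of the final pipeline):

// Print parsed rejected events to stdout, micro-batch by micro-batch
parsedDF.writeStream
  .format("console")
  .outputMode("append")
  .option("truncate", "false")
  .start()
  .awaitTermination()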

Aggregate, in a window of 1 minute and grouped by driver ID, how many rides were rejected, and also calculate how much money was lost due to these rejections.

val driverPerformance: DataFrame = parsedDF
  .groupBy(
    // Convert epoch-millis event_date to a timestamp and bucket it into 1-minute windows
    window(
      to_utc_timestamp(from_unixtime(col("event_date") / 1000, "yyyy-MM-dd HH:mm:ss"), "UTC+1")
        .alias("event_timestamp"),
      "1 minute"),
    col("id_driver"))
  .agg(
    count(col("id")).alias("total_rejected_tours"),
    sum("tour_value").alias("total_loss"))
  .select("id_driver", "total_rejected_tours", "total_loss")
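
One caveat: a windowed aggregation without a watermark keeps all window state indefinitely. For a long-running job it is usually bounded with a watermark; here is a sketch of how that could look (the 2-minute delay is an assumption, and this step is not part of the original code):

// Derive an explicit event-time column and let Spark drop state for windows older than the watermark
val withEventTime = parsedDF
  .withColumn("event_timestamp", to_timestamp(from_unixtime(col("event_date") / 1000)))
  .withWatermark("event_timestamp", "2 minutes")

val boundedPerformance = withEventTime
  .groupBy(window(col("event_timestamp"), "1 minute"), col("id_driver"))
  .agg(count(col("id")).alias("total_rejected_tours"), sum("tour_value").alias("total_loss"))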

Set a threshold of 3 rejections; if a driver crosses it within a window, generate an alert event.

val thresholdCrossedDF = driverPerformance.where(col("total_rejected_tours").gt(3))

Write this DataFrame to the Kafka topic rejectalert

thresholdCrossedDF.selectExpr("CAST(id_driver AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", props.getProperty("output_host", "localhost:9092"))
  .option("topic", props.getProperty("output_topic", "rejectalert"))
  .outputMode("update")
  .option("checkpointLocation", props.getProperty("checkpointLocation", "/tmp"))
  .start()
  .awaitTermination()

Run the Complete Application : https://github.com/snepar/spark-streaming-cab/blob/master/src/main/scala/rideevent/AlertGenerator.scala

Using a consumer on the Kafka broker, subscribe to these alerts.
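
Mirroring the earlier console consumer command, but pointed at the output topic:

kafka-console-consumer --topic rejectalert --bootstrap-server broker:9092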


References

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

https://github.com/rockthejvm/spark-streaming
