Building a real-time HTTP traffic stream with Apache Kafka

Kimmo Sääskilahti - Mar 10 '20 - - Dev Community

There are many reasons to record and analyze the traffic flowing in and out of your APIs. This data enables you to build audit logs or send alerts of anomalous activities, such as denial-of-service (DoS) attacks. More generally, you can also monitor the health and usage of your API and deeply understand customer behavior.

This article focuses on building a real-time pipeline for streaming HTTP traffic to Apache Kafka. By the end, we'll have built an example server application with Node.js, started Apache Kafka locally, and recorded data to Kafka from our server.

Kafka (short for Apache Kafka) is a high-performance distributed streaming platform. It's often used to centralize log management and to decouple data sources from data sinks. Kafka is a good choice for streaming data because it can ingest data from various sources at huge volumes. It's also tailor-made for real-time use cases, such as sending alerts of DoS attacks. Kafka also has various connectors for sending data to other services for further analysis. For example: Amazon S3, Azure Blob Storage, ElasticSearch, or HDFS.

⚠️ Prerequisites:

✅ Steps:

  1. Creating a Node.js server
  2. Preparing Kafka
  3. Creating an OpenAPI specification from recordings
  4. Conclusion

All of the code and instructions for this tutorial can be found in the meeshkan-express-kafka-demo GitHub repository.

Creating a Node.js server

We'll create a RESTful server with Express and record traffic logs in the HTTP Types format. HTTP Types is a human-readable JSON format for HTTP exchanges, with an example exchange looking as follows:

{
  "request": {
    "method": "get",
    "protocol": "http",
    "host": "example.com",
    "headers": {
      "accept": "*/*",
      "user-agent": "Mozilla/5.0 (pc-x86_64-linux-gnu) Siege/3.0.8"
    },
    "pathname": "/user/repos",
    "query": { "param": "value" },
    "timestamp": "2018-11-13T20:20:39+01:00"
  },
  "response": {
    "statusCode": 200,
    "body": "Hello as response!",
    "headers": {
      "content-length": "1999",
      "content-type": "text/html; charset=utf-8"
    },
    "timestamp": "2018-11-13T20:20:39+02:00"
  }
}
Enter fullscreen mode Exit fullscreen mode

To log HTTP traffic from Express to Kafka, we'll need:

  1. Middleware converting Express requests and responses to HTTP Types objects. The @meeshkanml/express-middleware package handles this.
  2. A transport sending the HTTP Types objects to Kafka. This is provided by http-types-kafka.

We'll see how to put these together below.

Our server is defined in src/index.ts. The entry point to the program is the main() function defined as follows:

const KAFKA_TOPIC = "http_recordings";
const KAFKA_CONFIG: KafkaConfig = {
  brokers: ["localhost:9092"],
};

const main = async () => {
  const httpTypesKafkaProducer = HttpTypesKafkaProducer.create({
    kafkaConfig: KAFKA_CONFIG,
    topic: KAFKA_TOPIC,
  });

  const kafkaExchangeTransport = async (exchange: HttpExchange) => {
    debugLog("Sending an exchange to Kafka");
    await httpTypesKafkaProducer.send(exchange);
  };

  const app = buildApp(kafkaExchangeTransport);

  // Prepare
  await kafkaTransport.connect();

  app.listen(PORT, "localhost", () => {
    console.log(`Listening at port ${PORT}`);
  });
  app.on("close", () => console.log("Closing express"));
};

main();
Enter fullscreen mode Exit fullscreen mode

Here, we're first creating a Kafka producer by defining the Kafka topic to write to and the list of brokers (consisting only of localhost:9092). http-types-kafka is a wrapper around kafkajs and KafkaConfig is defined in kafkajs. kafkaExchangeTransport is a function taking a HttpExchange object and returning a promise.

In our case, this promise is defined as:

const kafkaExchangeTransport = async (exchange: HttpExchange) => {
  debugLog("Sending an exchange to Kafka");
  await httpTypesKafkaProducer.send(exchange);
};
Enter fullscreen mode Exit fullscreen mode

The Express app is defined in the buildApp function. This function is also in the src/index.ts and looks like:

import httpTypesExpressMiddleware from "@meeshkanml/express-middleware";

const buildApp = (
  exchangeTransport: (exchange: HttpExchange) => Promise<void>
) => {
  const app = express();

  app.use(express.json());

  const kafkaExchangeMiddleware = httpTypesExpressMiddleware({
    transports: [exchangeTransport],
  });

  app.use(kafkaExchangeMiddleware);

  const userStore = new UserStore();

  app.use("/users", usersRouter(userStore));

  return app;
};
Enter fullscreen mode Exit fullscreen mode

Here, we're using express.json() middleware to parse request bodies as JSON. Express middleware for logging API traffic is created with the httpTypesExpressMiddleware imported from the @meeshkanml/express-middleware package. The object takes a list of transports as an argument, so we could also send our logs to other destinations such as a local file.

The actual user-facing API of our server is mounted on the /users route defined in usersRouter. The function creating the Express router takes an instance of UserStore to access the list of users. For demonstration purposes, we define our synchronous in-memory user store as follows:

// Representation of user
interface User {
  id: string;
  name: string;
  email: string;
}

interface CreateUserInput {
  name: string;
  email: string;
}

class UserStore {
  private readonly users: Record<string, User> = {};
  constructor() {}

  getUserById(userId: string): User | undefined {
    return this.users[userId];
  }

  createUser(userInput: CreateUserInput): User {
    const userId = uuidv4();
    const user: User = {
      id: userId,
      name: userInput.name,
      email: userInput.email,
    };
    this.users[userId] = user;
    return user;
  }
}
Enter fullscreen mode Exit fullscreen mode

The store keeps an in-memory dictionary of users by mapping user IDs to User objects. It also exposes getUserById and createUser methods for getting and creating users.

User requests are handled by our server as follows:

const usersRouter = (userStore: UserStore): express.Router => {
  const router = express.Router();

  router.post("/", (req: express.Request, res: express.Response) => {
    // Create a new user
    let userInput: CreateUserInput;
    debugLog("Incoming post user", req.body);
    try {
      userInput = parseCreateUserInput(req.body);
    } catch (err) {
      debugLog("Bad request", err, req.body);
      return res.sendStatus(400);
    }
    const newUser = userStore.createUser(userInput);
    // Set Location for client-navigation
    res.location(`users/${newUser.id}`);
    return res.json(newUser);
  });

  router.get("/:userId", (req: express.Request, res: express.Response) => {
    // Get user by ID
    const userId = req.params.userId;
    if (typeof userId !== "string") {
      return res.sendStatus(400);
    }
    const maybeUser = userStore.getUserById(userId);
    if (maybeUser) {
      return res.json(maybeUser);
    } else {
      return res.sendStatus(404);
    }
  });

  return router;
};
Enter fullscreen mode Exit fullscreen mode

The router exposes POST / and GET /:userId routes for creating and fetching users, respectively. Remember the router is mounted to /users, so the routes translate to POST /users and GET /users/:userId routes at top-level.

The request to create a new user is handled by validating the user input first. Creating a new user is then delegated to userStore.createUser and the created User object is sent back to the user as JSON.

Fetching a user is similar. The user ID given in the route must be a string, after which a user is fetched from userStore.getUserbyId. The store returns undefined if the user is not found, so that's converted to a response with status code 404.

Preparing Kafka

Before starting our server, we need to start Kafka.

If you prefer to install Kafka on your own machine, you can follow the instructions in Kafka Quick Start. Alternatively, you can use Docker. Our demo repository has a Docker Compose file docker-compose.yml. This file starts a single instance of Zookeeper, a centralized service for maintaining configuration information, and a single instance of Kafka. The Docker Compose file has been copied from the kafka-stack-docker-compose repository with small modifications.

Using Docker Compose, we can use the command line to start the Kafka cluster by running:

$ docker-compose up -d
Enter fullscreen mode Exit fullscreen mode

The -d flag starts the Kafka cluster in the background. Data stored in Kafka is persisted in the local kafka-data/ directory so that data is not lost after stopping the containers. Kafka broker is listening at port 9092, which is also published by Docker.

Now we need to create a Kafka topic for our recordings. Run one of the following commands to create a topic named http_recordings, depending on whether you have Kafka tools installed or not:

# If you have Kafka installed
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic http_recordings --create --partitions 3 --replication-factor 1

# If you're using Docker
$ docker exec kafka1 kafka-topics --bootstrap-server localhost:9092 --topic http_recordings --create --partitions 3 --replication-factor 1
Enter fullscreen mode Exit fullscreen mode

The latter command executes the kafka-topics command inside the kafka1 container started by Docker Compose.

To see messages arriving to Kafka, start a console consumer to consume the http_recordings topic:

# If you have Kafka installed
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic http_recordings --from-beginning

# If you're using Docker
$ docker exec kafka1 kafka-console-consumer --bootstrap-server localhost:9092 --topic http_recordings --from-beginning
Enter fullscreen mode Exit fullscreen mode

Recording calls

Now we're ready to start our server and make some calls! You can start the server with:

$ yarn  # Install dependencies
$ yarn start  # Start server
# OR if using npm
$ npm install
$ npm run start
Enter fullscreen mode Exit fullscreen mode

Let's now make some calls to localhost:3000 using curl:

# Create a user
$ curl -X POST -d '{"name": "Kimmo", "email": "kimmo@example.com" }' -H "Content-Type: application/json" http://localhost:3000/users
# Example response:
# {"id":"95768802-5476-4cae-aae4-fb51a6b62ec1","name":"Kimmo","email":"kimmo@example.com"}

# Replace the user ID with the value you got
$ curl http://localhost:3000/users/95768802-5476-4cae-aae4-fb51a6b62ec1
# Example response:
# {"id":"95768802-5476-4cae-aae4-fb51a6b62ec1","name":"Kimmo","email":"kimmo@example.com"}

# To save the created user ID to environment variable USER_ID in bash, you can use sed (https://www.gnu.org/software/sed/manual/sed.html) to replace the whole response body with the captured ID:
$ export USER_ID=`curl -X POST -d '{"name": "Kimmo", "email": "kimmo@example.com" }' -H "Content-Type: application/json" http://localhost:3000/users | sed 's/.*"id":"\([^"]*\)".*/\1/'`

# Get created user by using the environment variable
$ curl http://localhost:3000/users/${USER_ID}
Enter fullscreen mode Exit fullscreen mode

Our Kafka console consumer should print HTTP exchanges line by line, showing we're successfully recording:

{"request":{"method":"post","protocol":"http","host":"localhost","headers":{"host":"localhost:3000","user-agent":"curl/7.54.0","accept":"*/*","content-type":"application/json","content-length":"48"},"body":"{\"name\":\"Kimmo\",\"email\":\"kimmo@example.com\"}","path":"/users","pathname":"/users","query":{}},"response":{"timestamp":"2020-02-28T10:39:28.833Z","statusCode":200,"headers":{"x-powered-by":"Express","location":"users/0549a790-fe19-4e1b-ae15-2ab99a2c91ad","content-type":"application/json; charset=utf-8","content-length":"88","etag":"W/\"58-LnvhpMtTNC8tDgPlNu5AwKbj3P0\""},"body":"{\"id\":\"0549a790-fe19-4e1b-ae15-2ab99a2c91ad\",\"name\":\"Kimmo\",\"email\":\"kimmo@example.com\"}"}}
{"request":{"method":"get","protocol":"http","host":"localhost","headers":{"host":"localhost:3000","user-agent":"curl/7.54.0","accept":"*/*"},"body":"{}","path":"/users/0549a790-fe19-4e1b-ae15-2ab99a2c91ad","pathname":"/users/0549a790-fe19-4e1b-ae15-2ab99a2c91ad","query":{}},"response":{"timestamp":"2020-02-28T10:39:54.034Z","statusCode":200,"headers":{"x-powered-by":"Express","content-type":"application/json; charset=utf-8","content-length":"88","etag":"W/\"58-LnvhpMtTNC8tDgPlNu5AwKbj3P0\""},"body":"{\"id\":\"0549a790-fe19-4e1b-ae15-2ab99a2c91ad\",\"name\":\"Kimmo\",\"email\":\"kimmo@example.com\"}"}}
Enter fullscreen mode Exit fullscreen mode

Creating an OpenAPI specification from recordings

To show a potential use case for our HTTP recordings, we'll use the recordings to create an OpenAPI specification. This will be done using the meeshkan Python tool. Our OpenAPI specification will then act as a contract - specifying the API endpoints and what data they consume or produce. It can be used for documentation or testing.

To get started, install meeshkan from PyPI:

$ pip install meeshkan
Enter fullscreen mode Exit fullscreen mode

To create an OpenAPI specification to the directory my_spec/, run the following command:

$ meeshkan build --source kafka -o my_spec
Enter fullscreen mode Exit fullscreen mode

meeshkan will update the OpenAPI specification in memory whenever new data arrives in http_recordings topic. Stop meeshkan with Ctrl+C and the specification is written to my_spec directory with an openapi.json as follows:

{
  "openapi": "3.0.0",
  "info": {
    "title": "API title",
    "version": "1.0",
    "description": "API description"
  },
  "paths": {
    "/users": {
      "summary": "Path summary",
      "description": "Path description",
      "post": {
        "responses": {
          "200": {
            "description": "Response description",
            "headers": {},
            "content": {
              "application/json": {
                "schema": {
                  "required": ["email", "id", "name"],
                  "properties": {
                    "id": { "type": "string" },
                    "name": { "type": "string" },
                    "email": { "type": "string" }
                  },
                  "type": "object"
                }
              }
  ...
}
Enter fullscreen mode Exit fullscreen mode

Finally, we can close down our Kafka cluster:

$ docker-compose down
Enter fullscreen mode Exit fullscreen mode

Conclusion

To summarize, we created an Express server running in Node.js and added a middleware logging all HTTP exchanges to Apache Kafka. We also saw how to use meeshkan to create an OpenAPI specification of our server.

If you haven't tried it yourself yet, you can follow the steps of this article in our GitHub repository.

meeshkan is still under development, so we very much appreciate any feedback. Feel free to comment below or try our tutorial.

Thank you for reading!

Big thanks to Carolyn for suggestions how to improve the article!

This article was originally posted on meeshkan.com.

. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . .