Kafka: Topics, Partitions & Consumers


Concepts

What is Kafka? Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

What is a Topic? Kafka topics are the categories used to organize messages. Each topic has a name that is unique across the entire Kafka cluster. Messages are sent to and read from specific topics. In other words, producers write data to topics, and consumers read data from topics.

What is a Partition? A Kafka topic is divided into one or more partitions. Kafka guarantees the order of the messages sent within the same topic partition. Since a topic can be split into partitions over multiple machines, multiple consumers can read a topic in parallel. This organization sets Kafka up for high message throughput.

What is a Consumer? A consumer is a client that reads data from the Kafka cluster via a topic. A consumer also knows which broker it should read the data from. Within each partition, the consumer reads the data in order.

What is a Consumer Group? A consumer group is a set of consumers cooperating to consume data from one or more topics. The partitions of those topics are divided among the consumers in the group so that each member receives a proportional share of the partitions.
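To make these concepts concrete, here is a minimal sketch using kafkajs (the same client library used in the implementation below): a producer writes to a topic, and a consumer joins a group and reads from that topic. The broker address, topic name, and group id are placeholders for illustration.

import { Kafka } from "kafkajs";

// placeholder client id and broker address
const kafka = new Kafka({ clientId: "concepts-demo", brokers: ["localhost:9092"] });

// producer: writes messages to a topic
const producer = kafka.producer();
await producer.connect();
await producer.send({ topic: "demo-topic", messages: [{ value: "hello" }] });

// consumer: joins a consumer group and reads from the topic;
// the topic's partitions are divided among the members of "demo-group"
const consumer = kafka.consumer({ groupId: "demo-group" });
await consumer.connect();
await consumer.subscribe({ topic: "demo-topic", fromBeginning: true });
await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log(`${topic}[${partition}]: ${message.value.toString()}`);
  },
});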

Implementation in Node.js

Let's create our utility class for using Kafka, built on top of the kafkajs client.

import { Kafka } from "kafkajs";

// Our Wrapper class for using kafka utility
class KafkaWrapper {
  // private variable for holding kafka client admin, producer 
  // and list of consumers.
  #admin;
  #producer;
  #consumers = [];

  // Kafka client connected to locally running brokers.
  static #kafka = new Kafka({
    clientId: "kafka-demo",
    brokers: ["localhost:9092"],
  });

  // Function to initialise the Kafka admin client & producer.
  // The passed topics array will be created in the Kafka cluster.
  async initProducer(topics) {
    this.#admin = KafkaWrapper.#kafka.admin();
    await this.#admin.connect();
    await this.#admin.createTopics({
      topics: topics,
    });
    this.#producer = KafkaWrapper.#kafka.producer();
    await this.#producer.connect();
  }

  // Function to initialise a new consumer.
  // groupId -> id of the consumer group
  // topic -> topic name this consumer subscribes to.
  // callback -> callback handler invoked whenever this consumer receives a message on the provided topic.
  async initConsumer(groupId, topic, callback) {
    this.#consumers.push(KafkaWrapper.#kafka.consumer({ groupId }));
    await this.#consumers.at(-1).connect();
    await this.#consumers.at(-1).subscribe({ topic });
    await this.#consumers.at(-1).run({
      // kafkajs passes the whole payload ({ topic, partition, message }) to eachMessage
      eachMessage: async (payload) => {
        callback(payload);
      },
    });
  }

  // Function to send a message to the provided topic.
  async sendMessages(msg, topic) {
    await this.#producer.send({
      topic: topic,
      messages: [{ value: msg }],
    });
  }
}

export default KafkaWrapper;

Our code is mostly self-explanatory. The class has two important methods: sendMessages(msg, topic), which sends a message to the topic passed as an argument, and initConsumer(groupId, topic, callback), which creates a new consumer inside the provided group, subscribes it to the provided topic, and executes the provided callback whenever a message is received by that consumer.

Now let's create a driver code.

// import our wrapper class (adjust the path to wherever it is saved)
import KafkaWrapper from "./KafkaWrapper.js";

// configuration variables for the topic & its number of partitions.
const topic = "topic";
const partition = 3;

// object for storing received messages so that we can print & inspect them later.
const res = {};

// Configuration array for consumers.
// if there are N elements in the array,
// then N consumers will be created with the provided configuration
const consumers = [{ consumer: "consumer", group: "group" }];

// handler for received messages & storing in res object.
const handleMessage = (consumerId, msg) => {
  const { partition, message } = msg;
  if (!res.hasOwnProperty(consumerId)) res[consumerId] = [];
  res[consumerId].push({ partition, message: message.value.toString() });
};

// init our Kafka wrapper & create the topic with the configured partitions.
const broker = new KafkaWrapper();
await broker.initProducer([{ topic: topic, numPartitions: partition }]);

// init consumers from consumer configuration array
for (const each of consumers) {
  await broker.initConsumer(each.group, topic, (msg) =>
    handleMessage(each.consumer, msg)
  );
}

// sending 10 messages to topic
for (let i = 1; i <= 10; i++) {
  await broker.sendMessages(`Hello ${i}`, topic);
}

// printing received messages after some delay.
setTimeout(() => console.log(res), 2000);

Now we have an interface to test our Kafka communication. We can create as many consumers as we want, in the same group or in different groups, by adding more configuration objects to the consumers array. We can also increase or decrease the number of partitions of the topic by changing the partition variable, as shown below.
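For example, a configuration like the following (the names are only labels) creates two consumers sharing one group plus a third consumer in its own group, and asks for two partitions when the topic is first created:

const topic = "topic";
const partition = 2;

const consumers = [
  { consumer: "consumerA", group: "group1" },
  { consumer: "consumerB", group: "group1" },
  { consumer: "consumerC", group: "group2" },
];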

Examples

Now we run our code, as shown in the images below, configuring the number of partitions on the topic and the number of consumers subscribing to that topic.

Explanation - Here a topic is divided into 3 partitions and a consumer group with 3 members subscribes to that topic. Kafka assigns one partition to each group member to distribute the load and increase message throughput on that topic.
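A configuration for this run could look like this (consumer and group names are illustrative):

const partition = 3;
const consumers = [
  { consumer: "consumerA", group: "group1" },
  { consumer: "consumerB", group: "group1" },
  { consumer: "consumerC", group: "group1" },
];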


Explanation - Here a topic is divided into 3 partitions and a single consumer subscribes to that topic. Kafka assigns all partitions to that consumer, so all messages get delivered to and processed by that one consumer.
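This scenario corresponds to a configuration like:

const partition = 3;
const consumers = [{ consumer: "consumerA", group: "group1" }];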


Explanation - Here a topic is divided into 3 partitions and the consumer group has only 2 members. In this case, Kafka still distributes the load across the group members, so every partition gets assigned and one member ends up with two partitions. (In this run, Consumer B got assigned partitions 0 & 1, rather than Consumer A as in the image.)
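A matching configuration could be, for example:

const partition = 3;
const consumers = [
  { consumer: "consumerA", group: "group1" },
  { consumer: "consumerB", group: "group1" },
];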


Explanation - Here a topic is divided into two partitions and the consumer group has three members. Even though we have three members, only two of them get assigned a partition because there are only two partitions; the extra member sits idle without receiving any messages.
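For example:

const partition = 2;
const consumers = [
  { consumer: "consumerA", group: "group1" },
  { consumer: "consumerB", group: "group1" },
  { consumer: "consumerC", group: "group1" },
];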


Explanation - Here a topic is divided into three partitions and there are two consumer groups, each with a different number of members. You can see that Kafka automatically distributes the load so that each consumer group receives all of the messages; the configuration used for this run is sketched after the lists below.

In the first consumer group, each member of the group gets assigned one partition:

  • ConsumerA gets assigned partition 2

  • ConsumerB gets assigned partition 0

  • ConsumerC gets assigned partition 1

In the second group:

  • ConsumerD gets assigned partition 1

  • ConsumerE gets assigned partitions 0 & 2.
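The configuration for this run could look like the following (again, the names are illustrative):

const partition = 3;
const consumers = [
  { consumer: "consumerA", group: "group1" },
  { consumer: "consumerB", group: "group1" },
  { consumer: "consumerC", group: "group1" },
  { consumer: "consumerD", group: "group2" },
  { consumer: "consumerE", group: "group2" },
];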


Hope this helps you get some understanding of how Kafka works. Let me know if you have any other questions.
