In this post we will learn how to create a Kafka producer and consumer in Node.js. We will also look at how to tune some configuration options to make our application production-ready.


Kafka is an open-source event streaming platform used for publishing and processing events at high throughput. There are several popular Node.js libraries for interfacing with Kafka. For this post, we will be using the kafkajs library (but the same concepts apply to any other library as well).

If you want to skip the explanation and see the code, you can view it on GitHub.

Getting Started

First, make sure you have a running Kafka cluster on your machine.
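
You'll also need the kafkajs library added as a dependency to your Node.js project:

npm install kafkajs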

Before diving into the code, we should know about brokers and topics, which will be needed by both the producer and consumer.

A “topic” can be thought of as a distinct queue or channel where messages are sent.

For most production applications, there isn’t a single Kafka server running, but rather a cluster of multiple servers called “brokers”. The messages for each topic are split amongst the various brokers.


To learn more about how each topic is split between the brokers, you can read the official docs.
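
If the topic you want to use doesn't exist yet, one option is to create it with the kafkajs admin client. Here's a minimal sketch; the partition count is just for illustration, and depending on its configuration your broker may also auto-create topics on first use:

const { Kafka } = require("kafkajs")

const kafka = new Kafka({ clientId: "my-app", brokers: ["localhost:9092"] })
const admin = kafka.admin()

const createTopic = async () => {
	await admin.connect()
	// split the topic into 3 partitions so its messages can be spread across brokers
	await admin.createTopics({
		topics: [{ topic: "message-log", numPartitions: 3 }],
	})
	await admin.disconnect()
}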

Creating the Kafka Producer

For the purpose of illustration, let's create a function in a new file, producer.js, that writes a message into the Kafka cluster every second, forever:

// import the `Kafka` instance from the kafkajs library
const { Kafka } = require("kafkajs")

// the client ID lets kafka know who's producing the messages
const clientId = "my-app"
// we can define the list of brokers in the cluster
const brokers = ["localhost:9092"]
// this is the topic to which we want to write messages
const topic = "message-log"

// initialize a new kafka client and create a producer from it
const kafka = new Kafka({ clientId, brokers })
const producer = kafka.producer()

// we define an async function that writes a new message each second
const produce = async () => {
	await producer.connect()
	let i = 0

	// after the producer has connected, we start an interval timer
	setInterval(async () => {
		try {
			// send a message to the configured topic with
			// the key and value formed from the current value of `i`
			await producer.send({
				topic,
				messages: [
					{
						key: String(i),
						value: "this is message " + i,
					},
				],
			})

			// if the message is written successfully, log it and increment `i`
			console.log("writes: ", i)
			i++
		} catch (err) {
			console.error("could not write message " + err)
		}
	}, 1000)
}

module.exports = produce

Creating the Kafka Consumer

When creating a consumer, we need to specify its group ID. This is because a single topic can have multiple consumers, and the group ID ensures that consumers belonging to the same group don't receive repeated messages: each message is delivered to only one consumer in the group.
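
As a quick sketch (the group IDs below are made up for illustration): consumers that share a group ID split the topic's partitions between them, while a consumer with a different group ID receives its own copy of every message.

// two consumers with the same group ID share the topic's partitions,
// so each message is processed by only one of them
const workerA = kafka.consumer({ groupId: "group-a" })
const workerB = kafka.consumer({ groupId: "group-a" })

// a consumer with a different group ID gets its own copy of every message
const auditor = kafka.consumer({ groupId: "group-b" })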

Let’s create another consume function in a new file consumer.js that consumes messages from the Kafka cluster whenever they’re available:

// the kafka instance and configuration variables are the same as before
const { Kafka } = require("kafkajs")

const clientId = "my-app"
const brokers = ["localhost:9092"]
const topic = "message-log"

const kafka = new Kafka({ clientId, brokers })

// create a new consumer from the kafka client, and set its group ID
// the group ID helps Kafka keep track of the messages that this client
// is yet to receive
const consumer = kafka.consumer({ groupId: clientId })

const consume = async () => {
	// first, we wait for the client to connect and subscribe to the given topic
	await consumer.connect()
	await consumer.subscribe({ topic })
	await consumer.run({
		// this function is called every time the consumer gets a new message
		eachMessage: async ({ message }) => {
			// here, we just log the message to the standard output
			console.log(`received message: ${message.value}`)
		},
	})
}

module.exports = consume

Now that we’ve defined the functions to send and receive messages, we can put it all together in the index.js file:

const produce = require("./producer")
const consume = require("./consumer")

// call the `produce` function and log an error if it occurs
produce().catch((err) => {
	console.error("error in producer: ", err)
})

// start the consumer, and log any errors
consume().catch((err) => {
	console.error("error in consumer: ", err)
})

If you run this code, you’ll see an output similar to this:

❯ node index.js
{"level":"INFO","timestamp":"2020-12-29T07:26:23.029Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"my-app"}
writes:  0
writes:  1
writes:  2
writes:  3
writes:  4
writes:  5
{"level":"INFO","timestamp":"2020-12-29T07:26:51.137Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"my-app","memberId":"my-app-72213802-d193-47da-ac0d-1fc76cb76d05","leaderId":"my-app-72213802-d193-47da-ac0d-1fc76cb76d05","isLeader":true,"memberAssignment":{"message-log":[0]},"groupProtocol":"RoundRobinAssigner","duration":28104}
received message: this is message 0
received message: this is message 1
received message: this is message 2
received message: this is message 3
received message: this is message 4
received message: this is message 5
writes:  6
writes:  7

Something to notice here is that we receive multiple messages at a time, even though we set the writes to take place with a break of one second.

This is because the Kafka client has some default settings that are great for large scale applications, but which you might want to modify if latency is a concern.

In the next section, we’ll go through some configuration options that you can set to optimize the Kafka client for your needs.

Tuning Kafka Client Configuration

There are a lot of options that you can configure when creating the Kafka producer and consumer. In this section, we’ll go over some of the more important configuration options and what they mean.

You can see the full list of configuration options here

Minimum Buffered Bytes

As we’ve seen from the previous example, the data received by the consumer isn’t exactly “real time”.

The consumer polls the Kafka brokers to check if there is enough data to receive. The minimum buffered bytes defines what “enough” is. For example, if we have a configuration like this:

const clientId = "my-app"

// ...

const consumer = kafka.consumer({
	groupId: clientId,
	minBytes: 5,
	maxBytes: 1e6,
})

this means that if the consumer polls the cluster to check if there is any new data on the topic for the my-app consumer group, the cluster will only respond if there are at least 5 new bytes of information to send.

For example, if every message is 8 bytes and the minimum buffered bytes is set to 15, the messages are held back until their total size exceeds the minimum buffered bytes, and are then sent together as a batch.

maxBytes, on the other hand, defines the maximum quantity of data that the cluster can respond with when polled.

Setting minBytes helps you receive data in batches, which reduces the number of network round trips and the overall load on your system. However, if a long period of time elapses before the amount of new data crosses the minBytes value, the previously produced data stays stuck for that amount of time.
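
Relatedly, if you'd rather have your handler process each fetched batch as a whole instead of one message at a time, kafkajs also supports an eachBatch handler. A rough sketch, replacing the earlier consumer.run call inside our consume function:

await consumer.run({
	// eachBatch is called once per fetched batch instead of once per message
	eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
		for (const message of batch.messages) {
			console.log(`received message: ${message.value}`)
			// mark this message as processed
			resolveOffset(message.offset)
		}
		// let the broker know this consumer is still alive
		await heartbeat()
	},
})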

Max Wait Time

The max wait time setting helps mitigate the problem discussed above. It sets the maximum time to wait between receiving messages from the Kafka cluster, regardless of the minBytes setting.

So, if we set up our consumer with the following config:

const consumer = kafka.consumer({
	groupId: clientId,
	minBytes: 5,
	maxBytes: 1e6,
	// wait for at most 3 seconds before receiving new data
	maxWaitTimeInMs: 3000,
})

this means that the consumer will wait at most 3 seconds before receiving new messages, even if the new messages don’t cross the minBytes setting that we set previously.

For example, if every message is 8 bytes, the minimum buffered bytes is set to 15, and the max wait time is 3 seconds, a single pending 8-byte message will still be delivered once the 3 seconds are up.

Start Offset

When a new consumer is added to a topic, it has two options for where it wants to start consuming data from:

  1. Earliest - The consumer will start consuming data for a topic starting from the earliest message that is available.
  2. Latest - Only consume new messages that appear after the consumer has joined the cluster.

The start offset tells the consumer where to start consuming messages from.

This can be configured by setting the fromBeginning option when calling the subscribe method (true for “earliest” and false for “latest”):

//...
await consumer.subscribe({ topic, fromBeginning: true })
// ...

Note that this only applies for new consumer groups. If you’ve already consumed data with the same consumer groupId setting before, you will continue from wherever you left off.
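
If you do want an existing consumer group to re-read a topic from the beginning, one option is to reset its committed offsets using the kafkajs admin client. A rough sketch (this must run while no consumers from that group are connected):

const admin = kafka.admin()

const resetToEarliest = async () => {
	await admin.connect()
	// move the group's committed offsets back to the earliest available message
	await admin.resetOffsets({ groupId: clientId, topic, earliest: true })
	await admin.disconnect()
}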

Message Batching

So far we’ve looked at configuration on the consumer side. Let’s take a look at some producer-side configuration options.

Similar to the consumer, the producer can also send messages in batches. This reduces the total number of network round trips and improves write efficiency, but comes at the cost of increased overall latency.

With the kafkajs library, we can batch multiple messages to send at once by adding them to the messages array:

await producer.send({
	topic,
	messages: [
		{
			key: String(i),
			value: "this is message " + i,
		},
		{
			key: String(i + 1),
			value: "this is message " + (i + 1),
		},
		{
			key: String(i + 2),
			value: "this is message " + (i + 2),
		},
	],
})
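
kafkajs also provides a sendBatch method, which is useful when you want to write messages to more than one topic in a single call (the second topic name below is hypothetical):

await producer.sendBatch({
	topicMessages: [
		{
			topic,
			messages: [{ key: String(i), value: "this is message " + i }],
		},
		{
			// a second, hypothetical topic written in the same request
			topic: "another-log",
			messages: [{ key: String(i), value: "copy of message " + i }],
		},
	],
})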

Required Acknowledgements

When we call the producer.send method in our example, the returned promise only resolves once the message is confirmed to be written. However, what counts as “confirmed” can differ based on your settings.

Remember, the Kafka cluster (and your topic’s partitions) is distributed across multiple brokers. For each partition, one broker is the designated leader and the rest are followers.

Keeping this in mind, there are three modes of acknowledgement (represented by integers) when writing messages to the cluster:

  1. All brokers (more precisely, all in-sync replicas) acknowledge that they have received the message (represented as -1)
  2. Only the leader broker acknowledges that it has received the message (represented as 1). The remaining brokers can still eventually receive the message, but we won’t wait for them to do so.
  3. No one acknowledges receiving the message (represented as 0). This is basically a fire-and-forget mode, where we don’t care whether our message is received or not. It should only be used for data you can afford to lose some of, but that requires high throughput.

The required acknowledgements setting determines when your producer will consider the message as “written”.

In our code, we can use the acks option to set the required acknowledgements:

await producer.send({
	topic,
	// can be set to -1, 0, or 1
	// 1 is a good default for most non-transactional data
	acks: 1,
	messages: [
		{
			key: String(i),
			value: "this is message " + i,
		},
	],
})

Logging Options

In our example, we printed to the console every time a message was written or read. Sometimes we may want more information, for debugging or to learn more about our cluster.

The kafkajs library comes with the option of specifying a more verbose log level that can provide more detailed information about the state of your Kafka brokers.

The Kafka constructor comes with a logLevel option, which we can set to the “DEBUG” level to get the most verbose information:

const { Kafka, logLevel } = require("kafkajs")

//...

const kafka = new Kafka({
	clientId,
	brokers,
	logLevel: logLevel.DEBUG,
})

You can choose other log levels and provide your own custom logging tool as well. For more details, you can visit the documentation page

The kafka instance can now be used in the producer and consumer like before.

When running the program, we can see that the output contains much more information than before:

❯ node index.js
{"level":"DEBUG","timestamp":"2020-12-30T06:45:08.916Z","logger":"kafkajs","message":"[Connection] Connecting","broker":"localhost:9092","clientId":"my-app","ssl":false,"sasl":false}
{"level":"DEBUG","timestamp":"2020-12-30T06:45:08.932Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"localhost:9092","clientId":"my-app","correlationId":1,"size":119,"data":{"throttleTime":0,"brokers":[{"nodeId":0,"host":"192.168.0.104","port":9092,"rack":null}],"clusterId":"CjFFQNLySaOh56HqC1acDA","controllerId":0,"topicMetadata":[{"topicErrorCode":0,"topic":"message-log","isInternal":false,"partitionMetadata":[{"partitionErrorCode":0,"partitionId":0,"leader":0,"replicas":[0],"isr":[0],"offlineReplicas":[]}]}],"clientSideThrottleTime":0}}
{"level":"INFO","timestamp":"2020-12-30T06:45:08.933Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"my-app"}
{"level":"DEBUG","timestamp":"2020-12-30T06:45:08.933Z","logger":"kafkajs","message":"[Connection] Connecting","broker":"192.168.0.104:9092","clientId":"my-app","ssl":false,"sasl":false}
{"level":"DEBUG","timestamp":"2020-12-30T06:45:08.934Z","logger":"kafkajs","message":"[Broker] Verified support for SaslAuthenticate","broker":"192.168.0.104:9092","supportAuthenticationProtocol":true}
{"level":"DEBUG","timestamp":"2020-12-30T06:45:08.936Z","logger":"kafkajs","message":"[Connection] Request GroupCoordinator(key: 10, version: 2)","broker":"192.168.0.104:9092","clientId":"my-app","correlationId":0,"expectResponse":true,"size":29}
{"level":"DEBUG","timestamp":"2020-12-30T06:45:08.937Z","logger":"kafkajs","message":"[Connection] Response GroupCoordinator(key: 10, version: 2)","broker":"192.168.0.104:9092","clientId":"my-app","correlationId":0,"size":39,"data":{"throttleTime":0,"errorCode":0,"errorMessage":"NONE","coordinator":{"nodeId":0,"host":"192.168.0.104","port":9092},"clientSideThrottleTime":0}}
{"level":"DEBUG","timestamp":"2020-12-30T06:45:08.937Z","logger":"kafkajs","message":"[Cluster] Found group coordinator","nodeId":0}
{"level":"DEBUG","timestamp":"2020-12-30T06:45:08.938Z","logger":"kafkajs","message":"[Connection] Request JoinGroup(key: 11, version: 5)","broker":"192.168.0.104:9092","clientId":"my-app","correlationId":1,"expectResponse":true,"size":101}
{"level":"DEBUG","timestamp":"2020-12-30T06:45:08.939Z","logger":"kafkajs","message":"[Connection] Response JoinGroup(key: 11, version: 5)","broker":"192.168.0.104:9092","clientId":"my-app","error":"The group member needs to have a valid member id before actually entering a consumer group","correlationId":1,"payload":{"type":"Buffer","data":"[filtered]"}}
{"level":"DEBUG","timestamp":"2020-12-30T06:45:08.940Z","logger":"kafkajs","message":"[Connection] Request JoinGroup(key: 11, version: 5)","broker":"192.168.0.104:9092","clientId":"my-app","correlationId":2,"expectResponse":true,"size":144}
{"level":"DEBUG","timestamp":"2020-12-30T06:45:08.941Z","logger":"kafkajs","message":"[Connection] Response JoinGroup(key: 11, version: 5)","broker":"192.168.0.104:9092","clientId":"my-app","correlationId":2,"size":202,"data":{"throttleTime":0,"clientSideThrottleTime":0,"errorCode":0,"generationId":17,"groupProtocol":"RoundRobinAssigner","leaderId":"my-app-f3dde7bc-1479-4205-bdcd-e4bd33d494b9","memberId":"my-app-f3dde7bc-1479-4205-bdcd-e4bd33d494b9","members":[{"memberId":"my-app-f3dde7bc-1479-4205-bdcd-e4bd33d494b9","groupInstanceId":null,"memberMetadata":{"type":"Buffer","data":[0,1,0,0,0,1,0,11,109,101,115,115,97,103,101,45,108,111,103,0,0,0,0]}}]}}
{"level":"DEBUG","timestamp":"2020-12-30T06:45:08.941Z","logger":"kafkajs","message":"[ConsumerGroup] Chosen as group leader","groupId":"my-app","generationId":17,"memberId":"my-app-f3dde7bc-1479-4205-bdcd-e4bd33d494b9","topics":["message-log"]}
{"level":"DEBUG","timestamp":"2020-12-30T06:45:08.942Z","logger":"kafkajs","message":"[Connection] Request Metadata(key: 3, version: 6)","broker":"192.168.0.104:9092","clientId":"my-app","correlationId":3,"expectResponse":true,"size":38}
{"level":"DEBUG","timestamp":"2020-12-30T06:45:08.942Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"192.168.0.104:9092","clientId":"my-app","correlationId":3,"size":119,"data":{"throttleTime":0,"brokers":[{"nodeId":0,"host":"192.168.0.104","port":9092,"rack":null}],"clusterId":"CjFFQNLySaOh56HqC1acDA","controllerId":0,"topicMetadata":[{"topicErrorCode":0,"topic":"message-log","isInternal":false,"partitionMetadata":[{"partitionErrorCode":0,"partitionId":0,"leader":0,"replicas":[0],"isr":[0],"offlineReplicas":[]}]}],"clientSideThrottleTime":0}}
{"level":"DEBUG","timestamp":"2020-12-30T06:45:08.943Z","logger":"kafkajs","message":"[ConsumerGroup] Group assignment","groupId":"my-app","generationId":17,"groupProtocol":"RoundRobinAssigner","assignment":[{"memberId":"my-app-f3dde7bc-1479-4205-bdcd-e4bd33d494b9","memberAssignment":{"type":"Buffer","data":[0,1,0,0,0,1,0,11,109,101,115,115,97,103,101,45,108,111,103,0,0,0,1,0,0,0,0,0,0,0,0]}}],"topics":["message-log"]}
{"level":"DEBUG","timestamp":"2020-12-30T06:45:08.945Z","logger":"kafkajs","message":"[Connection] Request SyncGroup(key: 14, version: 3)","broker":"192.168.0.104:9092","clientId":"my-app","correlationId":4,"expectResponse":true,"size":163}
{"level":"DEBUG","timestamp":"2020-12-30T06:45:08.946Z","logger":"kafkajs","message":"[Connection] Response SyncGroup(key: 14, version: 3)","broker":"192.168.0.104:9092","clientId":"my-app","correlationId":4,"size":45,"data":{"throttleTime":0,"errorCode":0,"memberAssignment":{"type":"Buffer","data":[0,1,0,0,0,1,0,11,109,101,115,115,97,103,101,45,108,111,103,0,0,0,1,0,0,0,0,0,0,0,0]},"clientSideThrottleTime":0}}
{"level":"DEBUG","timestamp":"2020-12-30T06:45:08.946Z","logger":"kafkajs","message":"[ConsumerGroup] Received assignment","groupId":"my-app","generationId":17,"memberId":"my-app-f3dde7bc-1479-4205-bdcd-e4bd33d494b9","memberAssignment":{"message-log":[0]}}
{"level":"INFO","timestamp":"2020-12-30T06:45:08.946Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"my-app","memberId":"my-app-f3dde7bc-1479-4205-bdcd-e4bd33d494b9","leaderId":"my-app-f3dde7bc-1479-4205-bdcd-e4bd33d494b9","isLeader":true,"memberAssignment":{"message-log":[0]},"groupProtocol":"RoundRobinAssigner","duration":13}
{"level":"DEBUG","timestamp":"2020-12-30T06:45:08.949Z","logger":"kafkajs","message":"[Connection] Request OffsetFetch(key: 9, version: 4)","broker":"192.168.0.104:9092","clientId":"my-app","correlationId":5,"expectResponse":true,"size":53}
{"level":"DEBUG","timestamp":"2020-12-30T06:45:08.950Z","logger":"kafkajs","message":"[Connection] Response OffsetFetch(key: 9, version: 4)","broker":"192.168.0.104:9092","clientId":"my-app","correlationId":5,"size":47,"data":{"throttleTime":0,"responses":[{"topic":"message-log","partitions":[{"partition":0,"offset":"4376","metadata":"","errorCode":0}]}],"errorCode":0,"clientSideThrottleTime":0}}
{"level":"DEBUG","timestamp":"2020-12-30T06:45:08.950Z","logger":"kafkajs","message":"[ConsumerGroup] Fetching from 1 partitions for 1 out of 1 topics","topics":["message-log"],"activeTopicPartitions":[{"topic":"message-log","partitions":[0]}],"pausedTopicPartitions":[]}
{"level":"DEBUG","timestamp":"2020-12-30T06:45:08.959Z","logger":"kafkajs","message":"[Connection] Request Fetch(key: 1, version: 11)","broker":"192.168.0.104:9092","clientId":"my-app","correlationId":6,"expectResponse":true,"size":100}
{"level":"DEBUG","timestamp":"2020-12-30T06:45:09.938Z","logger":"kafkajs","message":"[Connection] Request Metadata(key: 3, version: 6)","broker":"localhost:9092","clientId":"my-app","correlationId":1,"expectResponse":true,"size":38}
{"level":"DEBUG","timestamp":"2020-12-30T06:45:09.940Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"localhost:9092","clientId":"my-app","correlationId":1,"size":119,"data":{"throttleTime":0,"brokers":[{"nodeId":0,"host":"192.168.0.104","port":9092,"rack":null}],"clusterId":"CjFFQNLySaOh56HqC1acDA","controllerId":0,"topicMetadata":[{"topicErrorCode":0,"topic":"message-log","isInternal":false,"partitionMetadata":[{"partitionErrorCode":0,"partitionId":0,"leader":0,"replicas":[0],"isr":[0],"offlineReplicas":[]}]}],"clientSideThrottleTime":0}}
{"level":"DEBUG","timestamp":"2020-12-30T06:45:09.941Z","logger":"kafkajs","message":"[Connection] Connecting","broker":"192.168.0.104:9092","clientId":"my-app","ssl":false,"sasl":false}
{"level":"DEBUG","timestamp":"2020-12-30T06:45:09.941Z","logger":"kafkajs","message":"[Broker] Verified support for SaslAuthenticate","broker":"192.168.0.104:9092","supportAuthenticationProtocol":true}
{"level":"DEBUG","timestamp":"2020-12-30T06:45:09.949Z","logger":"kafkajs","message":"[Connection] Request Produce(key: 0, version: 7)","broker":"192.168.0.104:9092","clientId":"my-app","correlationId":0,"expectResponse":true,"size":143}
{"level":"DEBUG","timestamp":"2020-12-30T06:45:09.951Z","logger":"kafkajs","message":"[Connection] Response Fetch(key: 1, version: 11)","broker":"192.168.0.104:9092","clientId":"my-app","correlationId":6,"size":163,"data":"[filtered]"}
received message: this is message 0
{"level":"DEBUG","timestamp":"2020-12-30T06:45:09.955Z","logger":"kafkajs","message":"[Connection] Request OffsetCommit(key: 8, version: 5)","broker":"192.168.0.104:9092","clientId":"my-app","correlationId":7,"expectResponse":true,"size":112}
{"level":"DEBUG","timestamp":"2020-12-30T06:45:09.955Z","logger":"kafkajs","message":"[Connection] Response Produce(key: 0, version: 7)","broker":"192.168.0.104:9092","clientId":"my-app","correlationId":0,"size":59,"data":{"topics":[{"topicName":"message-log","partitions":[{"partition":0,"errorCode":0,"baseOffset":"4376","logAppendTime":"-1","logStartOffset":"3139"}]}],"throttleTime":0,"clientSideThrottleTime":0}}
writes:  0

Although detailed logging can be helpful in a non-production environment, be careful about enabling it in production, so that your logs don’t get polluted with too much information.
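
If you’d rather route these logs through your own logging setup (for example, to control their destination and format in production), the Kafka constructor also accepts a logCreator option. A minimal sketch that simply forwards entries to the console; the formatting here is just an assumption:

const { Kafka, logLevel } = require("kafkajs")

// a logCreator receives the configured log level and returns a function
// that kafkajs calls with every log entry
const consoleLogCreator = () => ({ namespace, level, label, log }) => {
	const { message, ...extra } = log
	console.log(`[${label}] ${namespace}: ${message}`, extra)
}

const kafka = new Kafka({
	clientId,
	brokers,
	logLevel: logLevel.INFO,
	logCreator: consoleLogCreator,
})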

Further Reading

This post has covered most of the important options that I personally use in a production environment, but is by no means exhaustive.

There are many more configuration options that you may find important for your use case. You can read the full list here.

For library specific options, you can see the KafkaJS docs for producing and consuming messages.

You can see the working example of the code explained here on GitHub.