This tutorial will show you how to install and run a Kafka cluster on your machine, and demonstrate some important concepts about the architecture of Kafka.

Apache Kafka is a distributed streaming platform. It can be used for anything ranging from a distributed message broker to a platform for processing data streams.

Let’s see how we can get a production quality Kafka cluster running with the least effort possible.

Installation

First, we need to install Java in order to run the Kafka executables.
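Kafka’s scripts just need a Java runtime available on your PATH (Java 8 or newer works for Kafka 3.x). You can check whether one is already installed, and install one if needed; the package below is only one example, for Debian/Ubuntu systems:

java -version

# Example only - Debian/Ubuntu:
sudo apt-get install -y default-jre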

Next, you can download Kafka’s binaries from the official download page (this one is for v3.6.0). Extract the tar file to any location of your choice:

tar -xvzf kafka_2.13-3.6.0.tgz

You should see a folder named kafka_2.13-3.6.0, and inside you will see bin and config folders.
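If you’d like to double-check the layout, listing the bin folder should show the scripts we’ll be using throughout this tutorial, such as zookeeper-server-start.sh, kafka-server-start.sh, kafka-topics.sh and the console producer/consumer scripts (this assumes you extracted the archive into your current directory):

ls kafka_2.13-3.6.0/bin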

System Architecture

There are a bunch of processes that we need to start to run our cluster:

  1. Zookeeper: which Kafka uses to maintain state between the nodes of the cluster.
  2. Kafka brokers: the “pipes” in our pipeline, which store and emit data.
  3. Producers: which insert data into the cluster.
  4. Consumers: which read data from the cluster.

basic architecture

Note that each block in this diagram can be on a different system on the network.

Starting Zookeeper

Zookeeper is a distributed key-value store commonly used to store server state for coordination.

Kafka requires a Zookeeper server in order to run (at least in the classic Zookeeper-based deployment we use in this tutorial; newer Kafka versions can also run without Zookeeper in KRaft mode), so the first thing we need to do is start a Zookeeper instance.

Inside the extracted kafka_2.13-3.6.0 directory, you will find some useful files:

  • bin/zookeeper-server-start.sh: which is used to start the server.
  • config/zookeeper.properties: which provides the default configuration for the Zookeeper server to run.

We can start the Zookeeper server by running:

bin/zookeeper-server-start.sh config/zookeeper.properties

Make sure you are inside the kafka_2.13-3.6.0 directory when executing this command.

You should see a confirmation that the server has started:

[2023-10-10 14:24:37,146] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2023-10-10 14:24:37,152] INFO Using org.apache.zookeeper.server.watch.WatchManager as watch manager (org.apache.zookeeper.server.watch.WatchManagerFactory)
[2023-10-10 14:24:37,152] INFO Using org.apache.zookeeper.server.watch.WatchManager as watch manager (org.apache.zookeeper.server.watch.WatchManagerFactory)
[2023-10-10 14:24:37,152] INFO zookeeper.snapshotSizeFactor = 0.33 (org.apache.zookeeper.server.ZKDatabase)
[2023-10-10 14:24:37,152] INFO zookeeper.commitLogCount=500 (org.apache.zookeeper.server.ZKDatabase)
[2023-10-10 14:24:37,154] INFO zookeeper.snapshot.compression.method = CHECKED (org.apache.zookeeper.server.persistence.SnapStream)
[2023-10-10 14:24:37,154] INFO Reading snapshot /tmp/zookeeper/version-2/snapshot.0 (org.apache.zookeeper.server.persistence.FileSnap)
[2023-10-10 14:24:37,156] INFO The digest value is empty in snapshot (org.apache.zookeeper.server.DataTree)
[2023-10-10 14:24:37,158] INFO Snapshot loaded in 5 ms, highest zxid is 0x0, digest is 1371985504 (org.apache.zookeeper.server.ZKDatabase)
[2023-10-10 14:24:37,158] INFO Snapshotting: 0x0 to /tmp/zookeeper/version-2/snapshot.0 (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
[2023-10-10 14:24:37,159] INFO Snapshot taken in 1 ms (org.apache.zookeeper.server.ZooKeeperServer)
[2023-10-10 14:24:37,163] INFO PrepRequestProcessor (sid:0) started, reconfigEnabled=false (org.apache.zookeeper.server.PrepRequestProcessor)
[2023-10-10 14:24:37,163] INFO zookeeper.request_throttler.shutdownTimeout = 10000 ms (org.apache.zookeeper.server.RequestThrottler)
[2023-10-10 14:24:37,173] INFO Using checkIntervalMs=60000 maxPerMinute=10000 maxNeverUsedIntervalMs=0 (org.apache.zookeeper.server.ContainerManager)

If you inspect the config/zookeeper.properties file, you should see the clientPort property set to 2181, which is the port that your Zookeeper server is currently listening on:

# File: kafka_2.13-3.6.0/config/zookeeper.properties

# the port at which the clients will connect
clientPort=2181

Starting the Kafka Brokers

Kafka brokers are the heart of the cluster - they act as the pipelines where our data is stored and distributed.

Similar to how we started Zookeeper, there are two files meant to start (bin/kafka-server-start.sh) and configure (config/server.properties) the broker servers.

Since we want to demonstrate the distributed nature of Kafka, let’s start up 3 brokers, as shown in the previous diagram.

If you open the config/server.properties file, you will see several configuration options (you can ignore most of them for now).

However, there are three properties that have to be unique for each broker instance:

# File: kafka_2.13-3.6.0/config/server.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

# The address the socket server listens on.
listeners=PLAINTEXT://:9092

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

Since we will run 3 brokers, it’s best to maintain a separate configuration file for each one. Copy the config/server.properties file to create 3 files, one per broker instance:

cp config/server.properties config/server.1.properties
cp config/server.properties config/server.2.properties
cp config/server.properties config/server.3.properties

Change the above 3 properties for each copy of the file so that they are all unique.

server.1.properties

broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs1

server.2.properties

broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs2

server.3.properties

broker.id=3
listeners=PLAINTEXT://:9095
log.dirs=/tmp/kafka-logs3

Create the log directories that we configured:

mkdir /tmp/kafka-logs1
mkdir /tmp/kafka-logs2
mkdir /tmp/kafka-logs3

Finally, we can start the broker instances. Run the three commands below in separate terminal sessions:

bin/kafka-server-start.sh config/server.1.properties
bin/kafka-server-start.sh config/server.2.properties
bin/kafka-server-start.sh config/server.3.properties
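If you would rather not keep three terminal windows open, kafka-server-start.sh also accepts a -daemon flag that runs the broker in the background, with its output written to the logs directory of the Kafka installation. For example:

bin/kafka-server-start.sh -daemon config/server.1.properties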

You should see a startup message when the brokers start successfully:

[2023-10-10 14:28:28,680] INFO Kafka version: 3.6.0 (org.apache.kafka.common.utils.AppInfoParser)
[2023-10-10 14:28:28,680] INFO Kafka commitId: 60e845626d8a465a (org.apache.kafka.common.utils.AppInfoParser)
[2023-10-10 14:28:28,680] INFO Kafka startTimeMs: 1696928308678 (org.apache.kafka.common.utils.AppInfoParser)
[2023-10-10 14:28:28,680] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
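At this point, all three brokers should have registered themselves in Zookeeper. If you want to see that coordination state for yourself, Kafka ships a small Zookeeper command line client you can point at the instance we started earlier (a quick sanity check; the trailing ls command lists the registered broker ids):

bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids

The last line of the output should contain the ids of the running brokers, something like [1, 2, 3].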

Creating Topics

Before we can start putting data into our cluster, we need to create a topic to which the data will belong. To do this, run the command:

bin/kafka-topics.sh --create --topic my-kafka-topic --bootstrap-server localhost:9093 --partitions 3 --replication-factor 2

Let’s explain some of the options here:

  • partitions lets you decide how many pieces the topic’s data is split into. Each partition can sit on a different broker, so since we set up 3 brokers, setting this option to 3 lets the data be spread across all of them.
  • replication-factor describes how many copies of your data you want (in case one of the brokers goes down, you still have your data on the others). Since we set this value to 2, each partition will be stored on two of the brokers.
  • bootstrap-server points to the address of any one of our active Kafka brokers. Since all brokers know about each other through Zookeeper, it doesn’t matter which one you choose.

Note - If you’re using an older version of Kafka (kafka-topics.sh only gained the --bootstrap-server option in Kafka 2.2), you will have to use --zookeeper localhost:2181 (the address of your Zookeeper instance) instead of --bootstrap-server localhost:9093

topics are split into partitions that are replicated across multiple brokers

Once you create the topic, you should see a confirmation message:

Created topic my-kafka-topic.

Listing Topics

If you want to list all available topics, you can run the same bin/kafka-topics.sh script with the --list option:

bin/kafka-topics.sh --list --bootstrap-server localhost:9093

In this case it will only return one topic, which we created in the previous section:

my-kafka-topic

To get more details on a topic, you can use the --describe argument with the same command:

bin/kafka-topics.sh --describe --topic my-kafka-topic --bootstrap-server localhost:9093

This will output information about the partitions and replicas of the topic. For each partition, Leader is the broker currently serving reads and writes, Replicas lists the brokers that hold a copy of that partition, and Isr (“in-sync replicas”) shows which of those copies are currently caught up with the leader:

Topic: my-kafka-topic   TopicId: 0ohYPRHiR_CDL5VlbSluCw PartitionCount: 3       ReplicationFactor: 2    Configs: 
        Topic: my-kafka-topic   Partition: 0    Leader: 1       Replicas: 1,3   Isr: 1,3
        Topic: my-kafka-topic   Partition: 1    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: my-kafka-topic   Partition: 2    Leader: 3       Replicas: 3,2   Isr: 3,2

Producers: Sending Messages to a Topic

The “producer” is the process that puts data into our Kafka cluster. The command line tools in the bin directory include a console producer, which sends a message to the cluster every time you enter a line of text into the console.

To start the console producer, run the command:

bin/kafka-console-producer.sh --broker-list localhost:9093,localhost:9094,localhost:9095 --topic my-kafka-topic
  • broker-list points the producer to the addresses of the brokers that we just provisioned (newer Kafka versions also accept the equivalent --bootstrap-server option here).
  • topic specifies the topic you want the data to go to.

You should now see a command prompt, where you can enter text which gets inserted into the Kafka cluster every time you hit enter.
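By default, each line you type becomes the value of a record with no key, and Kafka spreads these records across the topic’s partitions. If you want to experiment with keyed messages (the key determines which partition a record lands on), the console producer can parse a key out of each line via a couple of properties (a quick sketch; the “:” separator is an arbitrary choice):

bin/kafka-console-producer.sh --broker-list localhost:9093 --topic my-kafka-topic --property "parse.key=true" --property "key.separator=:"

Now a line like user42:hello world is sent with key user42 and value hello world, and all messages with the same key end up in the same partition.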

Consumers

We can use Kafka consumers to read data from the cluster. In this case, we’ll read the data that we produced in the previous section.

To start a consumer, run the command:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-kafka-topic --from-beginning
  • bootstrap-server can be the address of any one of the brokers in the cluster.
  • topic should be the same as the topic that your producer inserted data into.
  • from-beginning tells the consumer to read everything the topic currently holds, including messages that were put into it before this consumer started.

When you run the above command, you should immediately see all the messages that you entered using the producer printed to your console.

If you enter more messages with the producer while the consumer is running, you should see them appear in the consumer’s console in real time.

In this way, Kafka acts like a persistent message queue, saving the messages that were not yet read by a consumer while passing new messages on to running consumers as they arrive.
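If you produced keyed messages as sketched in the previous section, note that the console consumer does not display keys by default; you can ask it to print them alongside the values (again a small sketch using the tool’s properties):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-kafka-topic --from-beginning --property "print.key=true"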

Testing Replication: What if a Broker Goes Offline?

Now that the Kafka cluster is set up on our system, let’s test the replication of our data.

Shut down one of the three brokers that you ran (press Ctrl+C in its terminal window), and you should see that your cluster is still running fine. This means that Kafka can tolerate some of its nodes failing.

Try starting another consumer in a different terminal window:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my-kafka-topic --from-beginning --group group2

The only thing we’ve added here is the group option, which puts this consumer in a new consumer group (consumers in different groups each receive their own copy of the topic’s messages). Once you run the command, you should see all messages getting logged on the console from the beginning. If the broker you shut down happens to be the one listening on localhost:9093, just point --bootstrap-server at one of the surviving brokers (for example localhost:9094) instead.

Even though one of our brokers was shut down, our data was not lost. This is because the replication factor of 2 that we set earlier ensured that a copy of our data was present on multiple brokers.

Even though Broker1 is offline, we can still retrieve all 3 partitions from the remaining brokers
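You can also re-run the --describe command from earlier against one of the surviving brokers to watch the cluster react (if the broker you stopped was the one on port 9093, point the command at another port, such as 9094):

bin/kafka-topics.sh --describe --topic my-kafka-topic --bootstrap-server localhost:9094

Partitions whose leader was on the stopped broker will have elected a new leader from their remaining replicas, and the stopped broker’s id will have dropped out of the Isr column.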

You can play around with your setup in many more ways. What happens if you take down another broker? What if you had 5 brokers and took 2 of them down? What if you changed the replication factor for your topic?

The best way to know how resilient Kafka is, is to experiment with it yourself.

Implementing Kafka Producers and Consumers

Once you have your cluster up and running, you can implement a producer and consumer in your application code.

You can read my other posts on how to implement a Kafka producer and consumer in Golang, Node.js, or Java.