In this post we will learn how to create a Kafka producer and consumer in Java. We will also look at how to tune some configuration options to make our application production-ready.
Kafka is an open-source event streaming platform used for publishing and processing events at high throughput.
For this post, we will be using the official Apache Kafka client library to implement our own producer and consumer in a Java application. The Java application will use a standard Maven project structure.
If you want to skip the explanation and see the code, you can view it on GitHub.
Getting Started
First, make sure you have a running Kafka cluster on your machine.
Before diving into the code, we should know about brokers and topics, which will be needed by both the producer and consumer.
A “topic” can be thought of as a distinct queue or channel where messages are sent.
For most production applications, there isn’t a single Kafka server running, but rather a cluster of multiple servers called “brokers”. The messages for each topic are split amongst the various brokers.
To learn more about how each topic is split between the brokers, you can read the official docs.
We will also need the Kafka client libraries. We can define these as dependencies in our project's pom.xml file:
<dependencies>
    <!-- This is the core library containing the classes we will use -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.5.0</version>
    </dependency>
    <!-- The kafka client libraries use the slf4j logger, so we need to
         add this as a dependency so that the required classes are present
         in our classpath for the kafka client libraries to use
    -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>2.0.9</version>
    </dependency>
</dependencies>
You can view the entire configuration of the pom.xml file on GitHub.
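The examples that follow assume the topic already exists. If your cluster is not configured to create topics automatically, you can create it yourself. Here is a minimal sketch using the Admin client that ships with kafka-clients; the topic name matches the one used below, but the partition count and replication factor of 3 are assumptions for a local three-broker cluster, so adjust them for your setup:
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9093,localhost:9094,localhost:9095");
        try (Admin admin = Admin.create(props)) {
            // 3 partitions and a replication factor of 3 are assumptions for a local
            // three-broker cluster; this call fails if the topic already exists
            NewTopic topic = new NewTopic("my-kafka-topic", 3, (short) 3);
            admin.createTopics(Collections.singletonList(topic)).all().get();
            System.out.println("created topic my-kafka-topic");
        }
    }
}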
Creating the Kafka Producer
For the purpose of illustration, let’s create a method that writes a message into the Kafka cluster every second, forever:
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class App {
    private static final String TOPIC = "my-kafka-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9093,localhost:9094,localhost:9095";

    private static void produce() {
        // Create configuration options for our producer and initialize a new producer
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        // We configure the serializer to describe the format in which we want to produce data into
        // our Kafka cluster
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Since we need to close our producer, we can use the try-with-resources statement to
        // create a new producer
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // here, we run an infinite loop to send a message to the cluster every second
            for (int i = 0;; i++) {
                String key = Integer.toString(i);
                String message = "this is message " + Integer.toString(i);
                producer.send(new ProducerRecord<String, String>(TOPIC, key, message));
                // log a confirmation once the message is written
                System.out.println("sent msg " + key);
                try {
                    // Sleep for a second
                    Thread.sleep(1000);
                } catch (Exception e) {
                    break;
                }
            }
        } catch (Exception e) {
            System.out.println("Could not start producer: " + e);
        }
    }
}
Creating the Kafka Consumer
When creating a consumer, we need to specify its group ID. This is because a single topic can have multiple consumers, and the group ID ensures that consumers belonging to the same group don't receive the same messages repeatedly: each message is delivered to only one consumer within the group.
Let’s create another consume method that consumes messages from the Kafka cluster whenever they’re available:
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class App {
    private static final String TOPIC = "my-kafka-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9093,localhost:9094,localhost:9095";

    private static void consume() {
        // Create configuration options for our consumer
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        // The group ID is a unique identifier for each consumer group
        props.setProperty("group.id", "my-group-id");
        // Since our producer uses a string serializer, we need to use the corresponding
        // deserializer
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Every time we consume a message from kafka, we need to "commit" - that is, acknowledge
        // receipt of the messages. We can set up an auto-commit at regular intervals, so that
        // this is taken care of in the background
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        // Since we need to close our consumer, we can use the try-with-resources statement to
        // create it
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Subscribe this consumer to the same topic that we wrote messages to earlier
            consumer.subscribe(Arrays.asList(TOPIC));
            // run an infinite loop where we consume and print new messages to the topic
            while (true) {
                // The consumer.poll method checks and waits for any new messages to arrive for the
                // subscribed topic. If there are no messages for the duration specified in the
                // argument (1000 ms in this case), it returns an empty set of records
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("received message: %s\n", record.value());
                }
            }
        }
    }
}
Running our Application
The produce and consume methods will both run indefinitely. We can start them together on different threads in our main method:
public class App {
    // ...
    public static void main(String[] args) throws Exception {
        Thread consumerThread = new Thread(App::consume);
        consumerThread.start();
        Thread producerThread = new Thread(App::produce);
        producerThread.start();
    }
    // ...
}
If you run this code, you’ll see an output similar to this:
❯ mvn clean compile assembly:single && java -jar target/java-kafka-example-1.0-SNAPSHOT-jar-with-dependencies.jar
sent msg 0
received message: this is message 0
sent msg 1
received message: this is message 1
sent msg 2
received message: this is message 2
sent msg 3
received message: this is message 3
sent msg 4
received message: this is message 4
sent msg 5
received message: this is message 5
sent msg 6
received message: this is message 6
sent msg 7
received message: this is message 7
sent msg 8
received message: this is message 8
In the next section we’ll go through some configuration options that you can set to optimize the Kafka client for your needs.
Tuning Kafka Client Configuration
There are a lot of options that you can configure when creating the Kafka producer and consumer. In this section we’ll go over some of the more important configuration options, and what they mean.
You can see the full list of configuration options here
Minimum Buffered Bytes
As we’ve seen from the previous example, the data received by the consumer isn’t exactly “real time”.
The consumer polls the Kafka brokers to check if there is enough data to receive. The minimum buffered bytes defines what “enough” is. For example, if we have a configuration like this:
Properties props = new Properties();
// other properties are the same as before
// We can set the minimum buffered bytes using the "fetch.min.bytes" property
props.setProperty("fetch.min.bytes", "5");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    // ...
    // ...
}
this means that when the consumer polls the cluster to check if there is any new data on the topic for the my-group-id consumer group, the cluster will only respond if there are at least 5 new bytes of information to send.
Setting fetch.min.bytes helps the consumer receive data in batches, which reduces the number of fetch requests and the overall load on your system. However, if a long period of time elapses before the amount of new data crosses the fetch.min.bytes value, the previous data stays stuck on the broker for that amount of time.
Max Wait Time
The fetch.max.wait.ms setting helps mitigate the problem discussed above. It sets the maximum time the broker will wait before answering a fetch request, even if the fetch.min.bytes threshold has not been reached.
So, if we set up our consumer with the following config:
Properties props = new Properties();
// other properties are the same as before
props.setProperty("fetch.min.bytes", "5");
// Here, we can set the maximum wait time along with the minimum bytes.
props.setProperty("fetch.max.wait.ms", "3000");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    // ...
    // ...
}
that would mean that the consumer waits at most 3 seconds before receiving new messages, even if the new data does not cross the fetch.min.bytes threshold that we set previously.
Start Offset
When a new consumer is added to a topic, it has two options for where it wants to start consuming data from:
- Earliest - The consumer will start consuming data for a topic starting from the earliest message that is available.
- Latest - Only consume new messages that appear after the consumer has joined the cluster. This is the default setting.
We can set the auto.offset.reset property to earliest or latest (the default) depending on our requirements:
private static void consume() {
    Properties props = new Properties();
    // other properties are the same as before
    props.setProperty("auto.offset.reset", "earliest");
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        // ...
    }
}
Note that this only applies to new consumer groups. If a consumer with the same group.id setting has already committed offsets before, you will continue from wherever you left off.
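If an existing consumer group does need to re-read the topic from the start, one option (a sketch, not part of the original example, reusing the props and TOPIC values from our consume method) is to seek back explicitly once partitions have been assigned:
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(TOPIC));
    // poll once so that the group coordinator assigns partitions to this consumer
    consumer.poll(Duration.ofMillis(1000));
    // move the offset of every assigned partition back to the beginning of the topic
    consumer.seekToBeginning(consumer.assignment());
    // ... continue polling as before
}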
Message Batching
So far we’ve looked at configuration on the consumer side. Let’s take a look at some producer/writer side configuration options.
Similar to the consumer, the producer also tries to send messages in batches. This is to reduce the total number of network round trips and improve efficiency in writing messages, but comes at the cost of increased overall latency.
When batching messages, we can set:
- Batch Size - The maximum amount of data (in bytes) that is buffered per partition before being written to the Kafka brokers.
- Batch Timeout (Linger) - The maximum time that messages wait before being written to the brokers. Even if the batch is not full, the buffered messages are still written to the Kafka cluster once this time has elapsed.
In our code, we can set this configuration using the batch.size and linger.ms options:
private static void produce() {
    Properties props = new Properties();
    // same as before
    // buffer up to 10 bytes of messages per partition before sending
    // (a deliberately tiny value, just for illustration)
    props.put("batch.size", "10");
    // no matter what happens, write all pending messages
    // every 2 seconds
    props.put("linger.ms", "2000");
    try (Producer<String, String> producer = new KafkaProducer<>(props)) {
        // same as before
    }
}
If you want your producer to send every message immediately, set batch.size to 0, which disables batching entirely.
Required Acknowledgements
When we call the producer.send method in our example, it returns a future that resolves once the message is confirmed to be written. However, what "confirmed" means can differ based on your settings.
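For example, instead of the fire-and-forget style used in our produce method, we could block on the returned future, or pass a callback that runs once the write is acknowledged. This is a sketch that reuses the TOPIC, key, and message variables from earlier; RecordMetadata comes from the org.apache.kafka.clients.producer package:
// Block until the write is acknowledged (throws if the write fails)
RecordMetadata metadata = producer.send(new ProducerRecord<>(TOPIC, key, message)).get();
System.out.println("written to partition " + metadata.partition() + " at offset " + metadata.offset());

// Or attach a callback instead of blocking
producer.send(new ProducerRecord<>(TOPIC, key, message), (md, exception) -> {
    if (exception != null) {
        System.out.println("failed to write message: " + exception);
    } else {
        System.out.println("written to partition " + md.partition() + " at offset " + md.offset());
    }
});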
Remember, the Kafka cluster (and each topic's partitions) is distributed across multiple brokers. For each partition, one broker is the designated leader and the rest are followers.
Keeping this in mind, there are three modes of acknowledgement (represented by integers) when writing messages to the cluster:
- All in-sync replicas acknowledge that they have received the message (represented as -1, or equivalently "all").
- Only the leader broker acknowledges that it has received the message (represented as 1). The remaining brokers can still eventually receive the message, but we won't wait for them to do so.
- No one acknowledges receiving the message (represented as 0). This is basically a fire-and-forget mode, where we don't care whether our message is received. This should only be used for data that you can afford to lose some of, but need high throughput for.
In our code, we can use the acks property to set the required acknowledgements:
private static void produce() {
    Properties props = new Properties();
    // same as before
    props.put("acks", "1");
    try (Producer<String, String> producer = new KafkaProducer<>(props)) {
        // same as before
    }
}
Logging Options
In our example, we printed to the console every time a message was written or read. Sometimes we may want more information, whether for debugging or to learn more about our cluster.
The Kafka client library uses the slf4j logging interface to produce logs, for which we can provide an implementation at deployment time.
The default binding is a NOPLogger, which means that nothing is logged and all logs are silently discarded. We can add another implementation as a dependency in our pom.xml file, and the Kafka client library will use it as its slf4j binding.
For now, we can use the SimpleLogger binding, which prints messages to the console. To see this in action, let's add the dependency to our pom.xml file:
<dependencies>
    <!-- same as before -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <!-- keep this version in sync with the slf4j-api version declared earlier -->
        <version>2.0.9</version>
    </dependency>
</dependencies>
When running the program, we can see that the output contains much more information than before:
❯ java -jar target/java-kafka-example-1.0-SNAPSHOT-jar-with-dependencies.jar
[Thread-1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.2.1
[Thread-1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: b172a0a94f4ebb9f
[Thread-1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1660562904275
[Thread-0] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.2.1
[Thread-0] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: b172a0a94f4ebb9f
[Thread-0] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1660562904290
[Thread-0] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-my-group-id-1, groupId=my-group-id] Subscribed to topic(s): my-kafka-topic
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition my-kafka-topic-0 to 0 since the associated topicId changed from null to XQFoKDATSdO4ayRHRTVN7A
[Thread-0] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-my-group-id-1, groupId=my-group-id] Resetting the last seen epoch of partition my-kafka-topic-0 to 0 since the associated topicId changed from null to XQFoKDATSdO4ayRHRTVN7A
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition my-kafka-topic-2 to 0 since the associated topicId changed from null to XQFoKDATSdO4ayRHRTVN7A
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Resetting the last seen epoch of partition my-kafka-topic-1 to 0 since the associated topicId changed from null to XQFoKDATSdO4ayRHRTVN7A
[Thread-0] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-my-group-id-1, groupId=my-group-id] Resetting the last seen epoch of partition my-kafka-topic-2 to 0 since the associated topicId changed from null to XQFoKDATSdO4ayRHRTVN7A
[Thread-0] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-my-group-id-1, groupId=my-group-id] Resetting the last seen epoch of partition my-kafka-topic-1 to 0 since the associated topicId changed from null to XQFoKDATSdO4ayRHRTVN7A
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: W2SThqvyQPaEln-9lcM5vQ
[Thread-0] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-my-group-id-1, groupId=my-group-id] Cluster ID: W2SThqvyQPaEln-9lcM5vQ
[Thread-0] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-my-group-id-1, groupId=my-group-id] Discovered group coordinator localhost:9095 (id: 2147483644 rack: null)
[Thread-0] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-my-group-id-1, groupId=my-group-id] (Re-)joining group
[Thread-0] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-my-group-id-1, groupId=my-group-id] Request joining group due to: need to re-join with the given member-id: consumer-my-group-id-1-5c38b6ab-b448-44a4-8185-a0b5505935b7
[Thread-0] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-my-group-id-1, groupId=my-group-id] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)
[Thread-0] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-my-group-id-1, groupId=my-group-id] (Re-)joining group
sent msg 0
sent msg 1
...
...
Although detailed logging can be helpful in a non-production environment, be careful before using it in production, to prevent your logs from being polluted with too much information.
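For example, SimpleLogger reads its default log level from a system property, so (assuming you keep the slf4j-simple binding from above) you can turn down the verbosity without changing any code:
❯ java -Dorg.slf4j.simpleLogger.defaultLogLevel=warn -jar target/java-kafka-example-1.0-SNAPSHOT-jar-with-dependencies.jar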
Further Reading
This post has covered most of the important options that I personally use in a production environment, but is by no means exhaustive.
There are many more configuration options that you may find important for your use case. You can read the full list here.
You can see the working example of the code explained here on Github.