Anyone who maintains a production Kafka cluster will worry about the messages within the system. Core Kafka provides a number of retention policies, and they work exactly as you would expect. It is, however, worth kicking the tyres to confirm those assumptions.
In this post I will create a Kafka topic, use the command line tools to alter its retention policy, and then confirm that messages are retained as we would expect.
Kafka Topic Retention
Message retention can be based on time, on the size of the log, or on both. In most cases that I’m aware of, companies prefer time-based retention.
Time-based retention can be expressed in hours, minutes or milliseconds. If more than one is set, the cluster always honours the setting with the smallest unit: you can configure all three, but only the finest-grained one takes effect.
retention.ms takes priority over retention.minutes which takes priority over retention.hours.
Where possible I advise using only one retention setting. As a rule I use retention.ms, which gives me the fine-grained control I need.
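To make the units concrete, here is the same one-hour window expressed in each form (a quick shell arithmetic sketch, not a Kafka command):

```shell
# One hour of retention expressed in each unit Kafka accepts.
# If all three were set on a topic, retention.ms would be the one honoured.
hours=1
minutes=$(( hours * 60 ))        # retention.minutes equivalent
ms=$(( minutes * 60 * 1000 ))    # retention.ms equivalent
echo "retention.hours=${hours} retention.minutes=${minutes} retention.ms=${ms}"
```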
A Prototype Example
- Create a topic with a retention time of 3 minutes.
- Send a message to the topic with an obvious time in the payload.
- Alter the topic configuration and add another 30 minutes of retention time.
- Consume the message after the original three minute period and see if it’s still there.
Create a Topic
I’m using a standalone Kafka instance so there’s only one partition and one replica. The interesting part in this exercise is the configuration at the end. Using the retention.ms setting I’m setting the topic retention time to three minutes (3 minutes x 60 seconds x 1000 milliseconds = 180000 milliseconds).
$ bin/kafka-topics --zookeeper localhost:2181 --create --topic rtest2 --partitions 1 --replication-factor 1 --config retention.ms=180000
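As a quick sanity check on the arithmetic behind that retention.ms value:

```shell
# Three minutes expressed in milliseconds, as passed to --config retention.ms
minutes=3
echo $(( minutes * 60 * 1000 ))   # prints 180000
```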
Send a Test Message
I’m using the kafka-console-producer command to send a plain text message to the topic. The topic doesn’t have a schema so I can send any type of message I wish; in this example I’m sending JSON as a string.
$ bin/kafka-console-producer --broker-list localhost:9092 --topic rtest2
>{"name":"This is a test message, this was sent at 16:15"}
The message is now in the topic log and will be deleted just after 16:18. But I’m now going to extend the retention period to preserve that message a little longer.
Alter the Topic Retention
$ bin/kafka-configs --alter --zookeeper localhost:2181 --add-config retention.ms=1800000 --entity-type topics --entity-name rtest2
Completed Updating config for entity: topic 'rtest2'.
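The new value is 30 minutes expressed in milliseconds. The conversion is below, along with a way to confirm the change took effect (the kafka-configs describe invocation assumes the same standalone setup as above, so it is shown commented out):

```shell
# Thirty minutes expressed in milliseconds, as passed to --add-config retention.ms
minutes=30
echo $(( minutes * 60 * 1000 ))   # prints 1800000

# To verify the topic picked up the new setting (requires the running broker):
# bin/kafka-configs --zookeeper localhost:2181 --describe \
#   --entity-type topics --entity-name rtest2
```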
Check the Topic by Consuming Messages
Leaving the cluster alone for 10-15 minutes takes us well past the original three-minute retention window. Running the consumer from the earliest offset will bring back whatever messages remain in the log. If the configuration change has worked as expected then the original message sent at 16:15 will be output.
$ bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic rtest2 --from-beginning
{"name":"This is a test message, this was sent at 16:15"}
Processed a total of 1 messages
Okay, that worked as expected. I’ll run the consumer again, adding the date this time for extra confirmation.
$ date ; bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic rtest2 --from-beginning
Fri 3 Apr 16:24:18 BST 2020
{"name":"This is a test message, this was sent at 16:15"}
Processed a total of 1 messages
Looking good. One more run, just to be sure it wasn’t an accident.
$ date ; bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic rtest2 --from-beginning
Fri 3 Apr 16:24:50 BST 2020
{"name":"This is a test message, this was sent at 16:15"}
Conclusion
In this brief post I’ve shown that messages written prior to a retention change are preserved under the new policy. Creating a topic, sending a message and then extending the retention time gave us the evidence we needed to confirm the assumption.