Kafka Poison Message Antidote

April 10, 2024

Data validation and comprehensive error handling are among those important tasks in building a Kafka application that I see skipped far too often.

Even with a schema registry enforcing some semblance of message structure, it is all too easy for a message to bring your consumer to a screaming halt: the dreaded poison message.

If you’re in the position of managing a Kafka cluster but not responsible for the application development (or you are the developer and have messed up!), you might end up with a panicked call from a business unit reporting that their app has stopped processing messages and that topic lag is approaching stratospheric levels.

You have a look and discover that someone has inserted a message with content that the consumer dev team never expected (null messages are depressingly common!). The app is now simply refusing to skip past the message and is stuck in a crash loop, or has hung.

Obviously, the best solution is to have the app team create a proper fix that can gracefully handle the poison message, but it’s 2am on a Monday morning, batch is frozen and the business day is fast approaching. What do we do to rescue the situation?

Remember, Kafka is not a message queue. We can’t simply remove the message and allow processing to resume. What we can do is the next best thing: modify the consumer group offset to skip over the poison message so the app doesn’t try to consume it again.

The process for doing this is quite straightforward, but it is definitely one of those commands where you want to take a breath and make sure you are doing the right thing before you run it, because there can be some unexpected consequences, which I’ll highlight near the end.

Let's see an example.

First, we create a topic:

bin/kafka-topics.sh --bootstrap-server $(hostname):9092 --create --topic simpletopic --partitions 1 --replication-factor 1

This is just a simple topic with only one partition.

Now, we can write some random numbers to it, insert a poison message, and then write some more valid data.

from kafka import KafkaProducer
import random

kafka_brokers = ['127.0.0.1:9092']
kafka_topic = "simpletopic"

print("Connecting to broker")
producer = KafkaProducer(bootstrap_servers=kafka_brokers)
print("Connected")

print("Sending normal messages")
for _ in range(random.randint(5, 10)):
    # Non-zero values only, so the consumer's division only fails on the intended poison message
    future = producer.send(kafka_topic, value=str.encode(str(random.randint(1, 900))))
    record_metadata = future.get(timeout=10)

print("Sending poison message")
future = producer.send(kafka_topic, value=str.encode("0"))
record_metadata = future.get(timeout=10)

print("Inserting poison message in partition: " + str(record_metadata.partition))
print("Inserting poison message at offset: " + str(record_metadata.offset))

print("Sending normal messages again")
for _ in range(random.randint(5, 10)):
    future = producer.send(kafka_topic, value=str.encode(str(random.randint(1, 900))))
    record_metadata = future.get(timeout=10)

print("done")

and run it

Connecting to broker
Connected
Sending normal messages
Sending poison message
Inserting poison message in partition: 0
Inserting poison message at offset: 8
Sending normal messages again
done

We can see that the poison message went in at offset 8.

Now, we create a simple consumer that just divides a number by the value in each message.

from kafka import KafkaConsumer
import threading

def consume_messages(topic_name, group_id, consumer_id):
    consumer = KafkaConsumer(
        topic_name,
        bootstrap_servers=['127.0.0.1:9092'],  
        enable_auto_commit=False,
        group_id=group_id
    )

    print(f"Consumer {consumer_id} started consuming")

    try:
        for message in consumer:
            print(f"Consumer {consumer_id} received: {message.value.decode('utf-8')} from partition {message.partition} at offset {message.offset}")
            print(f"Running complex calculation! {666 / int(message.value.decode('utf-8'))}")
            consumer.commit()            
    finally:
        consumer.close()

def main():
    topic = 'simpletopic'
    group = 'test_group'

    consumer1 = threading.Thread(target=consume_messages, args=(topic, group, 'Consumer1'))
    consumer1.start()
    consumer1.join()

if __name__ == "__main__":
    main()

We intentionally disable auto-commit so we don’t mark the poisoned message as consumed.
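Of course, the proper fix is for the consumer to handle bad values itself instead of crashing. A minimal sketch of that defensive handling (the `safe_process` helper is hypothetical, not part of the demo code):

```python
def safe_process(raw: bytes):
    """Return (True, result) for a good message, (False, reason) for a poison one."""
    try:
        return True, 666 / int(raw.decode("utf-8"))
    except (UnicodeDecodeError, ValueError) as exc:
        # Payload is not valid UTF-8, or not a number at all
        return False, f"bad payload: {exc}"
    except ZeroDivisionError:
        # The poison value in this demo
        return False, "division by zero"

# In the consumer loop above, you would replace the bare calculation with:
#     ok, result = safe_process(message.value)
#     if not ok:
#         print(f"Skipping poison message at offset {message.offset}: {result}")
#     consumer.commit()  # commit either way, so the bad record is never re-read
```

A bad record could also be forwarded to a dead-letter topic at the point it is skipped, so nothing is silently lost.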

When we run the consumer, we should see that it hits the poison message at offset 8 and then dies:

python kafka-read-single-stream.py 
Consumer Consumer1 started consuming
Consumer Consumer1 received: 889 from partition 0 at offset 0
Running complex calculation! 0.749156355455568
.....
Running complex calculation! 10.918032786885245
Consumer Consumer1 received: 127 from partition 0 at offset 7
Running complex calculation! 5.244094488188976
Consumer Consumer1 received: 0 from partition 0 at offset 8
Exception in thread Thread-1 (consume_messages):
Traceback (most recent call last):
  File "/usr/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.11/threading.py", line 982, in run
    self._target(*self._args, **self._kwargs)
  File "kafka-read-single-stream.py", line 19, in consume_messages
    print(f"Running complex calculation! {666 / int(message.value.decode('utf-8'))}")
                                          ~~~~^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
ZeroDivisionError: division by zero

We can confirm this by using the consumer group command

bin/kafka-consumer-groups.sh --bootstrap-server $(hostname):9092 --describe --group test_group

Consumer group 'test_group' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test_group      simpletopic     0          8               16              8               -               -               -

The group test_group on topic simpletopic has a current offset of 8 and this is where it will attempt to resume if we were to start it again.

We could run the consumer again and again but it won’t be able to get past that poison message.

If we are in a situation where we can’t fix the app and believe the number of poison messages is limited (hopefully only one!), then we can use the kafka-consumer-groups.sh command to skip the offset ahead.

There are a few options for how to skip ahead, but the one I use most is --shift-by, which allows you to specify the number of offsets to skip ahead (or back) by, rather than having to provide an exact offset position.
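For reference, --shift-by is only one of several reset modes the tool supports. A sketch of the main alternatives (assuming a recent Kafka distribution; all shown with --dry-run so nothing changes):

```shell
# Shift relative to the current committed position (negative values rewind):
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group \
  --topic simpletopic --reset-offsets --shift-by 1 --dry-run

# Jump to an exact offset:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group \
  --topic simpletopic --reset-offsets --to-offset 9 --dry-run

# Jump to the earliest or latest available offset:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group \
  --topic simpletopic --reset-offsets --to-latest --dry-run

# Jump to the first offset with a timestamp at or after a given datetime:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test_group \
  --topic simpletopic --reset-offsets --to-datetime 2024-04-10T02:00:00.000 --dry-run
```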

kafka-consumer-groups.sh has a handy --dry-run option that will tell you what the offset will be after you run the command:

bin/kafka-consumer-groups.sh --bootstrap-server $(hostname):9092 --reset-offsets --shift-by 1 --group test_group --topic simpletopic --dry-run

GROUP                          TOPIC                          PARTITION  NEW-OFFSET     
test_group                     simpletopic                    0          9

You can see that it is proposing to reset the offset to 9, which is past the poison message.

It's important to note that all members of the group need to be shut down; otherwise, Kafka will refuse the command, which makes a lot of sense. Resetting the offset of a running consumer would have unpredictable results at best!

If we go ahead and run the command with the --execute flag, it will skip the offset ahead:

bin/kafka-consumer-groups.sh --bootstrap-server $(hostname):9092 --reset-offsets --shift-by 1 --group test_group --topic simpletopic --execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET     
test_group                     simpletopic                    0          9        

which we can verify with the describe command

bin/kafka-consumer-groups.sh --bootstrap-server $(hostname):9092 --describe --group test_group

Consumer group 'test_group' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test_group      simpletopic     0          9               16              7               -               -               -

Now, if we restart the consumer it should happily continue on

python kafka-read-single-stream.py 
Consumer Consumer1 started consuming
Consumer Consumer1 received: 463 from partition 0 at offset 9
Running complex calculation! 1.4384449244060475
Consumer Consumer1 received: 670 from partition 0 at offset 10
Running complex calculation! 0.9940298507462687
Consumer Consumer1 received: 468 from partition 0 at offset 11
Running complex calculation! 1.4230769230769231
......

Fantastic… however, things are rarely that simple. It is pretty unlikely that you will encounter a topic with a single partition; in most scenarios, you will be using topics that have multiple partitions.

Let's create a new topic with 4 partitions

bin/kafka-topics.sh --bootstrap-server $(hostname):9092 --create --topic multitopic --partitions 4 --replication-factor 1

Now, when we write the poison message we can see that it is written to partition 1, offset 12

python kafka-poison-writer.py 
Connecting to broker
Connected
Sending normal messages
Sending poison message
Inserting poison message in partition: 1
Inserting poison message at offset: 12
Sending normal messages again
done

and when we read, we again fall over at the poison message:

python kafka-read-single-stream.py 
Consumer Consumer1 started consuming
Consumer Consumer1 received: 305 from partition 2 at offset 0
Running complex calculation! 2.183606557377049
Consumer Consumer1 received: 553 from partition 2 at offset 1
Running complex calculation! 1.2043399638336347
...
Consumer Consumer1 received: 0 from partition 1 at offset 12
Exception in thread Thread-1 (consume_messages):
Traceback (most recent call last):
  File "/usr/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.11/threading.py", line 982, in run
    self._target(*self._args, **self._kwargs)
  File "kafka-read-single-stream.py", line 19, in consume_messages
    print(f"Running complex calculation! {666 / int(message.value.decode('utf-8'))}")
                                          ~~~~^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
ZeroDivisionError: division by zero

This time, when describing the consumer group, we can see that we have different offsets for each partition:

bin/kafka-consumer-groups.sh --bootstrap-server $(hostname):9092 --describe --group test_group

Consumer group 'test_group' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test_group      multitopic      0          19              19              0               -               -               -
test_group      multitopic      1          12              28              16              -               -               -
test_group      multitopic      2          31              31              0               -               -               -
test_group      multitopic      3          17              17              0               -               -               -

You can see that all the partitions that didn’t have the poison message in them have been fully processed, so there is no lag on them, but partition 1 is stuck. Once again, it doesn’t matter how many times you run the consumer; it won’t advance past the poison message.

In addition to the extra partitions, it is also unlikely that the producers will have stopped writing messages to the topic while you are trying to rescue the consumer, so let’s make this a bit more production-like.

If we add some more records to the topic, it may end up looking something like this.

bin/kafka-consumer-groups.sh --bootstrap-server $(hostname):9092 --describe --group test_group

Consumer group 'test_group' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test_group      multitopic      0          19              36              17              -               -               -
test_group      multitopic      1          12              48              36              -               -               -
test_group      multitopic      2          31              45              14              -               -               -
test_group      multitopic      3          17              41              24              -               -               -

You can see that lag is building up on all the partitions, but it is largest for partition 1. Now, let’s run the same offset-shift dry run command as before:

bin/kafka-consumer-groups.sh --bootstrap-server $(hostname):9092 --reset-offsets --shift-by 1 --group test_group --topic multitopic --dry-run

GROUP                          TOPIC                          PARTITION  NEW-OFFSET     
test_group                     multitopic                     0          20             
test_group                     multitopic                     1          13             
test_group                     multitopic                     2          32             
test_group                     multitopic                     3          18

What do you notice?

You should be able to see that the offset has increased by one for all partitions.

If you were to run this command with --execute you will end up skipping records that still need to be processed in the other partitions!

I have seen this mistake made on several occasions, and it is not fun to recover from!

The simple solution in this case is to specify the partition when you run the reset offset command. You can do this by simply adding :<partition> to the --topic parameter in the command:

bin/kafka-consumer-groups.sh --bootstrap-server $(hostname):9092 --reset-offsets --shift-by 1 --group test_group --topic multitopic:1 --dry-run

GROUP                          TOPIC                          PARTITION  NEW-OFFSET     
test_group                     multitopic                     1          13

Here, it is only resetting the offset for the partition that has the poison message in it.

Once that has been run, your consumer will only skip ahead in partition 1, and the other partitions will resume from where they left off.

It’s as simple as that!
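If you prefer to do the same thing from code rather than the CLI, the arithmetic is identical: take the committed offset for just the stuck partition and commit it back shifted by one. A hedged sketch (the `shift_offsets` helper is hypothetical; the actual commit would go through kafka-python against a live broker, which is shown only in comments):

```python
def shift_offsets(committed: dict, partition: int, shift: int = 1) -> dict:
    """Given {partition: committed_offset}, return the new offsets to commit,
    touching ONLY the stuck partition -- the programmatic equivalent of
    --shift-by 1 --topic multitopic:<partition>."""
    if partition not in committed:
        raise KeyError(f"no committed offset for partition {partition}")
    return {partition: committed[partition] + shift}

# Applied against a live broker, it would look roughly like this
# (kafka-python; all group members must be shut down first):
#
#   from kafka import KafkaConsumer, TopicPartition
#   from kafka.structs import OffsetAndMetadata
#
#   tp = TopicPartition("multitopic", 1)
#   consumer = KafkaConsumer(bootstrap_servers=["127.0.0.1:9092"], group_id="test_group")
#   current = consumer.committed(tp)                       # e.g. 12
#   new = shift_offsets({tp.partition: current}, tp.partition)
#   consumer.commit({tp: OffsetAndMetadata(new[tp.partition], None)})
```

Keeping the shift calculation separate from the commit makes it easy to sanity-check the new offsets (the same role --dry-run plays for the CLI) before anything is written to the broker.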

I hope this has helped you understand how to deal with poison message scenarios, and how to prevent accidentally moving the offset past records that are yet to be processed.

If you would like to know more about how to implement modern data, streaming and cloud technologies, such as Apache Kafka, into your business, we at Digitalis do it all: from data engineering and cloud migration to fully managed services, we can help you modernize your applications, operations, and data. We provide consulting and managed services on cloud, data, and DevOps for any business type. Contact us for more information.
