Read / Process / Write with Kafka Transactions

May 18, 2022

Many people use Kafka as a data hose for a stream of events where idempotence is not critical. For example, if you are recording user clicks on a website, it is not the end of the world if a system failure causes a number of records to be replayed (at-least-once semantics) or dropped (at-most-once semantics).

That same behaviour is not going to make your company happy when you are using Kafka to process important business events which often have several steps all bundled into one operation.

For example, you might have an order processing application that takes an order event and creates two new events; charge_customer and ship_product. These events are picked up by separate downstream systems which debit the customer's account and ship the product respectively.

If your application fails after creating the charge_customer event but before the ship_product event then you are going to have some unhappy customers who get charged for a product that never arrives.

For important event processing, you need to be able to rely on Kafka to provide atomic transactions: a guarantee that a transaction either completes in full or, if it aborts, leaves everything in its original state.

Kafka has had exactly-once semantics since 0.11 (KIP-98), and it received several quality-of-life improvements in 2.5.

How does it work?

The following diagram details how a read/process/write business event can be handled.

For simplicity's sake, only single-partition topics are represented, but the same general principle applies to topics with multiple partitions.

This diagram has a lot going on in it, so let's have a look at a rough code example to highlight the steps:

import java.time.Duration;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import static java.util.Collections.singleton;

// createKafkaProducer, createKafkaConsumer, producerRecord and offsetMapFunction are
// helper functions you write yourself. They are not part of the Kafka API.

// Producer setup. Note that transactional.id is defined, which is required
KafkaProducer<String, String> producer = createKafkaProducer( "bootstrap.servers", "localhost:9092", "transactional.id", "my-transactional-id", "enable.idempotence", "true"); 

// Section B: Producer registers its transactional.id with the transaction coordinator.
// Transaction coordinator closes out any pending transaction and increments the epoch.
// Also fences out zombies (see the zombie fencing section below)
producer.initTransactions(); 

// Consumer setup. Note that isolation.level is set to read_committed, which is required for transaction-aware consumers.
// enable.auto.commit is switched off because the offsets are committed through the producer instead
KafkaConsumer<String, String> consumer = createKafkaConsumer( "bootstrap.servers", "localhost:9092", "group.id", "my-group-id", "isolation.level", "read_committed", "enable.auto.commit", "false"); 
consumer.subscribe(singleton("inputTopic")); 

while (true) { 
   // Section A: Read records from a topic
   ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
   
   // Start a new transaction on the producer
   producer.beginTransaction(); 
   for (ConsumerRecord<String, String> record : records) {
      // Section D: Write the new records
      producer.send(producerRecord("outputTopic", record));
   }
      
   // Section E: The producer sends a map of offsets to the consumer coordinator, which writes the offsets to the __consumer_offsets topic
   // Note: offsetMapFunction is not an API function. You will have to build this yourself based on records.partitions()
   //       Don't forget to add 1 to the current offset so the consumer gets the next record
   //       Clients from 2.5 onwards also accept consumer.groupMetadata() in place of the group id string
   producer.sendOffsetsToTransaction(offsetMapFunction(), "my-group-id"); 
   
   // Section F: The transaction coordinator initiates a commit (or abort).
   // It writes transaction commit markers to the topic partitions as well as the __consumer_offsets topic to allow the next
   // poll to get the next record
   producer.commitTransaction();
}
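The "add 1" rule for the offset map in Section E is easy to get wrong, so here is a minimal sketch of just that arithmetic using plain Java types. The names `OffsetMapSketch`, `Rec` and `nextOffsets` are made up for illustration. In a real application the map would be keyed by Kafka's `TopicPartition` and each value wrapped in an `OffsetAndMetadata`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OffsetMapSketch {
    // Hypothetical stand-in for a consumed record: only its coordinates.
    static final class Rec {
        final String topic;
        final int partition;
        final long offset;

        Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // For every topic-partition in the batch, the offset to commit is the
    // highest offset processed plus one: the position of the next record.
    static Map<String, Long> nextOffsets(List<Rec> batch) {
        Map<String, Long> next = new HashMap<>();
        for (Rec r : batch) {
            String key = r.topic + "-" + r.partition;
            next.merge(key, r.offset + 1, Math::max);
        }
        return next;
    }

    public static void main(String[] args) {
        List<Rec> batch = List.of(
            new Rec("inputTopic", 0, 41),
            new Rec("inputTopic", 0, 42),
            new Rec("inputTopic", 1, 7));
        // inputTopic-0 maps to 43 and inputTopic-1 maps to 8
        System.out.println(nextOffsets(batch));
    }
}
```

Committing the last processed offset without the increment would cause the final record of every batch to be read again after a restart.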

One thing I want to highlight in the above code is that, unlike a non-transactional process flow, it is the transactional producer’s responsibility to update the consumer group's offsets via sendOffsetsToTransaction. This way, offsets get committed only if the entire transaction (consuming and producing) succeeds.

The upshot of all this is that in the event of a failure, both the writes to the destination topic(s) and the reads of the source topics get rolled back to the state they were in before the transaction started. Thus we have an atomic transaction.

The customer doesn’t get charged for a product they won’t receive, and their order event is not considered consumed by the application's consumer group, so it will be reprocessed correctly.

Atomic transactions introduced a number of new components into Kafka:

Transaction Coordinator

Similar to the consumer group coordinator, each producer is assigned a transaction coordinator, and all the logic of assigning Producer IDs and managing transactions is done by the transaction coordinator.

It is responsible for enforcing that there is only one active producer per transactional ID.

In order to identify the active producer for each transactional ID, the transaction coordinator tracks two values that are associated with that ID: a producer ID and a producer epoch.

The transactional ID is provided by the producer client. The producer ID is assigned by the transaction coordinator, and the epoch is bumped each time a producer with that transactional ID starts up and calls initTransactions().

This ensures that any previous zombie instances of the producer are fenced off and allows the transaction coordinator to recover (roll forward or back) any transaction left incomplete by the previous instance of the producer.

Idempotent Producer

An idempotent producer is one that can potentially send the same message to the broker multiple times and have it only be written to the topic log once.

For a single partition, sends from an idempotent producer remove the possibility of duplicate messages due to producer or broker errors.

To enable this in a producer you set the property “enable.idempotence=true”. You must also be using “acks=all”.

Idempotence applies to a single partition, so if a resent message ends up on a different partition then it can still be written to the topic log a second time. This is generally not an issue as resend attempts are not affected by partitioning logic, so they should always go to the same partition.

Idempotence requires the producer's retries setting to be greater than 0 (it is 2147483647 by default).
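As a rough illustration, the settings mentioned above can be collected into a `java.util.Properties` object like this. The class name `IdempotentProducerConfig` and the `build` helper are made up for this sketch, and a real producer would also need serializer settings, which are left out here.

```java
import java.util.Properties;

class IdempotentProducerConfig {
    // Collects only the settings discussed in this section.
    // A real producer additionally needs key.serializer and value.serializer.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("enable.idempotence", "true");
        props.setProperty("acks", "all");
        // Must be greater than 0; this is the default value quoted above.
        props.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```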

The way that it works is that each batch of messages sent to Kafka contains a sequence number and a producer ID that the brokers use to discard any duplicates sent.

The sequence number is increased every time the producer sends a message to the broker (not including retries).

For each partition, the broker then keeps track of the largest sequence number that it has successfully written for each producer ID and discards anything at or below it.

For example, let’s say that a producer sends three messages asynchronously: m1, m2 and m3.

  • m1 is sent successfully
  • m2 is sent and written to the topic by the broker but a failure occurs on sending the ack back to the producer
  • m3 is sent successfully
  • m2 is resent by the producer

Without idempotence enabled we could have m1,m2,m3,m2 written to the topic.

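With idempotence enabled, the broker recognises the resent m2 as carrying a sequence number it has already written and discards it, so the topic contains m1,m2,m3.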
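The m1, m2, m3 scenario can be replayed against a toy in-memory model of a single partition. This is only a sketch of the deduplication idea, not how the broker is implemented: `DedupSketch` and its methods are invented names, and the model ignores details such as the broker rejecting gaps in the sequence.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DedupSketch {
    private final List<String> log = new ArrayList<>();
    // Highest sequence number written so far, per producer ID.
    private final Map<Long, Integer> lastSequence = new HashMap<>();

    // Returns true if the message was appended, false if it was
    // discarded as a duplicate of something already written.
    boolean append(long producerId, int sequence, String value) {
        int last = lastSequence.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false;
        }
        log.add(value);
        lastSequence.put(producerId, sequence);
        return true;
    }

    List<String> log() {
        return log;
    }

    // Replays the scenario from the text: m2 is written, its ack is lost,
    // and the producer retries it with the same sequence number.
    static List<String> replayExample() {
        DedupSketch partition = new DedupSketch();
        long producerId = 1L;
        partition.append(producerId, 0, "m1");
        partition.append(producerId, 1, "m2"); // written, ack lost
        partition.append(producerId, 2, "m3");
        partition.append(producerId, 1, "m2"); // retry, discarded
        return partition.log();
    }

    public static void main(String[] args) {
        System.out.println(replayExample()); // prints [m1, m2, m3]
    }
}
```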

The sequence number and producer ID that get added to each batch add very little overhead to message processing.

If you already use acks=all on your producer then you may as well enable the idempotent producer. It’s a feature that works extremely well.

Consumer Isolation Level

Isolation level for consumers controls whether a consumer will read a message that is part of a transaction and not yet committed.

When set to read_committed, the consumer will only be presented with transactional messages that have been fully committed.

When set to read_uncommitted (default value) the consumer will receive all messages, including transactional messages that are in flight or aborted.

If you are going to be using transactions then you need to set the isolation level to read_committed.
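To make the difference between the two isolation levels concrete, here is a toy model of a partition log that mixes transactional data, non-transactional data and commit/abort markers. Everything in it (`IsolationSketch`, `Entry`, the two read methods) is a made-up illustration of the visible behaviour and not the client's real implementation. One detail it does model is that a read_committed consumer stops at the first message of any transaction that is still open, so later messages are held back until that transaction finishes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class IsolationSketch {
    enum Kind { DATA, COMMIT, ABORT }

    static final class Entry {
        final Kind kind;
        final Long producerId; // null means non-transactional data
        final String value;

        Entry(Kind kind, Long producerId, String value) {
            this.kind = kind;
            this.producerId = producerId;
            this.value = value;
        }

        static Entry data(Long producerId, String value) {
            return new Entry(Kind.DATA, producerId, value);
        }

        static Entry marker(Kind kind, long producerId) {
            return new Entry(kind, producerId, null);
        }
    }

    // read_uncommitted: every data message, whatever its transaction state.
    static List<String> readUncommitted(List<Entry> log) {
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            if (e.kind == Kind.DATA) out.add(e.value);
        }
        return out;
    }

    // read_committed: committed and non-transactional messages only, and
    // nothing at or beyond the first message of a still-open transaction.
    static List<String> readCommitted(List<Entry> log) {
        boolean[] visible = new boolean[log.size()];
        Map<Long, List<Integer>> open = new HashMap<>();
        for (int i = 0; i < log.size(); i++) {
            Entry e = log.get(i);
            if (e.kind == Kind.DATA) {
                if (e.producerId == null) visible[i] = true;
                else open.computeIfAbsent(e.producerId, k -> new ArrayList<>()).add(i);
            } else {
                List<Integer> pending = open.remove(e.producerId);
                if (pending != null && e.kind == Kind.COMMIT) {
                    for (int index : pending) visible[index] = true;
                }
            }
        }
        int stableEnd = log.size();
        for (List<Integer> pending : open.values()) {
            stableEnd = Math.min(stableEnd, pending.get(0));
        }
        List<String> out = new ArrayList<>();
        for (int i = 0; i < stableEnd; i++) {
            if (visible[i]) out.add(log.get(i).value);
        }
        return out;
    }

    static List<Entry> exampleLog() {
        return List.of(
            Entry.data(1L, "a1"),           // transaction of producer 1
            Entry.data(2L, "b1"),           // transaction of producer 2
            Entry.data(null, "n1"),         // non-transactional
            Entry.marker(Kind.COMMIT, 1L),
            Entry.data(2L, "b2"),
            Entry.marker(Kind.ABORT, 2L),
            Entry.data(3L, "c1"),           // producer 3 is still in flight
            Entry.data(null, "n2"));
    }

    public static void main(String[] args) {
        System.out.println(readUncommitted(exampleLog())); // prints [a1, b1, n1, b2, c1, n2]
        System.out.println(readCommitted(exampleLog()));   // prints [a1, n1]
    }
}
```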

Zombie Fencing

In distributed systems, there are always issues such as application crashes or network interruptions. Automatic restarts and transient issues can often lead to situations where we have multiple instances processing the same input topics and writing to the same output topics, leading to duplicates which violate exactly-once semantics.

The stale instances are referred to as zombies.

Kafka deals with zombies by requiring each transactional producer to be assigned a unique identifier (transactional.id).

When a transactional producer starts up it registers its transactional.id with the transaction coordinator. The transaction coordinator also increments an epoch associated with the transactional.id.

Each message sent to the broker carries the producer ID that the coordinator mapped to that transactional.id, together with the epoch.

Any producers using the same transactional.id and an older epoch are then considered zombies and fenced off. This means that any future writes from them are rejected.

When the broker starts, it reads each partition log and materializes a cache mapping each producer ID to that ID’s current epoch. This allows for even restarted or new brokers to be able to deal with any Zombies.
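The epoch check described above can be sketched with a small in-memory model. This is an illustration under simplifying assumptions: `FencingSketch`, `Session` and `acceptWrite` are invented names, and the model folds the transaction coordinator and the broker-side check into a single class. In the real client, a fenced producer sees a `ProducerFencedException`.

```java
import java.util.HashMap;
import java.util.Map;

class FencingSketch {
    // What a producer instance holds after registering: its ID and epoch.
    static final class Session {
        final long producerId;
        final int epoch;

        Session(long producerId, int epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    // Latest session per transactional.id.
    private final Map<String, Session> latest = new HashMap<>();
    private long nextProducerId = 1000L; // arbitrary starting value

    // First registration assigns a producer ID with epoch 0.
    // Every later registration keeps the ID and bumps the epoch.
    Session initTransactions(String transactionalId) {
        Session previous = latest.get(transactionalId);
        Session next = (previous == null)
            ? new Session(nextProducerId++, 0)
            : new Session(previous.producerId, previous.epoch + 1);
        latest.put(transactionalId, next);
        return next;
    }

    // A write is accepted only if it carries the current epoch;
    // anything older comes from a zombie and is rejected.
    boolean acceptWrite(String transactionalId, Session writer) {
        Session current = latest.get(transactionalId);
        return current != null
            && writer.producerId == current.producerId
            && writer.epoch >= current.epoch;
    }

    public static void main(String[] args) {
        FencingSketch coordinator = new FencingSketch();
        Session original = coordinator.initTransactions("my-transactional-id");
        Session restarted = coordinator.initTransactions("my-transactional-id");
        System.out.println(coordinator.acceptWrite("my-transactional-id", original));  // prints false
        System.out.println(coordinator.acceptWrite("my-transactional-id", restarted)); // prints true
    }
}
```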

Conclusion

This should give you a good understanding of how Atomic transactions work within Kafka and how you can use them to protect important processes from failures.

If you want to know more then I would highly recommend that you have a read of where it all started with KIP-98. It's quite easy to read and covers the process in great detail.

If you would like to know more about how to implement modern data, streaming and cloud technologies, such as Apache Kafka, into your business, we at Digitalis do it all: from data engineering, and cloud migration to fully managed services, we can help you modernize your applications, operations, and data. We provide consulting and managed services on cloud, data, and DevOps for any business type. Contact us for more information.
