We recently deployed a Kafka Connect environment to consume Avro messages from a topic and write them into an Oracle database. Everything seemed to be functioning just fine until we got a message from the team saying their connectors had suddenly stopped working.
On further investigation we found errors like this in the Kafka Connect logs:
2020-01-17 12:56:48 ERROR Uncaught exception in thread 'kafka-producer-network-thread | producer-25':
java.lang.OutOfMemoryError: Java heap space
2020-01-17 13:02:54 ERROR Uncaught exception in thread 'kafka-producer-network-thread | producer-57':
java.lang.OutOfMemoryError: Direct buffer memory
Our first thought was that Kafka Connect just needed more heap space, so we increased it from the defaults (256MB-1GB) to a fixed 8GB heap, but the errors kept coming. We increased it further to 20GB and the errors were still happening. The machine was receiving one message every few seconds, yet the Kafka Connect process was using around 97% of the RAM and over 80% CPU. The machine has 8 CPUs and 32GB of RAM, so clearly something wasn’t right!
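For reference, the heap for a Kafka Connect worker is normally set through the KAFKA_HEAP_OPTS environment variable before starting the worker, so the increase looked something like this (the values and paths here are illustrative):
export KAFKA_HEAP_OPTS="-Xms8g -Xmx8g"
bin/connect-distributed.sh config/connect-distributed.properties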
In this case we were using a custom Kafka Connect plugin to convert messages from the topic into the format required for insertion into Oracle, so our first thought was: do we have a memory leak in our code? We went over and over our plugin code and could not find anywhere that could possibly be leaking memory, so we looked back at Kafka Connect itself.
What we could see was that our sink connectors would run until an invalid message was pushed into the topic, at which point the OutOfMemoryError exceptions started appearing in the logs. This made sense as the errors were only ever logged from producer threads and this Kafka Connect instance was only running Sink connectors, so it must be related to pushing the invalid messages to dead letter queues.
A typical connector configuration for this use case looks something like this:
{
  "name": "oracle-sink-test",
  "config": {
    "connector.class": "com.mydomain.PluginClass",
    "connector.type": "sink",
    "tasks.max": "1",
    "topics": "source_topic",
    "topic.type": "avro",
    "connection.user": "DBUserName",
    "connection.password": "DBPassword",
    "connection.url": "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=oracle1)(PORT=9020))(CONNECT_DATA=(SERVER=DEDICATED)(SERVICE_NAME=SINKTEST)))",
    "db.driver": "oracle.jdbc.driver.OracleDriver",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "https://schemaregistry:8443",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq_sink_test",
    "errors.deadletterqueue.topic.replication.factor": 1,
    "errors.deadletterqueue.context.headers.enable": true
  }
}
As you can see in the example configuration, our connectors were configured with dead letter queues, so we tried changing the connectors to errors.tolerance=none and removing the dead letter queue config. As we had hoped, this change meant the connectors would now fail with an error when they encountered an invalid message, but we also observed much lower CPU and RAM usage while the connectors were running. The next thing we tried was leaving errors.tolerance set to all and putting the dead letter queue config back into the connector. This resulted in the CPU and RAM use going back up again, and instead of failing on an invalid message the connector job would hang and its consumer would eventually get timed out by the broker.
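To make the two variations concrete, the difference comes down to these connector properties (a sketch based on the example configuration above):
Variation 1 (fail fast on an invalid message, no dead letter queue):
"errors.tolerance": "none"
Variation 2 (tolerate errors and route invalid messages to the dead letter queue, as in the original config):
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq_sink_test",
"errors.deadletterqueue.topic.replication.factor": 1,
"errors.deadletterqueue.context.headers.enable": true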
For a detailed explanation of error handling in Kafka Connect see this blog post which explains it far better than I ever could: https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/.
So what was going on?
For a hint, here is an example of what the end of our Kafka Connect worker configuration file looked like:
bootstrap.servers=broker1:9095,broker2:9095
security.protocol=SSL
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=truststorepassword
ssl.keystore.location=/path/to/keystore.jks
ssl.keystore.password=keystorepassword
ssl.key.password=keypassword
consumer.bootstrap.servers=broker1:9095,broker2:9095
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/path/to/truststore.jks
consumer.ssl.truststore.password=truststorepassword
consumer.ssl.keystore.location=/path/to/keystore.jks
consumer.ssl.keystore.password=keystorepassword
consumer.ssl.key.password=keypassword
As you can see, we have SSL enabled on the brokers. We didn’t give this much thought at first: there were SSL settings in the file, we had no SSL-related errors in the logs, and nothing else suggested that the issue was related to SSL. After much investigation and Googling for answers we found this open issue in the Kafka bug tracker: “JVM runs into OOM if (Java) client uses a SSL port without setting the security protocol” (https://issues.apache.org/jira/browse/KAFKA-4090). In short, a client that is not configured for SSL but connects to an SSL port misinterprets the TLS handshake bytes as a plaintext message size and tries to allocate a huge buffer, exhausting memory. This was the hint we needed to fix our problem.
We had configured SSL settings for Kafka Connect’s internal connections and for the consumers, but we had not configured SSL for the producer threads. This was possibly an oversight as we were only running sink connectors in this environment, but of course there are still producer threads running to push invalid messages to the dead letter queues. Based on the information in KAFKA-4090 we decided to add explicit SSL settings for the producer threads like this:
producer.bootstrap.servers=broker1:9095,broker2:9095
producer.security.protocol=SSL
producer.ssl.truststore.location=/path/to/truststore.jks
producer.ssl.truststore.password=truststorepassword
producer.ssl.keystore.location=/path/to/keystore.jks
producer.ssl.keystore.password=keystorepassword
producer.ssl.key.password=keypassword
After making this change we restarted Kafka Connect and suddenly the CPU use went from 80-90% down to 5% and the RAM use went down from 95% to just the Java heap size plus a little (around 300MB in total as the heap was now set to 256MB-1GB).
We reverted the connector configuration back to errors.tolerance=all with the dead letter queue configured and, hey presto, messages started being consumed from the topic and the invalid messages were being correctly pushed to the dead letter queue.
So in summary, if you are seeing unexpected out of memory exceptions in Kafka Connect and you are using SSL to communicate with the brokers, make sure you configure the SSL settings individually for all three types of connection – internal connections, consumers and producers.
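As a quick checklist, each of those scopes has its own prefix in the worker configuration, and each needs its own ssl.* truststore and keystore settings alongside it, as shown earlier. A minimal sketch:
# Worker's internal connections (config, offset and status topics)
security.protocol=SSL
# Consumers created for sink connectors
consumer.security.protocol=SSL
# Producers, including those writing to dead letter queues
producer.security.protocol=SSL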