Easy Stream Management with Kafka Connect

May 3, 2022
Karl Sorensen

Kafka Connect saves you effort by providing a common framework to connect Kafka with external systems using Connectors.

External systems cover a huge range, such as:

  • RDBMS (Oracle, DB2, MySQL, Postgres)
  • Object Stores (S3, Azure Blob, Google Cloud Storage)
  • Message Queues (ActiveMQ, IBM MQ, MQTT, GCP PubSub)
  • NoSQL (Cassandra, Elasticsearch, MongoDB)
  • HTTP

Connectors are divided into two categories: sources and sinks.

Source Connectors are responsible for ingesting data from an external system into a Kafka topic.

Sink Connectors deliver data from Kafka topics to external systems.

Components

Worker Process: The worker process is the underlying process that is started up to run connectors and tasks.

In distributed mode, it exposes a REST interface that allows you to interact with the Connect cluster.

Each worker's REST interface can accept requests for resources hosted on any node of the cluster; any request for a resource the worker doesn't control is forwarded to the worker that does.

Connector: A connector instance is a logical job responsible for managing the copying of data between Kafka and another system.

Each connector instance coordinates a set of tasks.

Task: Tasks are responsible for actually copying the data. Tasks hold no state themselves; their configuration and status are stored in Kafka in the special topics named by config.storage.topic and status.storage.topic (with source offsets in offset.storage.topic) and are managed by the associated connector.

Plugins: The classes used by a connector are packaged as plugins. The plugins are how the connector is able to communicate with external systems.

The worker process uses the plugin.path configuration property to locate plugins. This path should generally point to the parent directory of the directories where individual plugins are installed.

For example, if you are installing the Debezium MySQL connector and it is installed at /opt/connect/plugins/debezium-connector-mysql, then your plugin.path should point to /opt/connect/plugins and NOT /opt/connect/plugins/debezium-connector-mysql.
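
The expected layout looks something like this (a sketch; the Debezium tarball unpacks into its own subdirectory, and the exact jar names depend on the version):

/opt/connect/plugins/                          <- plugin.path points here
└── debezium-connector-mysql/                  <- one subdirectory per plugin
    ├── debezium-connector-mysql-1.9.2.Final.jar
    └── ...                                    <- supporting jars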

Operating Mode

Kafka Connect can be run in either standalone mode or distributed mode.

Standalone mode runs everything in a single process.

Distributed mode is designed to run as multiple worker processes, typically across multiple nodes, and is fault-tolerant. It is managed through its REST interface and uses dedicated Kafka topics to track configuration, offsets, and status.

When running in distributed mode, the group.id setting controls which cluster a worker process belongs to. A separate Kafka Connect cluster is formed for each distinct group.id, each with its own set of dedicated special topics.

Kafka Connect cluster with each worker process having the same group.id setting. Note that a connector's tasks don't have to reside on the same worker process as the connector itself.
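
For example, workers that should form a single cluster all share one group.id, while a worker given a different group.id forms (or joins) an entirely separate cluster. A sketch, with hypothetical cluster names:

# workers joining cluster A all share this setting
group.id=connect-cluster-a

# a worker with a different group.id belongs to a separate Connect
# cluster, which needs its own set of special topics
group.id=connect-cluster-b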

Configuring a Connector

In this section, we will set up a quick connector to read from a MySQL database.

Set up conf/connect-distributed.properties

bootstrap.servers=127.0.0.1:9092
group.id=connect-cluster
plugin.path=/opt/connect/plugins
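
A distributed worker also needs converters and its three special storage topics configured; something like the following would round out the file (a minimal sketch, with topic names of our choosing and a replication factor of 1 that is only suitable for a single-broker test cluster):

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1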

If you have security enabled in your Kafka cluster, then you will also need to add the relevant security configs to this file, as described in the Kafka documentation.

Pull down the Debezium plugin

curl -sL https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.9.2.Final/debezium-connector-mysql-1.9.2.Final-plugin.tar.gz | tar xzf - -C /opt/connect/plugins

Start Kafka Connect in distributed mode

bin/connect-distributed.sh conf/connect-distributed.properties
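
Once the worker is up, you can sanity-check it through the REST interface (port 8083 by default) and confirm that it picked up the Debezium plugin from plugin.path:

# worker version and Kafka cluster id
curl -s http://127.0.0.1:8083/ | jq

# installed plugins; io.debezium.connector.mysql.MySqlConnector should appear
curl -s http://127.0.0.1:8083/connector-plugins | jq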

Prepare connector config debezium-connector.config

{
  "name": "debezium-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "my-database-host",
    "database.port": "3306",
    "database.user": "testuser",
    "database.password": "testpassword",
    "database.server.id": "1999",
    "database.server.name": "testdb",
    "database.include.list": "debeziumtest",
    "database.history.kafka.bootstrap.servers": "127.0.0.1:9092",
    "database.history.kafka.topic": "schema-changes.testdb"
  }
}
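
Before creating the connector, you can optionally ask the worker to validate the config. The validate endpoint expects just the inner config map, so here we extract it with jq; an error_count of 0 means the config was accepted (a sketch):

jq .config debezium-connector.config | curl -s -X PUT -H "Content-Type: application/json" -d @- http://127.0.0.1:8083/connector-plugins/io.debezium.connector.mysql.MySqlConnector/config/validate | jq .error_count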

Create the connector

curl -d @debezium-connector.config -H "Content-Type: application/json" -X POST http://127.0.0.1:8083/connectors

Confirm the connector is running

curl -s http://127.0.0.1:8083/connectors/debezium-connector/status | jq
{
  "name": "debezium-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.0.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "127.0.0.1:8083"
    }
  ],
  "type": "source"
}
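
From here, Debezium writes change events to topics named server.database.table. Assuming a hypothetical customers table in the debeziumtest database, you could watch the events with the console consumer:

bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic testdb.debeziumtest.customers --from-beginning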

The REST interface provides a number of endpoints that allow you to control the connector and its associated tasks.

Confluent provides comprehensive documentation for it.
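
A few of the endpoints we find ourselves reaching for most often (using the connector created above):

# list all connectors on the cluster
curl -s http://127.0.0.1:8083/connectors | jq

# pause / resume a connector and its tasks
curl -s -X PUT http://127.0.0.1:8083/connectors/debezium-connector/pause
curl -s -X PUT http://127.0.0.1:8083/connectors/debezium-connector/resume

# restart the connector, or an individual task
curl -s -X POST http://127.0.0.1:8083/connectors/debezium-connector/restart
curl -s -X POST http://127.0.0.1:8083/connectors/debezium-connector/tasks/0/restart

# delete the connector
curl -s -X DELETE http://127.0.0.1:8083/connectors/debezium-connector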

Why use Kafka Connect instead of rolling your own?

We are sometimes asked why you wouldn't just create a bespoke client to perform the same function. After all, you could write a short Python script to achieve a lot of the same functionality.

But then you start to wonder if you’ve thought about things like:

  • Failure recovery
  • Restart
  • Logging
  • Testing
  • Orchestration

The Kafka Connect framework handles all of this for you and ties in with the deployment and monitoring practices you probably already have in place for Kafka itself.

If you would like to know more about how to implement modern data, streaming and cloud technologies such as Apache Kafka into your business, we at Digitalis do it all: from data engineering and cloud migration to fully managed services, we can help you modernize your applications, operations, and data. We provide consulting and managed services on cloud, data, and DevOps for any business type. Contact us for more information.

