Simple Message Transforms with Kafka Connect

May 4, 2022
Karl Sorensen

We have previously shown how easy it is to set up Kafka Connect to stream to and from external systems.

However, there are times when the records aren’t quite what you want to send or receive.

There are plenty of heavyweight ways of handling this through stream processing engines such as Kafka Streams, ksqlDB or Apache Flink; however, you can also use Kafka Connect itself to do this through Single Message Transformations (SMTs).

Imagine you have several satellite offices that use Kafka (and Kafka Connect) locally to manage their sales platform.

Each office has a topic set up that contains an event for each sale made by that office.

{
  "order_number": "12345",
  "customer_id": "USR123",
  "customer_phone_number": "+445678978943"
}

The company has its analytics platform in GCP and wants to start producing sales reports for all of its satellite offices, but the system was built before there were multiple offices, so there is no office identifier in the record payload.

All of the records can be sent to a GCP Pub/Sub topic, but without an office identifier the analytics platform won't be able to break the reports down by office.

Each office could update its systems to include an office identifier, but this would require multiple local services to be updated and regression tested, and the company doesn't have the time.

To avoid reworking each office's sales platform we can use Single Message Transformations to perform some small transformations on each sales record.

We can start with a basic connector configuration that will stream all the events to GCP:

{
  "name": "sales-to-analytics-connector",
  "config": {
    "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
    "tasks.max": "1",
    "topics": "sales",
    "cps.project": "company-gcp-project",
    "cps.topic": "company-gcp-project-topic",
    "gcp.credentials.file.path": "/configs/kafka_connect/gcp.creds"
  }
}
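
To register this connector you post the JSON to the Connect worker's REST API. A minimal sketch, assuming the worker's REST interface is on its default port of 8083 and the JSON above is saved in a (hypothetical) file called sales-to-analytics-connector.json:

curl -X POST -H "Content-Type: application/json" \
  --data @sales-to-analytics-connector.json \
  http://localhost:8083/connectors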

To update the connector to include the office identifier we can use the InsertField transform; the additional transform configuration is shown at the end of the config below.

{
  "name": "sales-to-analytics-connector",
  "config": {
    "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
    "tasks.max": "1",
    "topics": "sales",
    "cps.project": "company-gcp-project",
    "cps.topic": "company-gcp-project-topic",
    "gcp.credentials.file.path": "/configs/kafka_connect/gcp.creds",
    "transforms": "InsertRegion",
    "transforms.InsertRegion.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertRegion.static.field": "Region",
    "transforms.InsertRegion.static.value": "DUBLIN"
  }
}
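
If the connector is already running, one way to apply the change is to PUT the new configuration to the Connect REST API. Note that this endpoint takes only the contents of the "config" object, not the full wrapper; a sketch, again assuming the default REST port and a hypothetical file holding just that config object:

curl -X PUT -H "Content-Type: application/json" \
  --data @sales-to-analytics-connector-config.json \
  http://localhost:8083/connectors/sales-to-analytics-connector/config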

Once the connector is updated, each record will have a new field added to it when it is pushed to the GCP Topic.

The way it works is that each transform is given a name and loaded via the "transforms" configuration.

Each transform is then configured using the format:

transforms.<transform_name>.<config>: <value>

Each transform will have different configs that need to be set.

The original record in the source topic "sales" is not affected; the record delivered to the GCP topic now looks like this:

{
  "order_number": "12345",
  "customer_id": "USR123",
  "customer_phone_number": "+445678978943",
  "Region": "DUBLIN"
}

The analytics team are now happy as they can use the extra data field to create sales reports broken down by office location.

Unfortunately, the security team has had a look at the data stream and is not happy that it contains Personally Identifiable Information (PII).

They have asked that the customer_phone_number field be either removed or obfuscated.

Once again, this can be done with an SMT:

{
  "name": "sales-to-analytics-connector",
  "config": {
    "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
    "tasks.max": "1",
    "topics": "sales",
    "cps.project": "company-gcp-project",
    "cps.topic": "company-gcp-project-topic",
    "gcp.credentials.file.path": "/configs/kafka_connect/gcp.creds",
    "transforms": "InsertRegion,ScrubPhone",
    "transforms.InsertRegion.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.InsertRegion.static.field": "Region",
    "transforms.InsertRegion.static.value": "DUBLIN",
    "transforms.ScrubPhone.type": "org.apache.kafka.connect.transforms.MaskField$Value",
    "transforms.ScrubPhone.fields": "customer_phone_number"
  }
}

This will give you a new record of

{
  "order_number": "12345",
  "customer_id": "USR123",
  "customer_phone_number": "",
  "Region": "DUBLIN"
}
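
By default, MaskField replaces a string field with an empty string. If the Kafka version running your Connect workers is recent enough to support it, MaskField also accepts an optional replacement setting so the masked value is more obviously redacted, for example:

"transforms.ScrubPhone.replacement": "REDACTED"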

There is also a Drop transform that you could use instead, but it is not included in the Apache Kafka distribution; you would need the Confluent Hub client to install it.
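
If you wanted to remove the field entirely rather than mask it, one option that does ship with Apache Kafka is the ReplaceField transform. A sketch, noting that the config key is named exclude on recent Kafka versions and blacklist on older ones:

"transforms.ScrubPhone.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.ScrubPhone.exclude": "customer_phone_number"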

Other Options

We’ve just seen how easy it is to modify a record to add and remove fields.

Transforms can also operate on record keys and headers, so you could, for example, modify record keys to adapt to a particular partitioning strategy.
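
As a sketch, the ValueToKey transform that ships with Apache Kafka could be used to build the record key from the customer_id field so that all sales for a given customer land on the same partition:

"transforms": "KeyFromCustomer",
"transforms.KeyFromCustomer.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.KeyFromCustomer.fields": "customer_id"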

Other transforms are available and you can even create your own!
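
Custom transforms implement the org.apache.kafka.connect.transforms.Transformation interface and are packaged as a jar on the Connect worker's plugin path. A minimal sketch (the package and class names here are purely illustrative):

package com.example.transforms;

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// A do-nothing transform that passes every record through unchanged.
// A real transform would inspect or rebuild the record in apply().
public class PassThrough<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public void configure(Map<String, ?> configs) {
        // read any transform-specific settings here
    }

    @Override
    public R apply(R record) {
        // modify and return the record; returning null drops it
        return record;
    }

    @Override
    public ConfigDef config() {
        // describe the transform's settings so Connect can validate them
        return new ConfigDef();
    }

    @Override
    public void close() {
        // release any resources held by the transform
    }
}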

One thing to note when using SMTs is that they should only be used for simple, per-record transformations of the data. An SMT cannot split, join or aggregate records.

If you are tempted to create a transform that tries to store state or make calls to external APIs then you should look to do it with a dedicated stream processing engine as mentioned at the start.

If you would like to know more about how to implement modern data, streaming and cloud technologies, such as Apache Kafka, into your business, we at Digitalis do it all: from data engineering, cloud migration to fully managed services, we can help you modernize your applications, operations, and data. We provide consulting and managed services on cloud, data, and DevOps for any business type. Contact us for more information.

