Kafka To Kafka

Kafka test-topic:
{"uniqueID": "abc1", "value": 5}
{"uniqueID": "abc2", "value": 6}
{"uniqueID": "abc3", "value": 7}
{"uniqueID": "abc4", "value": 8}

Extract: Establish a data source. In this case, we will use Kafka, a distributed event streaming platform. Here’s how to set up a Kafka source stream in PyFlink:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.common.serialization import SimpleStringSchema

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///path/to/flink-sql-connector-kafka-1.17.1.jar")

kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'test-group'}
kafka_consumer = FlinkKafkaConsumer('test-topic', SimpleStringSchema(), kafka_props).set_start_from_earliest()
topic_stream = env.add_source(kafka_consumer)
topic_stream.print()
env.execute("Kafka Source Example")

Every Flink job starts by establishing a source. Here we import what we need, create the stream execution environment, register the Kafka connector JAR with the environment, and set up the Kafka consumer.
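
One optional tweak for local testing is to pin the environment's parallelism to 1, which keeps the example running in a single task slot and keeps the printed output uncluttered; a small sketch:

# Optional: run the whole pipeline in a single task slot while testing locally.
env.set_parallelism(1)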

PyFlink needs a serialization scheme whenever data moves between connectors and functions. In this case, our Kafka topic holds plain JSON strings, so we deserialize them with SimpleStringSchema().
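
SimpleStringSchema() hands us raw strings and leaves the JSON parsing to us. If you'd rather have the connector do the parsing, PyFlink also ships JsonRowDeserializationSchema; here's a sketch under the assumption that every record has exactly the two fields shown above:

from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types

# Sketch: let the consumer deserialize each record into a typed Row
# (assumes every record has exactly a string uniqueID and an int value).
row_schema = JsonRowDeserializationSchema.builder() \
    .type_info(Types.ROW_NAMED(['uniqueID', 'value'],
                               [Types.STRING(), Types.INT()])) \
    .build()
row_consumer = FlinkKafkaConsumer('test-topic', row_schema, kafka_props)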

.set_start_from_earliest() tells the consumer to read from the very beginning of the Kafka topic, which is usually what you want while testing code. Running this prints the four JSON records in the topic as strings. Here is what the output will look like:

{"uniqueID": "abc1", "value": 5}
{"uniqueID": "abc2", "value": 6}
{"uniqueID": "abc3", "value": 7}
{"uniqueID": "abc4", "value": 8}

Transform:

The great thing about working in Python is that there’s a library for almost anything you want to do, so transforming the data is easy. Let’s build on the code from earlier:

import json
from pyflink.common.typeinfo import Types

class Transform:
    """Wraps one parsed record and applies the per-record transformations."""

    def __init__(self, json_data):
        self.data = json_data

    def make_uppercase(self):
        # Uppercase the uniqueID field in place.
        self.data['uniqueID'] = self.data['uniqueID'].upper()
        return self.data

    def new_field(self):
        # Derive new_field by concatenating uniqueID with the value.
        self.data['new_field'] = self.data['uniqueID'] + str(self.data['value'])
        return self.data

    def process(self):
        # Run every transformation and return the updated record.
        self.make_uppercase()
        self.new_field()
        return self.data

topic_stream = env.add_source(kafka_consumer)
parsed_stream = topic_stream.map(json.loads, output_type=Types.PICKLED_BYTE_ARRAY())
transformed_stream = parsed_stream.map(lambda x: Transform(x).process())

Loading each Kafka record as a dictionary and running plain Python code makes the transformation straightforward. The output_type argument specifies how records in the resulting stream are serialized. Here we used a pickled byte array, which handles arbitrary Python objects but costs more compute than a typed serializer. This code produces:

{"uniqueID": "ABC1", "value": 5, "new_field": "ABC15"}
{"uniqueID": "ABC2", "value": 6, "new_field": "ABC26"}
{"uniqueID": "ABC3", "value": 7, "new_field": "ABC37"}
{"uniqueID": "ABC4", "value": 8, "new_field": "ABC48"}

Load:

Now we’re going to load (sink) this data back into a different Kafka topic.

from pyflink.datastream.connectors import kafka as kfk

transformed_stream = parsed_stream.map(lambda x: Transform(x).process())
load_stream = transformed_stream.map(json.dumps, output_type=Types.STRING())

loaded_stream_serializer = kfk.KafkaRecordSerializationSchema.builder() \
    .set_topic('test-sink') \
    .set_value_serialization_schema(SimpleStringSchema()) \
    .build()
loaded_sink = kfk.KafkaSink.builder() \
    .set_bootstrap_servers('localhost:9092') \
    .set_record_serializer(loaded_stream_serializer) \
    .build()

load_stream.sink_to(loaded_sink)

We start with the stream of transformed dictionary records in transformed_stream, which is still serialized as a pickled byte array. To put the data back into Kafka we have to turn each record back into a plain string, so we json.dumps it and declare the output serialization as Types.STRING(). Then we build the Kafka sink and send the stream there.
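
The sink above uses the default delivery semantics; if you need stronger guarantees, the KafkaSink builder also accepts an explicit delivery guarantee. A sketch, assuming DeliveryGuarantee can be imported from pyflink.datastream.connectors.base, which is where recent PyFlink releases keep it:

from pyflink.datastream.connectors.base import DeliveryGuarantee

# Sketch: the same sink as above, but with at-least-once delivery semantics.
guaranteed_sink = kfk.KafkaSink.builder() \
    .set_bootstrap_servers('localhost:9092') \
    .set_record_serializer(loaded_stream_serializer) \
    .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
    .build()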

Here’s the code in full:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer, kafka as kfk
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
import json

class Transform:
    def __init__(self, json_data):
        self.data = json_data

    def make_uppercase(self):
        self.data['uniqueID'] = self.data['uniqueID'].upper()
        return self.data

    def new_field(self):
        self.data['new_field'] = self.data['uniqueID'] + str(self.data['value'])
        return self.data

    def process(self):
        self.make_uppercase()
        self.new_field()
        return self.data

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///path/to/flink-sql-connector-kafka-1.17.1.jar")

kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'test-group'}
kafka_consumer = FlinkKafkaConsumer('test-topic', SimpleStringSchema(), kafka_props).set_start_from_earliest()

topic_stream = env.add_source(kafka_consumer)
parsed_stream = topic_stream.map(json.loads, output_type=Types.PICKLED_BYTE_ARRAY())
transformed_stream = parsed_stream.map(lambda x: Transform(x).process())
load_stream = transformed_stream.map(json.dumps, output_type=Types.STRING())

loaded_stream_serializer = kfk.KafkaRecordSerializationSchema.builder() \
    .set_topic('test-sink') \
    .set_value_serialization_schema(SimpleStringSchema()) \
    .build()

loaded_sink = kfk.KafkaSink.builder() \
    .set_bootstrap_servers('localhost:9092') \
    .set_record_serializer(loaded_stream_serializer) \
    .build()

load_stream.sink_to(loaded_sink)

env.execute("Kafka Source and Sink Example")
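
And that's the whole pipeline. With a broker running on localhost:9092, the test-topic records in place, and the connector JAR path pointing at your local copy, running the script will land the transformed records in the test-sink topic (create it ahead of time unless your broker auto-creates topics).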