Advanced Kafka to Kafka – Real Time Enrichment

At some point you’re going to want to enrich your data. That usually means querying a database and adding the information you get back to each record. In this example we’re going to read data from a Kafka topic, query Postgres, enrich each record with multiple new fields, and finally sink it back to Kafka.

Starting Data in Kafka test-topic:
{"uniqueID": "abc1", "value": 5}
{"uniqueID": "abc2", "value": 6}
{"uniqueID": "abc3", "value": 7}
{"uniqueID": "abc4", "value": 8}

Let’s say you want to add the IP address (along with a couple of other fields) to this data, and in Postgres you have the following table called awesome_table:

uniqueID | IP_address | Location | Device_type
abc1     | 192.168.1  | NJ       | Linux
abc2     | 192.168.2  | FL       | Windows
abc3     | 192.168.3  | NJ       | Windows
abc4     | 192.168.4  | KS       | Linux
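
If you want to recreate the lookup table locally, here is a sketch that builds and populates it with psycopg2. The column types are assumptions, and the connection details are the same placeholders used in the job below:

import psycopg2

db_connection = psycopg2.connect(user='user', password='password', host='server123', port='123', database='db_name')
db_cursor = db_connection.cursor()

# Assumed schema: TEXT columns are enough for this example
db_cursor.execute("""
    CREATE TABLE IF NOT EXISTS awesome_table (
        uniqueID    TEXT PRIMARY KEY,
        IP_address  TEXT,
        Location    TEXT,
        Device_type TEXT
    )
""")

rows = [
    ('abc1', '192.168.1', 'NJ', 'Linux'),
    ('abc2', '192.168.2', 'FL', 'Windows'),
    ('abc3', '192.168.3', 'NJ', 'Windows'),
    ('abc4', '192.168.4', 'KS', 'Linux'),
]
db_cursor.executemany(
    "INSERT INTO awesome_table (uniqueID, IP_address, Location, Device_type) VALUES (%s, %s, %s, %s)",
    rows
)

db_connection.commit()
db_cursor.close()
db_connection.close()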

We will start off with the code from last time, but this time we will rename the Transform class to Enrich and add psycopg2 code to query Postgres:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer, KafkaSink, KafkaRecordSerializationSchema
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
import json
import psycopg2

class Enrich:
    def __init__(self, json_data):
        self.data = json_data

    def query_postgres(self):
        # Look up the enrichment fields for this record's uniqueID.
        # Note: this opens a new connection for every record, which is simple but not cheap.
        db_connection = psycopg2.connect(user='user', password='password', host='server123', port='123', database='db_name')
        db_cursor = db_connection.cursor()
        # Parameterized query so the uniqueID value is escaped safely
        query = "select uniqueID, IP_address, Location, Device_type from awesome_table where uniqueID = %s"
        db_cursor.execute(query, (self.data['uniqueID'],))
        record = db_cursor.fetchone()
        db_cursor.close()
        db_connection.close()
        if record is not None:
            # fetchone() returns a tuple, so map the column names back onto the record
            columns = ['uniqueID', 'IP_address', 'Location', 'Device_type']
            self.data.update(dict(zip(columns, record)))
        return self.data

    def process(self):
        self.query_postgres()
        return self.data

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///path/to/flink-sql-connector-kafka-1.17.1.jar")

kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'test-group'}
kafka_consumer = FlinkKafkaConsumer('test-topic', SimpleStringSchema(), kafka_props).set_start_from_earliest()

topic_stream = env.add_source(kafka_consumer)
parsed_stream = topic_stream.map(json.loads, output_type=Types.PICKLED_BYTE_ARRAY())
enriched_stream = parsed_stream.map(lambda x: Enrich(x).process())
load_stream = enriched_stream.map(json.dumps, output_type=Types.STRING())

loaded_stream_serializer = KafkaRecordSerializationSchema.builder() \
   .set_topic('test-sink') \
   .set_value_serialization_schema(SimpleStringSchema()) \
   .build()

loaded_sink = KafkaSink.builder() \
   .set_bootstrap_servers('localhost:9092') \
   .set_record_serializer(loaded_stream_serializer) \
   .build()

load_stream.sink_to(loaded_sink)

env.execute("Kafka Source and Sink Example")
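
One note on the connection handling before we look at the output: Enrich opens and closes a Postgres connection for every single record, which is the simplest thing that works but gets expensive at higher volumes. Here is a sketch of an alternative using PyFlink's MapFunction lifecycle hooks, so each parallel subtask opens one connection in open() and reuses it (the class name is just illustrative):

from pyflink.datastream.functions import MapFunction, RuntimeContext
import psycopg2

class EnrichMapFunction(MapFunction):

    def open(self, runtime_context: RuntimeContext):
        # Called once per parallel subtask, before any records are processed
        self.db_connection = psycopg2.connect(user='user', password='password', host='server123', port='123', database='db_name')

    def map(self, data):
        db_cursor = self.db_connection.cursor()
        db_cursor.execute(
            "select uniqueID, IP_address, Location, Device_type from awesome_table where uniqueID = %s",
            (data['uniqueID'],)
        )
        record = db_cursor.fetchone()
        db_cursor.close()
        if record is not None:
            columns = ['uniqueID', 'IP_address', 'Location', 'Device_type']
            data.update(dict(zip(columns, record)))
        return data

    def close(self):
        # Called once when the subtask shuts down
        self.db_connection.close()

# Swap the lambda for the function object:
# enriched_stream = parsed_stream.map(EnrichMapFunction())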

For each record the job looks up the enrichment data in Postgres and sinks the result back to Kafka. Here is the final result in test-sink:

uniqueID | value | IP_address | Location | Device_type
abc1     | 5     | 192.168.1  | NJ       | Linux
abc2     | 6     | 192.168.2  | FL       | Windows
abc3     | 7     | 192.168.3  | NJ       | Windows
abc4     | 8     | 192.168.4  | KS       | Linux
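
To confirm the enriched records actually landed in test-sink, here is a quick sketch with kafka-python, reading from the beginning of the topic (same local broker assumption as before):

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'test-sink',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',   # read from the beginning of the topic
    consumer_timeout_ms=5000,       # stop iterating once the topic is drained
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)

for message in consumer:
    print(message.value)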

There you have it: real-time data enrichment as the data travels through the Kafka pipeline.