Sooner or later you’re going to want to enrich your data, and that usually means querying a database and attaching some of the results to each record. In this example we’re going to read data from a Kafka topic, query Postgres, enrich each record with several new fields, and finally sink the result back to Kafka.
Starting Data in Kafka test-topic:
{"uniqueID": "abc1", "value": 5}
{"uniqueID": "abc2", "value": 6}
{"uniqueID": "abc3", "value": 7}
{"uniqueID": "abc4", "value": 8}
Let’s say you want to add the IP address, location, and device type to this data, and your Postgres database has the following table called awesome_table:
uniqueID | IP_address | Location | Device_type
abc1     | 192.168.1  | NJ       | Linux
abc2     | 192.168.2  | FL       | Windows
abc3     | 192.168.3  | NJ       | Windows
abc4     | 192.168.4  | KS       | Linux
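If you need to stand this table up for testing, a hypothetical one-off psycopg2 script could create and seed it; the connection parameters here are placeholders matching the ones used in the job below:
import psycopg2

conn = psycopg2.connect(user='user', password='password',
                        host='server123', port='123', database='db_name')
cur = conn.cursor()

# Create the lookup table if it does not exist yet.
cur.execute("""
    create table if not exists awesome_table (
        uniqueID    text primary key,
        IP_address  text,
        Location    text,
        Device_type text
    )
""")

# Seed the rows shown above; skip any that are already present.
cur.executemany(
    "insert into awesome_table (uniqueID, IP_address, Location, Device_type) "
    "values (%s, %s, %s, %s) on conflict (uniqueID) do nothing",
    [
        ('abc1', '192.168.1', 'NJ', 'Linux'),
        ('abc2', '192.168.2', 'FL', 'Windows'),
        ('abc3', '192.168.3', 'NJ', 'Windows'),
        ('abc4', '192.168.4', 'KS', 'Linux'),
    ],
)
conn.commit()
cur.close()
conn.close()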
We will start off with the code from last time, but this time we will rename the Transform class to Enrich and add psycopg2 code to query Postgres:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer, KafkaSink, KafkaRecordSerializationSchema
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
import json
import psycopg2
class Enrich:
    def __init__(self, json_data):
        self.data = json_data

    def query_postgres(self):
        # Look up the row for this uniqueID and merge the extra columns into
        # the incoming record. Note: this opens a new connection per record.
        db_connection = psycopg2.connect(user='user', password='password',
                                         host='server123', port='123',
                                         database='db_name')
        db_cursor = db_connection.cursor()
        query = ("select uniqueID, IP_address, Location, Device_type "
                 "from awesome_table where uniqueID = %s")
        db_cursor.execute(query, (self.data['uniqueID'],))
        record = db_cursor.fetchone()
        db_cursor.close()
        db_connection.close()
        # fetchone() returns a tuple, so pair it with the column names before merging.
        if record is not None:
            self.data.update(zip(['uniqueID', 'IP_address', 'Location', 'Device_type'], record))
        return self.data

    def process(self):
        self.query_postgres()
        return self.data
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///path/to/flink-sql-connector-kafka-1.17.1.jar")

kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'test-group'}
kafka_consumer = FlinkKafkaConsumer('test-topic', SimpleStringSchema(), kafka_props)
kafka_consumer.set_start_from_earliest()

topic_stream = env.add_source(kafka_consumer)
parsed_stream = topic_stream.map(json.loads, output_type=Types.PICKLED_BYTE_ARRAY())
enriched_stream = parsed_stream.map(lambda x: Enrich(x).process())
load_stream = enriched_stream.map(json.dumps, output_type=Types.STRING())
loaded_stream_serializer = KafkaRecordSerializationSchema.builder() \
    .set_topic('test-sink') \
    .set_value_serialization_schema(SimpleStringSchema()) \
    .build()
loaded_sink = KafkaSink.builder() \
    .set_bootstrap_servers('localhost:9092') \
    .set_record_serializer(loaded_stream_serializer) \
    .build()
load_stream.sink_to(loaded_sink)
env.execute("Kafka Enrichment Example")
For each record the job looks up the enrichment data in Postgres and then sinks the combined record back to Kafka. Here is the final result in test-sink:
uniqueID | value | IP_address | Location | Device_type
abc1     | 5     | 192.168.1  | NJ       | Linux
abc2     | 6     | 192.168.2  | FL       | Windows
abc3     | 7     | 192.168.3  | NJ       | Windows
abc4     | 8     | 192.168.4  | KS       | Linux
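To eyeball the sink output yourself, a quick consumer sketch, again assuming kafka-python and a local broker, could look like this:
from kafka import KafkaConsumer

# Read test-sink from the beginning and stop after 5 seconds of inactivity.
consumer = KafkaConsumer('test-sink',
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=5000)
for message in consumer:
    print(message.value.decode('utf-8'))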
There you have it: real-time data enrichment as records travel through the Kafka pipeline.