Advanced Kafka to Postgres – Real-Time Enrichment With a UDF

{"uniqueID": "abc1",
 "the_time": "2024-05-06 14:22:55 GMT"}
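For reference, a record like this can be pushed onto the topic with the kafka-python client. A quick sketch, assuming the broker address used throughout this example:

from kafka import KafkaProducer
import json

# hypothetical producer for test data; broker address matches the source table below
producer = KafkaProducer(
    bootstrap_servers="server01:9000",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
producer.send("test-topic", {"uniqueID": "abc1",
                             "the_time": "2024-05-06 14:22:55 GMT"})
producer.flush()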

User Defined Functions (UDFs) let you run custom Python code inside a Flink job, which is handy when a record needs massaging before it reaches a PostgreSQL sink. In this example, our PostgreSQL table has a column 'the_time' of type TIMESTAMP, but the Kafka record carries it as a string with " GMT" at the end. To get the record into PostgreSQL, we must remove " GMT" and cast what remains to a TIMESTAMP. Before wiring this into Flink, here is the transformation in plain Python:
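# a minimal sketch of the transformation we need (plain Python, no Flink)
from datetime import datetime

raw = "2024-05-06 14:22:55 GMT"       # what arrives on the Kafka topic
stripped = raw.replace(" GMT", "")    # "2024-05-06 14:22:55"
print(datetime.strptime(stripped, "%Y-%m-%d %H:%M:%S"))  # parses cleanly now

Here is all the code from the previous example, minus the UDF part, with TRY_CAST(the_time AS TIMESTAMP(0)) added to the INSERT: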

from pyflink.common import Configuration
from pyflink.table import EnvironmentSettings, TableEnvironment, StreamTableEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table.catalog import JdbcCatalog

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///data/python38/jupyter/lib/flink-sql-connector-kafka-1.17.1.jar")

config = Configuration()
env_settings = EnvironmentSettings.new_instance()\
   .in_streaming_mode()\
   .with_configuration(config)\
   .build()

table_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

table_env.get_config().set("pipeline.jars", "file:///path/flink-connector-jdbc-3.1.0-1.17.jar;file:///path/postgresql-42.6.0.jar")

table_env.execute_sql("""CREATE TABLE kafka_source_table (
    uniqueID VARCHAR,
    the_time VARCHAR
) WITH (
    'connector' = 'kafka',
    'topic' = 'test-topic',
    'properties.bootstrap.servers' = 'server01:9000',
    'properties.group.id' = 'test-topic',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true',
    'scan.startup.mode' = 'earliest-offset'
)"""

table_env.execute_sql("""
CREATE TABLE postgres_sink_table (
    uniqueID STRING,
    the_time TIMESTAMP(0),
    PRIMARY KEY (`uniqueID`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://server01:9000/aureus',
    'table-name' = 'test_tbl',
    'driver' = 'org.postgresql.Driver',
    'username' = 'username',
    'password' = 'password',
    'sink.buffer-flush.max-rows' = '1000',
    'sink.buffer-flush.interval' = '1s'
)
""")

stmt_set = table_env.create_statement_set()

stmt_set.add_insert_sql("""INSERT INTO postgres_sink_table
SELECT
    uniqueID,
    TRY_CAST(the_time AS TIMESTAMP(0))
FROM kafka_source_table""")

stmt_set.execute().wait()

This code won't get the timestamp into PostgreSQL: the cast can't parse a string that ends in " GMT", so TRY_CAST returns NULL for every row (a plain CAST would fail at runtime instead). Fortunately, Python is great at string manipulation, and we can solve this with a User Defined Function (UDF):

# new imports
from pyflink.table import DataTypes, ScalarFunction
from pyflink.table.udf import udf
# new jar files: flink-python is needed to run Python UDFs
table_env.get_config().set(
    "pipeline.jars", 
    "file:///path/to/flink-python-1.17.1.jar;"
    "file:///path/to/flink-connector-jdbc-3.1.0-1.17.jar;"
    "file:///path/to/postgresql-42.6.0.jar;"
    "file:///path/to/flink-sql-connector-kafka-1.17.1.jar"
)

class removeGMT(ScalarFunction):
    # PyFlink scalar UDFs implement eval(), which is called once per input row
    def eval(self, timestring):
        if timestring is None:
            return None
        if timestring.endswith('GMT'):
            return timestring.replace(" GMT", "")
        return timestring

# the second argument is a list of input data types for the udf; the third is the result type
removeGMT_udf = udf(removeGMT(), [DataTypes.STRING()], DataTypes.STRING())
table_env.create_temporary_system_function("remove_the_gmt", removeGMT_udf)
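As an aside, PyFlink also lets you define the same UDF as a plain function wrapped with the @udf decorator instead of a ScalarFunction subclass. A minimal sketch (use one style or the other; remove_gmt_fn is just an illustrative name):

# equivalent decorator style for the same logic
@udf(result_type=DataTypes.STRING())
def remove_gmt_fn(timestring):
    return None if timestring is None else timestring.replace(" GMT", "")

table_env.create_temporary_system_function("remove_the_gmt", remove_gmt_fn)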

Alright, now we've registered this UDF. We can use it on any field we want, and you can get as creative as you'd like inside eval. For instance, a hypothetical variant (a sketch only, not used in the pipeline below) could re-emit the GMT string as ISO-8601 UTC instead of just stripping the suffix:
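from datetime import datetime, timezone

class gmtToISO(ScalarFunction):
    # hypothetical variant: parse the ' GMT' string and re-emit it as ISO-8601 UTC
    def eval(self, timestring):
        if timestring is None:
            return None
        dt = datetime.strptime(timestring.replace(" GMT", ""), "%Y-%m-%d %H:%M:%S")
        return dt.replace(tzinfo=timezone.utc).isoformat()

Here's the full code; just apply the remove_the_gmt function to the field you want: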

from pyflink.common import Configuration
from pyflink.table import EnvironmentSettings, TableEnvironment, StreamTableEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table.catalog import JdbcCatalog
from pyflink.table import DataTypes, ScalarFunction
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///data/python38/jupyter/lib/flink-sql-connector-kafka-1.17.1.jar")

config = Configuration()
env_settings = EnvironmentSettings.new_instance()\
   .in_streaming_mode()\
   .with_configuration(config)\
   .build()

table_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

table_env.get_config().set(
    "pipeline.jars", 
    "file:///path/to/flink-python-1.17.1.jar;"
    "file:///path/to/flink-connector-jdbc-3.1.0-1.17.jar;"
    "file:///path/to/postgresql-42.6.0.jar;"
    "file:///path/to/flink-sql-connector-kafka-1.17.1.jar"
)

# define and register the UDF (same as above) so remove_the_gmt is available in SQL
class removeGMT(ScalarFunction):
    def eval(self, timestring):
        if timestring is None:
            return None
        if timestring.endswith('GMT'):
            return timestring.replace(" GMT", "")
        return timestring

removeGMT_udf = udf(removeGMT(), [DataTypes.STRING()], DataTypes.STRING())
table_env.create_temporary_system_function("remove_the_gmt", removeGMT_udf)

table_env.execute_sql("""CREATE TABLE kafka_source_table (
    uniqueID VARCHAR,
    the_time VARCHAR
) WITH (
    'connector' = 'kafka',
    'topic' = 'test-topic',
    'properties.bootstrap.servers' = 'server01:9000',
    'properties.group.id' = 'test-topic',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true',
    'scan.startup.mode' = 'earliest-offset'
)"""

table_env.execute_sql("""
CREATE TABLE postgres_sink_table (
    uniqueID STRING,
    the_time TIMESTAMP(0),
    PRIMARY KEY (`uniqueID`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://server01:9000/aureus',
    'table-name' = 'test_tbl',
    'driver' = 'org.postgresql.Driver',
    'username' = 'username',
    'password' = 'password',
    'sink.buffer-flush.max-rows' = '1000',
    'sink.buffer-flush.interval' = '1s'
)
""")

stmt_set = table_env.create_statement_set()

stmt_set.add_insert_sql("""INSERT INTO postgres_sink_table
SELECT
    uniqueID,
    TRY_CAST(remove_the_gmt(the_time) AS TIMESTAMP(0))
FROM kafka_source_table""")

stmt_set.execute().wait()
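Once the job is running, you can sanity-check the sink from the PostgreSQL side. A minimal sketch with psycopg2, reusing the hypothetical host, database, and credentials from the sink table above:

import psycopg2

# hypothetical connection details, matching the JDBC URL in the sink DDL
conn = psycopg2.connect(host="server01", port=9000, dbname="aureus",
                        user="username", password="password")
with conn, conn.cursor() as cur:
    cur.execute("SELECT * FROM test_tbl LIMIT 5")
    for row in cur.fetchall():
        print(row)  # the_time should now be a timestamp with no ' GMT' suffix
conn.close()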