Kafka to Postgres

Suppose our Kafka topic test-topic contains the following JSON records:
{"uniqueID": "abc1", "value": 5}
{"uniqueID": "abc2", "value": 6}
{"uniqueID": "abc3", "value": 7}
{"uniqueID": "abc4", "value": 8}

Our goal is to take this Kafka data and get it into our database. The first thing we need to do is set up a table environment and define a source table over the topic:

from pyflink.common import Configuration
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///path/flink-sql-connector-kafka-1.17.1.jar")

config = Configuration()

env_settings = EnvironmentSettings.new_instance() \
   .in_streaming_mode() \
   .with_configuration(config) \
   .build()

table_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

# Define the Kafka source table
table_env.execute_sql("""CREATE TABLE kafka_source_table (
    uniqueID VARCHAR,
    value INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'test-topic',
    'properties.bootstrap.servers' = 'server01:9000',
    'properties.group.id' = 'test-topic',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true',
    'scan.startup.mode' = 'earliest-offset'
)"""

# Query the data
result = table_env.execute_sql("""SELECT
    uniqueID,
    `value`
FROM kafka_source_table""")

result.print()

This code creates a streaming table over the Kafka records and prints output along these lines (value is backticked in the SQL because it is a reserved keyword in Flink SQL):

uniqueID    value
abc1        5
abc2        6
abc3        7
abc4        8

Now how do we send this data to our database?

from pyflink.common import Configuration
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///data/python38/jupyter/lib/flink-sql-connector-kafka-1.17.1.jar")

config = Configuration()
env_settings = EnvironmentSettings.new_instance()\
   .in_streaming_mode()\
   .with_configuration(config)\
   .build()

table_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

table_env.get_config().set("pipeline.jars", "file:///path/flink-connector-jdbc-3.1.0-1.17.jar;file:///path/postgresql-42.6.0.jar")

table_env.execute_sql("""CREATE TABLE kafka_source_table (
    uniqueID VARCHAR,
    value INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'test-topic',
    'properties.bootstrap.servers' = 'server01:9000',
    'properties.group.id' = 'test-topic',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true',
    'scan.startup.mode' = 'earliest-offset'
)"""

# Define the Postgres sink table (JDBC connector)
table_env.execute_sql("""
CREATE TABLE postgres_sink_table (
    uniqueID STRING,
    `value` INT,
    PRIMARY KEY (`uniqueID`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://server01:9000/aureus',
    'table-name' = 'test_tbl',
    'driver' = 'org.postgresql.Driver',
    'username' = 'username',
    'password' = 'password',
    'sink.buffer-flush.max-rows' = '1000',
    'sink.buffer-flush.interval' = '1s'
)
""")

# Insert the Kafka rows into the Postgres sink
stmt_set = table_env.create_statement_set()

stmt_set.add_insert_sql("""INSERT INTO postgres_sink_table
SELECT
    uniqueID,
    `value`
FROM kafka_source_table""")

stmt_set.execute().wait()

We use pipeline.jars to add the jar files needed for the Postgres connection, we define the JDBC sink table, and then we add an INSERT INTO ... SELECT from the Kafka table to a statement set. Submitting this job runs indefinitely, upserting into test_tbl on the primary key uniqueID.
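One thing to keep in mind is that the JDBC sink writes into an existing table; Flink will not create test_tbl for us. Below is a minimal sketch of creating it ahead of time, assuming the psycopg2 driver and the same hypothetical host, database, and credentials from the JDBC URL above; adjust the names and types to your own schema.

# A minimal sketch of creating the target table, assuming psycopg2 and the
# connection details from the JDBC URL above (server01:9000, database aureus).
import psycopg2

conn = psycopg2.connect(host="server01", port=9000, dbname="aureus",
                        user="username", password="password")
with conn, conn.cursor() as cur:
    # Quoted column names keep the same casing as the Flink schema; if your
    # setup folds identifiers to lowercase, create them unquoted instead.
    # The primary key is what lets the JDBC sink upsert rather than append.
    cur.execute("""
        CREATE TABLE IF NOT EXISTS test_tbl (
            "uniqueID" TEXT PRIMARY KEY,
            "value"    INT
        )
    """)
conn.close()

With the table in place, each Kafka record lands as one row keyed by uniqueID, and a re-delivered record with the same key simply overwrites the previous value.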