Our Kafka topic, test-topic, contains the following records:
{"uniqueID": "abc1", "value": 5}
{"uniqueID": "abc2", "value": 6}
{"uniqueID": "abc3", "value": 7}
{"uniqueID": "abc4", "value": 8}
Our goal is to get this Kafka data into our database. The first step is to define a Flink table over the topic:
from pyflink.common import Configuration
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
from pyflink.datastream import StreamExecutionEnvironment

# Create the streaming environment and register the Kafka SQL connector jar
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///path/flink-sql-connector-kafka-1.17.1.jar")

config = Configuration()
env_settings = EnvironmentSettings.new_instance() \
    .in_streaming_mode() \
    .with_configuration(config) \
    .build()
table_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
# Define the Kafka source table ("value" is a reserved keyword in Flink SQL, so it is escaped)
table_env.execute_sql("""CREATE TABLE kafka_source_table (
    uniqueID VARCHAR,
    `value` INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'test-topic',
    'properties.bootstrap.servers' = 'server01:9000',
    'properties.group.id' = 'test-topic',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true',
    'scan.startup.mode' = 'earliest-offset'
)""")
# Query the data
result = table_env.execute_sql("""SELECT
    uniqueID,
    `value`
FROM kafka_source_table""")
result.print()
This code creates a streaming table over the Kafka records and prints the following:
| uniqueID | value |
| abc1     | 5     |
| abc2     | 6     |
| abc3     | 7     |
| abc4     | 8     |
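Printing is handy for a quick check; if we want to consume the rows in Python instead, the TableResult returned by execute_sql also exposes collect(). A brief sketch (in streaming mode the iterator keeps waiting for new records until the job is cancelled):
# Sketch: iterate the streaming result programmatically instead of printing it.
# In streaming mode this iterator blocks for new records until the job stops.
with result.collect() as rows:
    for row in rows:
        print(row)   # e.g. Row(uniqueID='abc1', value=5)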
Now how do we send this data to our database?
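The JDBC sink writes into an existing Postgres table, so test_tbl has to be created up front. A minimal sketch using psycopg2 (the library choice, connection details, and column types are assumptions based on the sink definition below; any SQL client works just as well):
# Sketch: create the target table in Postgres (connection details are assumptions).
# Quoting preserves the mixed-case column name used in the Flink DDL;
# unquoted identifiers would be folded to lowercase by Postgres.
import psycopg2

conn = psycopg2.connect("dbname=aureus user=username password=password host=server01 port=9000")
with conn, conn.cursor() as cur:
    cur.execute("""
        CREATE TABLE IF NOT EXISTS test_tbl (
            "uniqueID" VARCHAR PRIMARY KEY,
            "value"    INT
        )
    """)
conn.close()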
from pyflink.common import Configuration
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
from pyflink.datastream import StreamExecutionEnvironment

# Streaming environment with the Kafka connector jar, as before
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///data/python38/jupyter/lib/flink-sql-connector-kafka-1.17.1.jar")

config = Configuration()
env_settings = EnvironmentSettings.new_instance() \
    .in_streaming_mode() \
    .with_configuration(config) \
    .build()
table_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

# Add the JDBC connector and Postgres driver jars needed for the sink
table_env.get_config().set("pipeline.jars", "file:///path/flink-connector-jdbc-3.1.0-1.17.jar,file:///path/postgresql-42.6.0.jar")
# Kafka source table (same definition as before)
table_env.execute_sql("""CREATE TABLE kafka_source_table (
    uniqueID VARCHAR,
    `value` INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'test-topic',
    'properties.bootstrap.servers' = 'server01:9000',
    'properties.group.id' = 'test-topic',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true',
    'scan.startup.mode' = 'earliest-offset'
)""")
# Define the Postgres sink table; the primary key makes the JDBC sink upsert
table_env.execute_sql("""
CREATE TABLE postgres_sink_table (
    uniqueID STRING,
    `value` INT,
    PRIMARY KEY (`uniqueID`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://server01:9000/aureus',
    'table-name' = 'test_tbl',
    'driver' = 'org.postgresql.Driver',
    'username' = 'username',
    'password' = 'password',
    'sink.buffer-flush.max-rows' = '1000',
    'sink.buffer-flush.interval' = '1s'
)
""")

# Insert from the Kafka source into the Postgres sink and submit the job
stmt_set = table_env.create_statement_set()
stmt_set.add_insert_sql("""INSERT INTO postgres_sink_table
SELECT
    uniqueID,
    `value`
FROM kafka_source_table""")
stmt_set.execute().wait()
We use pipeline.jars to add the JDBC connector and Postgres driver jars, define the Postgres sink table, and then add an INSERT statement that selects from the Kafka table into the sink. Submitting this job runs indefinitely, upserting into test_tbl on the primary key uniqueID.
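Once the job has been running for a moment, we can confirm the upserts from any SQL client; a quick psycopg2 check (same assumed connection details as in the table-creation sketch above):
# Sketch: confirm the rows landed in Postgres (connection details are assumptions).
import psycopg2

conn = psycopg2.connect("dbname=aureus user=username password=password host=server01 port=9000")
with conn.cursor() as cur:
    cur.execute('SELECT "uniqueID", "value" FROM test_tbl ORDER BY "uniqueID"')
    for row in cur.fetchall():
        print(row)   # ('abc1', 5), ('abc2', 6), ...
conn.close()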