Kafka Group Offsets and Job Restarts

Want to deploy your job to a production environment that you can leave running, without having to manually restart it every time one crazy piece of data comes in from out of nowhere and breaks your code in unforeseen ways that not even .get would have prevented? Well look no further. Let's start by setting up Kafka group offsets in PyFlink. Here is the sample data sitting in our topic:

test-topic:
{'uniqueID': 'abc1', 'awesome_value': 123}
{'uniqueID': 'abc2', 'awesome_value': '456'}
{'uniqueID': 'abc1', 'awesome_value': 789}

When your job does eventually fail, you're going to want to restart it and pick up where you last left off. To do that, we need to set the group offsets for Kafka. A group offset is simply Kafka's record of where a consumer group last read in a particular topic.

from pyflink.common import WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer, KafkaOffsetResetStrategy

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.add_jars("file:///path/to/flink-sql-connector-kafka-1.17.1.jar")

source = KafkaSource.builder() \
    .set_bootstrap_servers('server01:1111') \
    .set_topics("test-topic") \
    .set_group_id("prod_group_id") \
    .set_starting_offsets(KafkaOffsetsInitializer.committed_offsets(KafkaOffsetResetStrategy.EARLIEST)) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .set_property("auto.offset.reset", "latest") \
    .set_property("enable.auto.commit", "true") \
    .set_property("auto.commit.interval.ms", "1000") \
    .build()

topic_stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")

We are focusing on the KafkaSource.builder(). First, we set the bootstrap servers, then the topic to “test-topic”, and the group ID to “prod_group_id”. Kafka tracks the last record read for each group ID. The line .set_starting_offsets(KafkaOffsetsInitializer.committed_offsets(KafkaOffsetResetStrategy.EARLIEST)) specifies that if our group ID isn’t tracked by Kafka, we will start reading from the earliest record in the topic. If Kafka has offsets for our group ID, it will resume from the last committed position. The property .set_property("auto.offset.reset", "latest") acts as a fallback, indicating that if neither the last committed offset nor the earliest offset is available, the consumer will start from the latest offset. As we read from Kafka, offsets are automatically committed every 1000 ms.
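
For reference, committed_offsets is only one of the starting-offset options the builder accepts. Here is a minimal sketch of the main alternatives KafkaOffsetsInitializer exposes (the timestamp value below is just a placeholder):

from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaOffsetResetStrategy

# Always start from the beginning of the topic, ignoring any committed offsets
start_earliest = KafkaOffsetsInitializer.earliest()

# Always start from the end of the topic, so only new records are read
start_latest = KafkaOffsetsInitializer.latest()

# Start from the first record at or after an epoch timestamp in milliseconds (placeholder value)
start_from_time = KafkaOffsetsInitializer.timestamp(1700000000000)

# What we used above: resume from committed offsets, falling back to the earliest record
start_committed = KafkaOffsetsInitializer.committed_offsets(KafkaOffsetResetStrategy.EARLIEST)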

import json

from pyflink.common import WatermarkStrategy
from pyflink.common.restart_strategy import RestartStrategies
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import MapFunction
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer, KafkaOffsetResetStrategy

class FailingFunction(MapFunction):
    def map(self, value):
        # Addition will crash if 'awesome_value' is not a number (e.g. the string '456' in our sample data)
        value['awesome_value'] = value['awesome_value'] + 10
        return value

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
# Restart the job up to 2 times, waiting 20000 ms (20 seconds) between attempts
env.set_restart_strategy(RestartStrategies.fixed_delay_restart(2, 20000))

env.add_jars("file:///path/to/flink-sql-connector-kafka-1.17.1.jar")

source = KafkaSource.builder() \
    .set_bootstrap_servers('server01:1111') \
    .set_topics("test-topic") \
    .set_group_id("prod_group_id") \
    .set_starting_offsets(KafkaOffsetsInitializer.committed_offsets(KafkaOffsetResetStrategy.EARLIEST)) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .set_property("auto.offset.reset", "latest") \
    .set_property("enable.auto.commit", "true") \
    .set_property("auto.commit.interval.ms", "1000") \
    .build()

# Create a data stream from the Kafka source
topic_stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")

# Parse each record from JSON, run it through the crash-prone map, and print the result
topic_stream.map(json.loads).map(FailingFunction()).print()

# Nothing runs until the job is submitted
env.execute()

There you have it: thanks to group offsets and restart strategies, even if your code fails it will come back to life. Be sure to set up alerts on your logs for the words "RESTARTING" or "FAILING", because you're going to want to find out why your job restarted or failed.
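
Of course, restarts are a safety net, not a substitute for defensive code. If you'd rather not crash on records like that string '456' at all, here is a hypothetical, more forgiving version of the map function (the SafeFunction name and the skip-the-math-on-bad-data behavior are illustrative choices, not part of the job above):

from pyflink.datastream.functions import MapFunction

class SafeFunction(MapFunction):
    def map(self, value):
        # .get avoids a KeyError if 'awesome_value' is missing
        raw = value.get('awesome_value')
        # Only do the addition when the value is actually a number;
        # otherwise pass the record through untouched
        if isinstance(raw, (int, float)):
            value['awesome_value'] = raw + 10
        return value

Even with a function like this in place, keep the restart strategy configured; it is what saves you from the failures you did not think to guard against.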