Apache Kafka: How to set offsets to a fixed time

Hello 👋,

This is a short article about setting offsets in Apache Kafka for a consumer group.

Normally, to reset offsets in Kafka you use the kafka-consumer-groups.sh tool. This means downloading the Kafka distribution archive and setting up Java. All of Kafka's command-line tools depend on Java, which isn't that nice or developer friendly...

Sometimes, even after getting Java set up correctly and getting the tools to run, they don't work 🤷🏻‍♂️. Either the tool versions are incompatible with the Kafka version on the server, or the command executes successfully but doesn't seem to do anything...

Another way to set offsets for a consumer group is to use a Kafka client library and do it through code.

I have Python installed on my setup, so all I need to do is install the confluent-kafka library:

Bash:
pip install confluent-kafka

And then run the following code snippet to reset the consumer's offsets to a specific timestamp:


Python:
from confluent_kafka import Consumer, TopicPartition
import time

# Configuration
consumer_config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-consumer-group',
    'auto.offset.reset': 'earliest',
    'enable.partition.eof': True
}

topic = 'my-topic'
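# Target time as epoch milliseconds. time.mktime() reads the string in the
# machine's local time zone; you can also hard-code a millisecond value here.
timestamp_ms = int(time.mktime(time.strptime("2025-04-01 12:00:00", "%Y-%m-%d %H:%M:%S")) * 1000)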

# Create consumer
consumer = Consumer(consumer_config)

# Get metadata to discover partitions
metadata = consumer.list_topics(topic)
partitions = [TopicPartition(topic, p.id, timestamp_ms) for p in metadata.topics[topic].partitions.values()]

# Lookup offsets for the timestamp
offsets = consumer.offsets_for_times(partitions, timeout=10.0)

# Assign partitions with correct offsets
consumer.assign(offsets)

# Start consuming
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print("Error:", msg.error())
            continue

        print(f"{msg.topic()} [{msg.partition()}] at offset {msg.offset()}: {msg.value().decode('utf-8')}")
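        # One message is enough: consumer.close() below commits the consumed
        # position as long as enable.auto.commit is left at its default (true)
        break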

except KeyboardInterrupt:
    pass
finally:
    consumer.close()
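
One thing to watch in the script above: time.mktime interprets the time string in the local time zone of the machine running it, so the same string can resolve to different offsets on a laptop and on a server. Here is a minimal sketch of a time-zone-explicit alternative using only the standard library. The helper name to_epoch_ms is my own and not part of confluent-kafka, and defaulting to UTC is an assumption you may want to change.

```python
from datetime import datetime, timezone


def to_epoch_ms(text, tz=timezone.utc, fmt="%Y-%m-%d %H:%M:%S"):
    """Parse a wall-clock string in the given time zone and return epoch milliseconds."""
    moment = datetime.strptime(text, fmt).replace(tzinfo=tz)
    return int(moment.timestamp() * 1000)


# Interpreted as 12:00 UTC regardless of where the script runs
timestamp_ms = to_epoch_ms("2025-04-01 12:00:00")
print(timestamp_ms)  # 1743508800000
```

The result can be dropped in place of the timestamp_ms line in the script.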

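The consumer script moves the group's position by reading one message and letting consumer.close() commit on the way out, which, as far as I can tell, leaves the group one message past the looked-up offset. Below is a minimal sketch of committing the looked-up offsets directly instead. commit_resolved_offsets is a hypothetical helper name; it assumes resolved is the list returned by offsets_for_times and that the consumer group has no active members while it runs.

```python
def commit_resolved_offsets(consumer, resolved):
    """Commit the offsets found by offsets_for_times and return what was committed."""
    # Partitions with no message at or after the timestamp come back with a
    # negative offset; skip them rather than commit a placeholder value.
    committable = [tp for tp in resolved if tp.error is None and tp.offset >= 0]
    if committable:
        # Synchronous commit, so a failure raises here instead of passing silently
        consumer.commit(offsets=committable, asynchronous=False)
    return committable
```

In the script this would be called as commit_resolved_offsets(consumer, offsets) right after the offsets_for_times lookup; the polling loop is then probably no longer needed.
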
#python #kafka
 