Hello,
This is a short article about setting offsets in Apache Kafka for a consumer group.
Normally, to reset offsets in Kafka you use the kafka-consumer-groups.sh tool, which means downloading the Kafka release archive and setting up Java. All of Kafka's command-line tools depend on Java, and that isn't very developer friendly...
Sometimes, even once Java is set up correctly and the tools run, they still don't work: either the tool version is incompatible with the Kafka version on the server, or the command completes successfully but doesn't seem to do anything...
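One common reason the command "doesn't seem to do anything" is that `kafka-consumer-groups.sh --reset-offsets` only performs a dry run (it prints the planned offsets) unless `--execute` is passed, and it refuses to reset a group that still has active members. The Python sketch below only assembles such a command; the broker address, group, topic, and the `build_reset_command` helper are hypothetical placeholders, and actually running it still needs Java and the Kafka CLI tools on the PATH.

```python
import shlex


def build_reset_command(bootstrap, group, topic, to_datetime, execute=True):
    """Build a kafka-consumer-groups.sh call that resets `group` on `topic`.

    Hypothetical helper. Without --execute the tool only prints the planned
    offsets (dry run). `to_datetime` uses the tool's format, e.g.
    "2025-04-01T12:00:00.000".
    """
    cmd = [
        "kafka-consumer-groups.sh",
        "--bootstrap-server", bootstrap,
        "--group", group,
        "--topic", topic,
        "--reset-offsets",
        "--to-datetime", to_datetime,
    ]
    # The flag that actually applies the change
    cmd.append("--execute" if execute else "--dry-run")
    return cmd


if __name__ == "__main__":
    cmd = build_reset_command("localhost:9092", "my-consumer-group", "my-topic",
                              "2025-04-01T12:00:00.000")
    print(shlex.join(cmd))
    # To run it: import subprocess; subprocess.run(cmd, check=True)
    # (requires the Java-based Kafka tools; stop the group's consumers first)
```

Passing `execute=False` produces the explicit dry-run variant, which is a cheap way to preview the new offsets before applying them.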
Another way to set a consumer group's offsets is to use a Kafka client library and do it in code.
I have Python installed on my setup, so all I need to do is install the confluent-kafka library:
```bash
pip install confluent-kafka
```
Then run the following script to reset the consumer group's committed offsets to a specific timestamp:
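```python
from confluent_kafka import Consumer, TopicPartition
import time

# Configuration
consumer_config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-consumer-group',  # the group whose offsets are being reset
    'enable.auto.commit': False,      # offsets are committed explicitly below
}
topic = 'my-topic'
# Parsed in this machine's local time zone (or set a Unix time in milliseconds directly)
timestamp_ms = int(time.mktime(time.strptime("2025-04-01 12:00:00", "%Y-%m-%d %H:%M:%S")) * 1000)

# Create consumer. Stop the group's own consumers first, otherwise their commits
# can overwrite the reset (kafka-consumer-groups.sh has the same requirement)
consumer = Consumer(consumer_config)
try:
    # Get metadata to discover partitions
    metadata = consumer.list_topics(topic, timeout=10.0)
    partitions = [TopicPartition(topic, p.id, timestamp_ms)
                  for p in metadata.topics[topic].partitions.values()]
    # Look up the earliest offset whose timestamp is >= timestamp_ms in each partition
    offsets = consumer.offsets_for_times(partitions, timeout=10.0)
    resolved = []
    for tp in offsets:
        if tp.offset < 0:
            # No message at or after the timestamp: fall back to the end of the partition
            _, high = consumer.get_watermark_offsets(tp, timeout=10.0)
            tp = TopicPartition(tp.topic, tp.partition, high)
        resolved.append(tp)
    # assign() alone only moves this consumer's position; commit to store it for the group
    consumer.commit(offsets=resolved, asynchronous=False)
    # Optional check: read the first message at the new position
    consumer.assign(resolved)
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print("Error:", msg.error())
            continue
        value = (msg.value() or b'').decode('utf-8', errors='replace')
        print(f"{msg.topic()} [{msg.partition()}] at offset {msg.offset()}: {value}")
        break
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
```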
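`time.mktime` interprets the date string in the local time zone of whichever machine runs the script, so the same string can resolve to different offsets on a laptop and on a server. If the target time is meant as UTC, a small helper built on the standard `datetime` module makes that explicit. This is a sketch: `to_epoch_ms` is a hypothetical name, and treating the input as UTC is a choice, not something Kafka requires.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(text, fmt="%Y-%m-%d %H:%M:%S", tz=timezone.utc):
    """Parse `text` as wall-clock time in `tz` and return Unix time in milliseconds."""
    dt = datetime.strptime(text, fmt).replace(tzinfo=tz)
    # Dividing one timedelta by another gives an exact integer number of milliseconds
    return (dt - EPOCH) // timedelta(milliseconds=1)


timestamp_ms = to_epoch_ms("2025-04-01 12:00:00")
print(timestamp_ms)  # 1743508800000
```

Swapping this in for the `timestamp_ms = ...` line keeps the rest of the script unchanged; pass a different `tz` (for example `timezone(timedelta(hours=2))`) if the date is written in another fixed offset.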
#python #kafka