Overview
In Apache Kafka, offset management actions such as clearing, resetting, and skipping play a vital role in controlling consumer group behavior.
- Resetting offsets enables consumers to reprocess messages from a specific point in time or offset - commonly used during recovery, testing, or reprocessing scenarios.
- Skipping offsets allows consumers to move past specific records, such as malformed or corrupted records, without disrupting the entire processing flow.
- Clearing offsets removes committed offsets for a consumer group or member, effectively resetting its consumption history and causing it to re-consume.
Together, these capabilities are essential tools for maintaining data integrity, troubleshooting issues, and ensuring robust and flexible stream processing.
Kpow version 94.2 enhances consumer group management capabilities, providing greater control and visibility into Kafka consumption. This article provides a step-by-step guide on how to manage consumer offsets in Kpow. We will walk through these features using simple Kafka producer and consumer clients.
About Factor House
Factor House is a leader in real-time data tooling, empowering engineers with innovative solutions for Apache Kafka® and Apache Flink®.
Our flagship product, Kpow for Apache Kafka, is the market-leading enterprise solution for Kafka management and monitoring.
Explore our live multi-cluster demo environment or grab a free Community license and dive into streaming tech on your laptop with Factor House Local.
Prerequisites
To manage consumer group offsets in Kpow, the logged-in user must have appropriate permissions, including the GROUP_EDIT action. For more information, refer to the User Authorization section of the Kpow documentation.
Also, if your Kafka cluster has ACLs enabled, the Kafka user specified in Kpow's cluster connection must have the necessary permissions for Kpow to operate correctly. You can find the full list of required permissions in the Minimum ACL Permissions guide.
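As a purely illustrative aside, ACLs can also be granted programmatically with the confluent_kafka AdminClient. The sketch below allows a hypothetical User:kpow principal to read all consumer groups - it is not Kpow's actual requirement list, which lives in the Minimum ACL Permissions guide.

## create_acl.py - purely illustrative sketch of granting a principal READ on
## all consumer groups; see the Minimum ACL Permissions guide for Kpow's
## actual requirements
import os

from confluent_kafka.admin import (
    AclBinding,
    AclOperation,
    AclPermissionType,
    AdminClient,
    ResourcePatternType,
    ResourceType,
)

BOOTSTRAP_SERVERS = os.getenv("BOOTSTRAP_SERVERS", "localhost:9092")

admin_client = AdminClient({"bootstrap.servers": BOOTSTRAP_SERVERS})

## allow the (hypothetical) kpow user to read all consumer groups
binding = AclBinding(
    ResourceType.GROUP,
    "*",
    ResourcePatternType.LITERAL,
    "User:kpow",
    "*",
    AclOperation.READ,
    AclPermissionType.ALLOW,
)
for future in admin_client.create_acls([binding]).values():
    future.result()  ## raises on failure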
Development Environment
At Factor House, we recently created a GitHub repository called examples. As the name suggests, it holds Factor House product feature and integration examples.
The Kafka clients used in this post can be found in the offset-management folder. Also, we use Factor House Local to deploy a Kpow instance and a 3-node Kafka cluster.
We can set up a local development environment as follows.
## clone examples
$ git clone https://github.com/factorhouse/examples
$ cd examples

## install python packages in a virtual environment
$ python -m venv venv
$ source venv/bin/activate
$ pip install -r offset-management/requirements.txt

## clone factorhouse-local
$ git clone https://github.com/factorhouse/factorhouse-local

## show key files and folders
$ tree -L 1 factorhouse-local offset-management
# factorhouse-local
# ├── LICENSE
# ├── README.md
# ├── compose-analytics.yml
# ├── compose-flex-community.yml
# ├── compose-flex-trial.yml
# ├── compose-kpow-community.yml
# ├── compose-kpow-trial.yml
# ├── compose-pinot.yml
# ├── images
# ├── quickstart
# └── resources
# offset-management
# ├── README.md
# ├── consumer.py
# ├── producer.py
# └── requirements.txt
# 5 directories, 12 files
We can use either the Kpow Community or Enterprise edition. To get started, let's make sure a valid Kpow license is available. For details on how to request and configure a license, refer to this section of the project README.
In this post, we’ll be using the Community edition.
$ docker compose -f factorhouse-local/compose-kpow-community.yml up -d
$ docker compose -f factorhouse-local/compose-kpow-community.yml ps
# NAME              IMAGE                                   COMMAND                  SERVICE     CREATED          STATUS                    PORTS
# connect           confluentinc/cp-kafka-connect:7.8.0     "/etc/confluent/dock…"   connect     44 seconds ago   Up 42 seconds (healthy)   0.0.0.0:8083->8083/tcp, [::]:8083->8083/tcp, 9092/tcp
# kafka-1           confluentinc/cp-kafka:7.8.0             "/etc/confluent/dock…"   kafka-1     44 seconds ago   Up 42 seconds             0.0.0.0:9092->9092/tcp, [::]:9092->9092/tcp
# kafka-2           confluentinc/cp-kafka:7.8.0             "/etc/confluent/dock…"   kafka-2     44 seconds ago   Up 42 seconds             9092/tcp, 0.0.0.0:9093->9093/tcp, [::]:9093->9093/tcp
# kafka-3           confluentinc/cp-kafka:7.8.0             "/etc/confluent/dock…"   kafka-3     44 seconds ago   Up 42 seconds             9092/tcp, 0.0.0.0:9094->9094/tcp, [::]:9094->9094/tcp
# kpow-ce           factorhouse/kpow-ce:latest              "/usr/local/bin/kpow…"   kpow        44 seconds ago   Up 15 seconds             0.0.0.0:3000->3000/tcp, [::]:3000->3000/tcp
# schema_registry   confluentinc/cp-schema-registry:7.8.0   "/etc/confluent/dock…"   schema      44 seconds ago   Up 42 seconds             0.0.0.0:8081->8081/tcp, [::]:8081->8081/tcp
# zookeeper         confluentinc/cp-zookeeper:7.8.0         "/etc/confluent/dock…"   zookeeper   44 seconds ago   Up 43 seconds             2888/tcp, 0.0.0.0:2181->2181/tcp, [::]:2181->2181/tcp, 3888/tcp
Kafka Clients
The Kafka producer interacts with a Kafka cluster using the confluent_kafka library to create a topic and produce messages. It first checks whether the specified Kafka topic exists and creates it if necessary - for simplicity, the topic is created with a single partition. The app then generates 10 JSON messages containing the current timestamp (in both ISO 8601 and Unix epoch format) and sends them to the Kafka topic, using a callback function to log success or failure upon delivery. Configuration values such as the Kafka bootstrap servers and topic name are read from environment variables, defaulting to localhost:9092 and offset-management.
import os
import time
import datetime
import json
import logging

from confluent_kafka import Producer, Message, KafkaError
from confluent_kafka.admin import AdminClient, NewTopic


def topic_exists(admin_client: AdminClient, topic_name: str):
    ## check if a topic exists
    metadata = admin_client.list_topics()
    for value in iter(metadata.topics.values()):
        logging.info(value.topic)
        if value.topic == topic_name:
            return True
    return False


def create_topic(admin_client: AdminClient, topic_name: str):
    ## create a new topic
    new_topic = NewTopic(topic_name, num_partitions=1, replication_factor=1)
    result_dict = admin_client.create_topics([new_topic])
    for topic, future in result_dict.items():
        try:
            future.result()
            logging.info(f"Topic {topic} created")
        except Exception as e:
            logging.info(f"Failed to create topic {topic}: {e}")


def callback(err: KafkaError, event: Message):
    ## callback function that gets triggered when a message is delivered
    if err:
        logging.info(
            f"Produce to topic {event.topic()} failed for event: {event.key()}"
        )
    else:
        value = event.value().decode("utf8")
        logging.info(
            f"Sent: {value} to partition {event.partition()}, offset {event.offset()}."
        )


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    BOOTSTRAP_SERVERS = os.getenv("BOOTSTRAP_SERVERS", "localhost:9092")
    TOPIC_NAME = os.getenv("TOPIC_NAME", "offset-management")

    ## create a topic
    conf = {"bootstrap.servers": BOOTSTRAP_SERVERS}
    admin_client = AdminClient(conf)
    if not topic_exists(admin_client, TOPIC_NAME):
        create_topic(admin_client, TOPIC_NAME)

    ## produce messages
    producer = Producer(conf)
    for _ in range(10):
        dt = datetime.datetime.now()
        epoch = int(dt.timestamp() * 1000)
        ts = dt.isoformat(timespec="seconds")
        producer.produce(
            topic=TOPIC_NAME,
            value=json.dumps({"epoch": epoch, "ts": ts}).encode("utf-8"),
            key=str(epoch),
            on_delivery=callback,
        )
        producer.flush()
        time.sleep(1)
The Kafka consumer polls messages from a Kafka topic using the confluent_kafka library. It configures a Kafka consumer with parameters like bootstrap.servers, group.id, and auto.offset.reset. The consumer subscribes to a specified topic, and a callback function (assignment_callback) is used to log when partitions are assigned to the consumer. The main loop continuously polls for messages, processes them by decoding their values, logs the message details (including partition and offset), and commits the message offset. If any errors occur, the script raises a KafkaException. The script gracefully handles a user interrupt (KeyboardInterrupt), ensuring proper cleanup by closing the consumer connection.
import os
import logging
from typing import List

from confluent_kafka import Consumer, KafkaException, TopicPartition, Message


def assignment_callback(_: Consumer, partitions: List[TopicPartition]):
    ## callback function that gets triggered when a topic partition is assigned
    for p in partitions:
        logging.info(f"Assigned to {p.topic}, partition {p.partition}")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    BOOTSTRAP_SERVERS = os.getenv("BOOTSTRAP_SERVERS", "localhost:9092")
    TOPIC_NAME = os.getenv("TOPIC_NAME", "offset-management")

    conf = {
        "bootstrap.servers": BOOTSTRAP_SERVERS,
        "group.id": f"{TOPIC_NAME}-group",
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,
    }

    consumer = Consumer(conf)
    consumer.subscribe([TOPIC_NAME], on_assign=assignment_callback)

    try:
        while True:
            message: Message = consumer.poll(1.0)
            if message is None:
                continue
            if message.error():
                raise KafkaException(message.error())
            else:
                value = message.value().decode("utf8")
                logging.info(
                    f"Received {value} from partition {message.partition()}, offset {message.offset()}."
                )
                consumer.commit(message)
    except KeyboardInterrupt:
        logging.warning("cancelled by user")
    finally:
        consumer.close()
Before diving into offset management, let's first create a Kafka topic named offset-management and produce 10 sample messages.
$ python offset-management/producer.py
# ...
# INFO:root:Topic offset-management created
# INFO:root:Sent: {"epoch": 1744847497314, "ts": "2025-04-17T09:51:37"} to partition 0, offset 0.
# INFO:root:Sent: {"epoch": 1744847499328, "ts": "2025-04-17T09:51:39"} to partition 0, offset 1.
# INFO:root:Sent: {"epoch": 1744847500331, "ts": "2025-04-17T09:51:40"} to partition 0, offset 2.
# INFO:root:Sent: {"epoch": 1744847501334, "ts": "2025-04-17T09:51:41"} to partition 0, offset 3.
# INFO:root:Sent: {"epoch": 1744847502337, "ts": "2025-04-17T09:51:42"} to partition 0, offset 4.
# INFO:root:Sent: {"epoch": 1744847503339, "ts": "2025-04-17T09:51:43"} to partition 0, offset 5.
# INFO:root:Sent: {"epoch": 1744847504342, "ts": "2025-04-17T09:51:44"} to partition 0, offset 6.
# INFO:root:Sent: {"epoch": 1744847505344, "ts": "2025-04-17T09:51:45"} to partition 0, offset 7.
# INFO:root:Sent: {"epoch": 1744847506347, "ts": "2025-04-17T09:51:46"} to partition 0, offset 8.
# INFO:root:Sent: {"epoch": 1744847507349, "ts": "2025-04-17T09:51:47"} to partition 0, offset 9.
Next, let the consumer keep polling messages from the topic. By subscribing to the topic, it creates a consumer group named offset-management-group.
$ python offset-management/consumer.py
# INFO:root:Assigned to offset-management, partition 0
# INFO:root:Received {"epoch": 1744847497314, "ts": "2025-04-17T09:51:37"} from partition 0, offset 0.
# INFO:root:Received {"epoch": 1744847499328, "ts": "2025-04-17T09:51:39"} from partition 0, offset 1.
# INFO:root:Received {"epoch": 1744847500331, "ts": "2025-04-17T09:51:40"} from partition 0, offset 2.
# INFO:root:Received {"epoch": 1744847501334, "ts": "2025-04-17T09:51:41"} from partition 0, offset 3.
# INFO:root:Received {"epoch": 1744847502337, "ts": "2025-04-17T09:51:42"} from partition 0, offset 4.
# INFO:root:Received {"epoch": 1744847503339, "ts": "2025-04-17T09:51:43"} from partition 0, offset 5.
# INFO:root:Received {"epoch": 1744847504342, "ts": "2025-04-17T09:51:44"} from partition 0, offset 6.
# INFO:root:Received {"epoch": 1744847505344, "ts": "2025-04-17T09:51:45"} from partition 0, offset 7.
# INFO:root:Received {"epoch": 1744847506347, "ts": "2025-04-17T09:51:46"} from partition 0, offset 8.
# INFO:root:Received {"epoch": 1744847507349, "ts": "2025-04-17T09:51:47"} from partition 0, offset 9.
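Besides checking the Kpow UI, we can inspect a group's committed offsets programmatically. Below is a minimal sketch using the confluent_kafka AdminClient; it assumes confluent-kafka 1.9 or later, where list_consumer_group_offsets is available, and the script itself is not part of the examples repository.

## inspect_offsets.py - a minimal sketch that lists the committed offsets of
## the offset-management-group consumer group (assumes confluent-kafka >= 1.9)
import os

from confluent_kafka import ConsumerGroupTopicPartitions
from confluent_kafka.admin import AdminClient

BOOTSTRAP_SERVERS = os.getenv("BOOTSTRAP_SERVERS", "localhost:9092")
GROUP_ID = os.getenv("GROUP_ID", "offset-management-group")

admin_client = AdminClient({"bootstrap.servers": BOOTSTRAP_SERVERS})

## list_consumer_group_offsets returns a dict of group id -> future
request = ConsumerGroupTopicPartitions(GROUP_ID)
future = admin_client.list_consumer_group_offsets([request])[GROUP_ID]
response = future.result()
for tp in response.topic_partitions:
    print(f"{tp.topic} [{tp.partition}] committed offset: {tp.offset}")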
Offset Management
Clear Offsets
This feature removes committed offsets for an entire consumer group, a specific group member, or a group member's partition assignment. When you trigger Clear offsets and confirm the action, it is scheduled and becomes visible under the Mutations tab. The action remains in a scheduled state until its prerequisite is fulfilled. For group offsets, the prerequisite is that the consumer group must be in an EMPTY state - that is, it has no active members. To meet this condition, you must manually scale down or stop all instances of the consumer group; this requirement exists because offset-related mutations cannot be applied to a running consumer group. Once we stop the Kafka consumer, the mutation's status changes to Success.
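Since the EMPTY prerequisite is easy to overlook, it can be handy to check the group state before scheduling a mutation. Below is a minimal sketch using the confluent_kafka AdminClient; it assumes confluent-kafka 2.0 or later, where describe_consumer_groups is available, and the script itself is not part of the examples repository.

## check_group_state.py - a minimal sketch that checks whether the consumer
## group is EMPTY before an offset mutation is scheduled (assumes
## confluent-kafka >= 2.0)
import os

from confluent_kafka.admin import AdminClient

BOOTSTRAP_SERVERS = os.getenv("BOOTSTRAP_SERVERS", "localhost:9092")
GROUP_ID = os.getenv("GROUP_ID", "offset-management-group")

admin_client = AdminClient({"bootstrap.servers": BOOTSTRAP_SERVERS})

## describe_consumer_groups returns a dict of group id -> future
future = admin_client.describe_consumer_groups([GROUP_ID])[GROUP_ID]
description = future.result()

## offset mutations can only be applied when the group has no active members
print(f"Group {GROUP_ID} state: {description.state}, members: {len(description.members)}")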
The output below shows that, when restarted, the consumer polls messages from the earliest offset.
$ python offset-management/consumer.py
# INFO:root:Assigned to offset-management, partition 0
# INFO:root:Received {"epoch": 1744847497314, "ts": "2025-04-17T09:51:37"} from partition 0, offset 0.
# INFO:root:Received {"epoch": 1744847499328, "ts": "2025-04-17T09:51:39"} from partition 0, offset 1.
# INFO:root:Received {"epoch": 1744847500331, "ts": "2025-04-17T09:51:40"} from partition 0, offset 2.
# INFO:root:Received {"epoch": 1744847501334, "ts": "2025-04-17T09:51:41"} from partition 0, offset 3.
# INFO:root:Received {"epoch": 1744847502337, "ts": "2025-04-17T09:51:42"} from partition 0, offset 4.
# INFO:root:Received {"epoch": 1744847503339, "ts": "2025-04-17T09:51:43"} from partition 0, offset 5.
# INFO:root:Received {"epoch": 1744847504342, "ts": "2025-04-17T09:51:44"} from partition 0, offset 6.
# INFO:root:Received {"epoch": 1744847505344, "ts": "2025-04-17T09:51:45"} from partition 0, offset 7.
# INFO:root:Received {"epoch": 1744847506347, "ts": "2025-04-17T09:51:46"} from partition 0, offset 8.
# INFO:root:Received {"epoch": 1744847507349, "ts": "2025-04-17T09:51:47"} from partition 0, offset 9.
Reset Offsets
Kpow provides powerful and flexible controls for managing consumer group offsets across various levels of granularity. You can adjust offsets based on different dimensions, including:
- Whole group assignments - reset offsets for the entire consumer group at once.
- Host-level assignments - target offset changes to consumers running on specific hosts.
- Topic-level assignments - reset offsets for specific topics within the group.
- Partition-level assignments - make fine-grained adjustments at the individual partition level.
In addition to flexible targeting, Kpow supports multiple methods for resetting offsets, tailored to different operational needs:
- Offset - reset to a specific offset value.
- Timestamp - rewind or advance to the earliest offset with a timestamp equal to or later than the given epoch timestamp.
- Datetime (Local) - use a human-readable local date and time to select the corresponding offset.
These options enable precise control over Kafka consumption behavior, whether for incident recovery, message reprocessing, or routine operational adjustments.
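To illustrate how the Timestamp method resolves an epoch timestamp to an offset, the following minimal sketch performs the same lookup with the consumer API's offsets_for_times and commits the result. This is only an approximation of the behaviour, not how Kpow implements the feature, and it must run while the group has no active members; the script is not part of the examples repository.

## reset_by_timestamp.py - a minimal sketch of a timestamp-based offset reset
## using the consumer API directly; run it only while the group has no active members
import os

from confluent_kafka import Consumer, TopicPartition

BOOTSTRAP_SERVERS = os.getenv("BOOTSTRAP_SERVERS", "localhost:9092")
TOPIC_NAME = os.getenv("TOPIC_NAME", "offset-management")
TIMESTAMP_MS = 1744847503339  ## epoch milliseconds to reset to

consumer = Consumer(
    {"bootstrap.servers": BOOTSTRAP_SERVERS, "group.id": f"{TOPIC_NAME}-group"}
)

## offsets_for_times takes TopicPartitions whose offset field holds a timestamp
## and returns the earliest offset with a timestamp >= that value
partitions = consumer.offsets_for_times([TopicPartition(TOPIC_NAME, 0, TIMESTAMP_MS)])
target = partitions[0]

## an offset of -1 means no message at or after the timestamp was found
if target.offset >= 0:
    print(f"Timestamp {TIMESTAMP_MS} maps to offset {target.offset}")
    ## committing the looked-up offset applies the reset for the consumer group
    consumer.commit(offsets=[target], asynchronous=False)
consumer.close()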
In this example, we reset the offset of topic partition 0 using the Timestamp method. After entering the timestamp value 1744847503339, Kpow displays both the current committed offset (Commit Offset) and the calculated target offset (New Offset). Once we confirm the action, it appears in the Mutations tab as a scheduled mutation. Similar to the Clear offsets feature, the mutation remains pending until the consumer is stopped - at which point its status updates to Success. We can then verify the updated offset value directly within the group member's assignment record, confirming that the reset has taken effect.
As expected, the consumer polls messages from the new offset.
$ python offset-management/consumer.py
# INFO:root:Assigned to offset-management, partition 0
# INFO:root:Received {"epoch": 1744847503339, "ts": "2025-04-17T09:51:43"} from partition 0, offset 5.
# INFO:root:Received {"epoch": 1744847504342, "ts": "2025-04-17T09:51:44"} from partition 0, offset 6.
# INFO:root:Received {"epoch": 1744847505344, "ts": "2025-04-17T09:51:45"} from partition 0, offset 7.
# INFO:root:Received {"epoch": 1744847506347, "ts": "2025-04-17T09:51:46"} from partition 0, offset 8.
# INFO:root:Received {"epoch": 1744847507349, "ts": "2025-04-17T09:51:47"} from partition 0, offset 9.
Skip Offsets
We can demonstrate the Skip offsets feature in two steps. First, reset the offset of topic partition 0 to 8 and then stop the consumer, which ensures the offset update is applied successfully. Next, trigger the Skip offsets action, which advances the offset from 8 to 9, effectively skipping the message at offset 8. We can verify that the new offset has taken effect by checking the updated value in the group member's assignment record, confirming that the change has been applied as expected.
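For reference, the same skip can be approximated outside Kpow by committing the next offset directly, as in the minimal sketch below. Again, this is not how Kpow applies the mutation, it must run while the group has no active members, and the script is not part of the examples repository.

## skip_offset.py - a minimal sketch that skips a single record by committing
## the next offset; run it only while the group has no active members
import os

from confluent_kafka import Consumer, TopicPartition

BOOTSTRAP_SERVERS = os.getenv("BOOTSTRAP_SERVERS", "localhost:9092")
TOPIC_NAME = os.getenv("TOPIC_NAME", "offset-management")
OFFSET_TO_SKIP = 8  ## the record to move past

consumer = Consumer(
    {"bootstrap.servers": BOOTSTRAP_SERVERS, "group.id": f"{TOPIC_NAME}-group"}
)

## committing offset N + 1 means the next poll starts after the skipped record
consumer.commit(
    offsets=[TopicPartition(TOPIC_NAME, 0, OFFSET_TO_SKIP + 1)],
    asynchronous=False,
)
consumer.close()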
We can also verify the change by restarting the consumer.
$ python offset-management/consumer.py
# INFO:root:Assigned to offset-management, partition 0
# INFO:root:Received {"epoch": 1744847507349, "ts": "2025-04-17T09:51:47"} from partition 0, offset 9.
Conclusion
Managing Kafka consumer offsets is crucial for reliable data processing and operational flexibility. Actions such as clearing, resetting (by offset, timestamp, or datetime), and skipping offsets provide powerful capabilities for debugging, data recovery, and bypassing problematic messages.
This article demonstrated these concepts using simple Kafka clients and showcased how Kpow offers an intuitive interface with granular controls - spanning entire groups down to individual partitions - to execute these essential offset management tasks effectively. Kpow significantly simplifies these complex Kafka operations, ultimately providing developers and operators with enhanced visibility and precise control over their Kafka consumption patterns.