How to Reset Consumer Offsets in Kafka

In distributed messaging systems, consumer offsets determine where a consumer resumes reading messages from a topic partition. Resetting consumer offsets is a common operational task during debugging, reprocessing of historical data, or recovery from application bugs. This article explains how to reset consumer offsets in Kafka and how a reset controls where a consumer group starts reading a topic.

1. Overview of Consumer Offsets in Apache Kafka

Apache Kafka is a distributed event streaming platform designed for high-throughput, fault-tolerant, and horizontally scalable data pipelines. It is commonly used for real-time data ingestion, stream processing, log aggregation, and event-driven architectures. Kafka operates as a cluster of brokers that work together to store, replicate, and serve data reliably. It stores records as immutable events in topics, which are divided into partitions to enable parallelism and scalability. Each record within a partition is assigned a sequential identifier called an offset, which represents its position in the log.

Producers write records to topics, while consumers read records from partitions sequentially. Consumers typically belong to consumer groups, allowing Kafka to distribute partitions among multiple consumers for load balancing and fault tolerance. Kafka tracks the read position of each consumer group using committed offsets, which are stored in an internal topic called __consumer_offsets. Because this state lives in Kafka rather than in the application, a restarted or newly added consumer can pick up from the group's last committed position. Each consumer group maintains its own offsets, so different applications can read the same data without affecting one another.
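
The relationship between a partition log and per-group offsets can be illustrated with a toy in-memory model. This is only a sketch of the idea, not how Kafka is implemented; the class PartitionModel and the group names billing and analytics are made up for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of a single partition; not Kafka's implementation.
public class PartitionModel {
    private final List<String> log = new ArrayList<>();           // append-only records
    private final Map<String, Long> committed = new HashMap<>();  // group id -> next offset to read

    // Appends a record and returns the offset assigned to it.
    public long append(String value) {
        log.add(value);
        return log.size() - 1;
    }

    // Returns up to max records starting at the group's committed offset, then commits.
    public List<String> poll(String groupId, int max) {
        long start = committed.getOrDefault(groupId, 0L);
        int end = (int) Math.min(log.size(), start + max);
        List<String> batch = new ArrayList<>(log.subList((int) start, end));
        committed.put(groupId, (long) end);
        return batch;
    }

    // Moves one group's committed offset, which is what an offset reset does.
    public void reset(String groupId, long offset) {
        committed.put(groupId, offset);
    }

    public long committedOffset(String groupId) {
        return committed.getOrDefault(groupId, 0L);
    }

    public static void main(String[] args) {
        PartitionModel p = new PartitionModel();
        p.append("order-1");
        p.append("order-2");
        p.append("order-3");

        System.out.println(p.poll("billing", 2));            // [order-1, order-2]
        System.out.println(p.poll("analytics", 3));          // [order-1, order-2, order-3]
        p.reset("billing", 0);                               // rewind billing only
        System.out.println(p.poll("billing", 3));            // [order-1, order-2, order-3]
        System.out.println(p.committedOffset("analytics"));  // 3
    }
}
```

Rewinding billing leaves the offset of analytics untouched, which mirrors the isolation between consumer groups described above.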

1.1 Why Offsets Matter

  • They determine where a consumer resumes reading after a restart or failure
  • They enable message replay for debugging, reprocessing, or backfilling data
  • They play a critical role in achieving at-least-once or exactly-once delivery semantics
  • They help maintain data consistency across consumer instances in a group
  • Incorrectly managed offsets can lead to data loss, message duplication, or processing gaps
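
How commit timing leads to gaps or duplicates can be shown with a small simulation. The sketch below uses no Kafka client; the class CommitOrdering and the crash model (a failure halfway through handling one record) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy simulation of commit ordering around a crash; no Kafka client is involved.
public class CommitOrdering {

    // Handles records from startOffset onward and simulates a crash halfway through
    // the record at crashAt (use -1 for no crash). Processed values are appended to
    // 'processed'. Returns the committed offset a restarted consumer would resume from.
    public static long run(List<String> log, long startOffset, int crashAt,
                           boolean commitBeforeProcessing, List<String> processed) {
        long committed = startOffset;
        for (int i = (int) startOffset; i < log.size(); i++) {
            if (commitBeforeProcessing) {
                committed = i + 1;                   // offset saved first
                if (i == crashAt) return committed;  // crash: record i is never processed
                processed.add(log.get(i));
            } else {
                processed.add(log.get(i));
                if (i == crashAt) return committed;  // crash: record i will be read again
                committed = i + 1;                   // offset saved only after success
            }
        }
        return committed;
    }

    // Runs once with a crash at offset 1, then restarts from the committed offset.
    public static List<String> crashAndRestart(boolean commitBeforeProcessing) {
        List<String> log = List.of("a", "b", "c");
        List<String> processed = new ArrayList<>();
        long resumeFrom = run(log, 0, 1, commitBeforeProcessing, processed);
        run(log, resumeFrom, -1, commitBeforeProcessing, processed);
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(crashAndRestart(true));   // [a, c]       -> "b" was skipped
        System.out.println(crashAndRestart(false));  // [a, b, b, c] -> "b" was duplicated
    }
}
```

Committing before processing risks a gap (at-most-once), while committing after processing risks a duplicate (at-least-once).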

2. Setting Up Kafka Using Docker

Using Docker is one of the easiest and fastest ways to run Kafka locally for development and testing. Docker eliminates the need for manual installation and configuration, allowing you to spin up a fully functional Kafka environment with minimal effort. In this setup, Kafka runs in ZooKeeper mode, which the 7.5.0 images used here support; newer Kafka releases replace ZooKeeper with KRaft, so treat this as a setup for local experimentation. The following docker-compose.yml file defines two services: ZooKeeper and Kafka. ZooKeeper is responsible for managing broker metadata and cluster coordination, while Kafka handles message storage and streaming.

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

In this configuration:

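  • ZooKeeper listens on port 2181 inside the Docker network and manages Kafka cluster coordination.
  • The Kafka broker is published on port 9092 and exposes a plaintext (unencrypted, unauthenticated) listener for local access.
  • KAFKA_ADVERTISED_LISTENERS tells clients which address to use after the initial connection; localhost:9092 works for clients running on the host machine.
  • KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR is set to 1 because a single broker cannot host more than one replica of the __consumer_offsets topic.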

Start Kafka using the following command:

docker-compose up -d

Once started, verify the containers are running with:

docker ps

This setup provides a lightweight local Kafka environment suitable for experimenting with topics, producers, consumers, and offset management.
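
Beyond docker ps, a quick way to confirm that the published broker port accepts connections from the host is a plain TCP check. The sketch below is a hypothetical helper (BrokerPortCheck is not part of Kafka); a successful connection only shows that something is listening on the port, not that the broker is fully ready.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks that a TCP port accepts connections.
public class BrokerPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    public static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;  // refused, timed out, or host not resolvable
        }
    }

    public static void main(String[] args) {
        // 9092 is the only port the compose file publishes to the host.
        System.out.println("Kafka port 9092 open: " + isPortOpen("localhost", 9092, 2000));
    }
}
```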

3. Resetting Consumer Offsets Using Kafka CLI Tools

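Kafka provides the kafka-consumer-groups.sh command-line tool to inspect and manage consumer group offsets. Resetting offsets is a powerful operation typically used for reprocessing data, recovering from errors, or skipping problematic records. The consumer group must be inactive (no running consumers) before performing an offset reset; otherwise the tool refuses the change. In the Confluent images used above, the same tool is typically installed without the .sh suffix (kafka-consumer-groups) and can be run inside the broker container, for example through docker exec.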

3.1 Resetting Offsets to the Earliest Position

Resetting offsets to the earliest position forces the consumer group to reprocess all available records in the topic from the beginning of each partition. This is commonly used for full replays or backfilling data.

kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group my-consumer-group \
  --topic my-topic \
  --reset-offsets \
  --to-earliest \
  --execute

3.2 Resetting Offsets to the Latest Position

Resetting offsets to the latest position skips all existing records and causes the consumer group to consume only new messages produced after the reset. This is useful when historical data is no longer relevant.

kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group my-consumer-group \
  --topic my-topic \
  --reset-offsets \
  --to-latest \
  --execute

3.3 Resetting Offsets to a Specific Offset

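This option resets the consumer group to a specific numeric offset. With --topic my-topic, the same offset is applied to every partition of the topic; to target individual partitions, the tool also accepts the form --topic my-topic:0,1. It is useful when you want to resume consumption from a known point in the log.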

kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group my-consumer-group \
  --topic my-topic \
  --reset-offsets \
  --to-offset 50 \
  --execute
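
A requested offset may fall outside the range a partition currently holds, for example after retention has deleted older records. As far as the tool's behavior is understood here, it substitutes the nearest valid offset and prints a warning rather than failing. A minimal sketch of such a bounds check, with hypothetical names and made-up log boundaries:

```java
// Hypothetical helper illustrating a bounds check on a requested offset.
public class OffsetBounds {

    // Clamps a requested offset into [logStartOffset, logEndOffset].
    public static long clamp(long requested, long logStartOffset, long logEndOffset) {
        return Math.max(logStartOffset, Math.min(requested, logEndOffset));
    }

    public static void main(String[] args) {
        System.out.println(clamp(50, 0, 120));   // 50 -> within range, used as-is
        System.out.println(clamp(50, 80, 120));  // 80 -> older records already deleted
        System.out.println(clamp(50, 0, 30));    // 30 -> partition holds fewer records
    }
}
```

Running the reset with --dry-run first shows the offsets that would actually be applied per partition.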

3.4 Resetting Offsets to a Specific Timestamp

Kafka allows resetting offsets based on a timestamp. The consumer group will start reading from the first offset whose timestamp is greater than or equal to the specified datetime. This is helpful for time-based reprocessing.

kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group my-consumer-group \
  --topic my-topic \
  --reset-offsets \
  --to-datetime 2024-01-01T00:00:00.000 \
  --execute
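
When the reset is scripted, the datetime argument and the lookup rule can be sketched in plain Java. The class DatetimeReset is hypothetical; the formatter pattern simply mirrors the value shown in the command above, and the lookup assumes record timestamps are sorted in ascending order within the partition, which Kafka does not guarantee for producer-assigned timestamps.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for time-based resets; uses only the standard library.
public class DatetimeReset {

    // Pattern mirroring the --to-datetime value used in the command above.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS");

    public static String toDatetimeArg(LocalDateTime time) {
        return time.format(FORMAT);
    }

    // Returns the offset (array index) of the first record whose timestamp is
    // greater than or equal to target, or timestamps.length (the end of this
    // toy log) if there is none. Assumes timestamps are sorted ascending.
    public static int firstOffsetAtOrAfter(long[] timestamps, long target) {
        int lo = 0;
        int hi = timestamps.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (timestamps[mid] < target) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo;
    }

    public static void main(String[] args) {
        System.out.println(toDatetimeArg(LocalDateTime.of(2024, 1, 1, 0, 0)));
        // 2024-01-01T00:00:00.000

        long[] timestamps = {1000L, 2000L, 2000L, 3500L};  // made-up record timestamps
        System.out.println(firstOffsetAtOrAfter(timestamps, 2000L));  // 1
        System.out.println(firstOffsetAtOrAfter(timestamps, 4000L));  // 4
    }
}
```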

3.5 Performing a Dry Run Before Reset

Before applying any offset reset, it is highly recommended to perform a dry run using the --dry-run flag. This shows the expected offset changes without modifying the consumer group state.

kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group my-consumer-group \
  --topic my-topic \
  --reset-offsets \
  --to-earliest \
  --dry-run
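
In automation scripts, one way to make the dry run the default is to build the argument list in code and add --execute only on explicit request. The sketch below is a hypothetical helper (ResetCommandBuilder is not part of Kafka); it only assembles the flags shown in this section, and the resulting list could be handed to java.lang.ProcessBuilder.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that assembles a reset command with dry run as the default.
public class ResetCommandBuilder {

    // target holds the reset option and its value, e.g. "--to-earliest" or "--to-offset", "50".
    public static List<String> build(String group, String topic, boolean execute, String... target) {
        List<String> args = new ArrayList<>(List.of(
                "kafka-consumer-groups.sh",
                "--bootstrap-server", "localhost:9092",
                "--group", group,
                "--topic", topic,
                "--reset-offsets"));
        args.addAll(Arrays.asList(target));
        args.add(execute ? "--execute" : "--dry-run");  // never execute implicitly
        return args;
    }

    public static void main(String[] args) {
        System.out.println(String.join(" ",
                build("my-consumer-group", "my-topic", false, "--to-earliest")));
    }
}
```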

4. Programmatically Resetting Offsets in a Kafka Consumer

Offsets can be reset directly within consumer code by disabling auto-commit and explicitly seeking to a desired offset position (for example, the beginning, end, or a specific offset). This approach gives the application full control over when and how offsets are moved, making it useful for replaying data, backfilling, or handling error recovery scenarios. Unlike administrative resets performed via CLI tools, this method embeds offset management logic inside the consumer itself and executes at runtime using the Kafka Consumer API provided by Apache Kafka.

// OffsetResetConsumer.java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.*;

public class OffsetResetConsumer {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-reset-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"));

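        // Poll until the group coordinator has assigned partitions;
        // a single short poll may return before assignment completes
        while (consumer.assignment().isEmpty()) {
            consumer.poll(Duration.ofMillis(100));
        }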

        Set<TopicPartition> partitions = consumer.assignment();

        // Reset offsets to beginning
        consumer.seekToBeginning(partitions);

        while (true) {
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofSeconds(1));

            for (ConsumerRecord<String, String> record : records) {
                System.out.printf(
                        "Topic=%s Partition=%d Offset=%d Key=%s Value=%s%n",
                        record.topic(),
                        record.partition(),
                        record.offset(),
                        record.key(),
                        record.value()
                );
            }

            consumer.commitSync();
        }
    }
}

4.1 Code Explanation

This Java program creates an Apache Kafka consumer that manually resets and controls offsets before consuming messages. It first configures consumer properties such as the Kafka broker address, consumer group ID, and string deserializers, and disables automatic offset commits. It then subscribes to a single topic and polls so that the group coordinator can assign partitions. Once partitions are assigned, it moves the read position of every assigned partition back to the beginning using seekToBeginning, so that all existing messages are re-read. Inside an infinite loop, the consumer continuously polls for new records and prints metadata (topic, partition, offset, key, and value) for each consumed message. Finally, it commits offsets synchronously after processing each batch, so that only processed records are marked as consumed.

4.2 Code Output

When executed, the consumer re-reads messages from the beginning of the topic. Assuming partition 0 of my-topic holds three unkeyed records (order-1, order-2, order-3), the output looks like this:

Topic=my-topic Partition=0 Offset=0 Key=null Value=order-1
Topic=my-topic Partition=0 Offset=1 Key=null Value=order-2
Topic=my-topic Partition=0 Offset=2 Key=null Value=order-3

This output confirms that offsets were successfully reset and that messages were consumed starting from offset 0.
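
One subtlety is worth spelling out: seeking only moves the consumer's in-memory read position, while the group's stored offset changes when a commit happens. The toy model below illustrates the difference; the class SeekVersusCommit is made up for the example and is not the Kafka client.

```java
// Toy model of one consumer's view of a single partition; not the Kafka client.
public class SeekVersusCommit {
    private long position;   // next offset this consumer instance will fetch
    private long committed;  // offset stored for the consumer group

    public SeekVersusCommit(long committedOffset) {
        this.committed = committedOffset;
        this.position = committedOffset;  // a fresh consumer starts at the committed offset
    }

    public void seek(long offset) { position = offset; }     // local move only
    public void read(int count)   { position += count; }     // pretend 'count' records were polled
    public void commit()          { committed = position; }  // persist progress for the group

    public long position()  { return position; }
    public long committed() { return committed; }

    public static void main(String[] args) {
        SeekVersusCommit consumer = new SeekVersusCommit(3);
        consumer.seek(0);
        System.out.println(consumer.position() + " " + consumer.committed());  // 0 3
        consumer.read(2);
        consumer.commit();
        System.out.println(consumer.position() + " " + consumer.committed());  // 2 2
    }
}
```

In the consumer above, the seek runs on every start, so each restart replays the topic from the beginning regardless of what was committed earlier. For a one-time replay, the seek would need to be guarded by a flag or configuration switch.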

5. Resetting Consumer Offsets Using the Kafka Admin API

The Kafka Admin API provides a programmatic way to reset consumer group offsets without starting or modifying a consumer application. This approach is particularly useful for operational scenarios such as data reprocessing, incident recovery, automation scripts, CI/CD pipelines, and scheduled maintenance tasks. By interacting directly with Kafka’s internal consumer group metadata, administrators gain precise, partition-level control over offsets while keeping consumer logic unchanged.

// OffsetResetAdmin.java
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;

import java.util.*;

public class OffsetResetAdmin {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        String groupId = "admin-reset-group";
        String topic = "my-topic";
        int partition = 0;
        long offset = 0L;

        try (AdminClient adminClient = AdminClient.create(props)) {

            TopicPartition tp = new TopicPartition(topic, partition);
            OffsetAndMetadata newOffset = new OffsetAndMetadata(offset);

            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            offsets.put(tp, newOffset);

            System.out.println("Starting offset reset...");
            System.out.printf(
                "Consumer Group: %s%nTopic: %s%nPartition: %d%nNew Offset: %d%n",
                groupId, topic, partition, offset
            );

            adminClient
                .alterConsumerGroupOffsets(groupId, offsets)
                .all()
                .get();

            System.out.println("Offset reset completed successfully.");

        } catch (Exception e) {
            System.err.println("Offset reset failed:");
            e.printStackTrace();
        }
    }
}

5.1 Code Explanation

This Java program uses the Kafka Admin API to reset the committed offset for a specific consumer group, topic, and partition while providing clear runtime feedback. It begins by configuring an AdminClient with the Kafka bootstrap server address, then defines the target consumer group, topic name, partition number, and the new offset from which consumption should resume. Within a try-with-resources block, the program constructs a TopicPartition object and associates it with an OffsetAndMetadata instance representing the desired offset, placing this mapping into a request map. Before executing the reset, the program prints the group, topic, partition, and offset to the console to confirm exactly which offset will be modified. The call to alterConsumerGroupOffsets() updates Kafka's stored offsets directly; as with the CLI tool, the group must have no active members for the call to succeed. The blocking .all().get() waits until the operation completes and throws an exception if it fails. Finally, the program reports either a success message or an error with a stack trace, making the outcome immediately visible and easy to validate.

5.2 Code Output

When the offset reset operation completes successfully, the program produces the following console output, confirming the target consumer group, topic, partition, and new offset value that were applied:

Starting offset reset...
Consumer Group: admin-reset-group
Topic: my-topic
Partition: 0
New Offset: 0
Offset reset completed successfully.

If an error occurs, for example because of insufficient permissions, an unreachable broker, or a group that still has active consumers, the program instead prints an error message and stack trace, clearly indicating that the reset did not succeed.

6. Conclusion

Resetting consumer offsets in Apache Kafka is a powerful mechanism for replaying data, recovering from application errors, and controlling consumption behavior. CLI tools suit quick operational fixes, consumer-level resets enable application-driven control, and the Admin API supports automation and large-scale offset management. Because a reset can skip or replay records, preview it with a dry run and stop the group's consumers before applying it.

Yatin Batra

An experienced full-stack engineer well versed in Core Java, Spring/Spring Boot, MVC, Security, AOP, Frontend (Angular & React), and cloud technologies (such as AWS, GCP, Jenkins, Docker, K8).