删除闲置的消费者组ID

删除闲置的消费者组ID

本文介绍了Kafka:删除闲置的消费者组ID的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在某些情况下,我使用Kafka-stream对主题的较小内存(哈希图)投影进行建模. K,V缓存确实需要一些操作,因此对于GlobalKTable来说不是一个好例子.在这种缓存"情况下,我希望所有同级实例都具有相同的缓存,因此我需要绕过使用者组机制.

In some cases, I use Kafka-stream to model a small in memory (hashmap) projection of a topic. The K,V cache does require some manipulations, so it is not a good case for a GlobalKTable. In such a "caching" scenario, I want all my sibling instances to have the same cache, so I need to bypass the consumer-group mechanism.

要启用此功能,通常只需要使用随机生成的应用程序ID来启动我的应用程序,因此每个应用程序每次重新启动都会重新加载该主题.唯一需要注意的是,我最终在kafka经纪人身上孤立了一些消费者群体,直到offsets.retention.minutes为止,这对于我们的运营监控工具而言并不理想.知道如何解决此问题吗?

To enable this, I normally simply start my apps with a randomly generated application Id, so each app will reload the topic each time it restarts. The only caveat to that is that I end up with some consumer group orphaned on the kafka brokers up to the offsets.retention.minutes which is not ideal for our operational monitoring tools.Any idea how to workaround this?

  • 我们可以将applicationId配置为短暂的,以使其在应用程序死后消失吗?
  • 还是我们可以强迫消费者仅在本地管理其抵销额?
  • 还是在正常关闭应用程序时可以使用一些java adminApi清理我的消费者组ID?

谢谢

推荐答案

AdminClient中有一个Java API,称为deleteConsumerGroups,可用于删除各个ConsumerGroup.

There is a Java API in the AdminClient called deleteConsumerGroups which can be used to delete individual ConsumerGroups.

您可以按如下所示在Kafka 2.5.0中使用它.

You can use it as shown below with Kafka 2.5.0.

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;

public class DeleteConsumerGroups {
  public static void main(String[] args) {
    System.out.println("*** Starting AdminClient to delete a Consumer Group ***");

    final Properties properties = new Properties();
    properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
    properties.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "5000");

    AdminClient adminClient = AdminClient.create(properties);
    String consumerGroupToBeDeleted = "console-consumer-65092";
    DeleteConsumerGroupsResult deleteConsumerGroupsResult = adminClient.deleteConsumerGroups(Arrays.asList(consumerGroupToBeDeleted));

    KafkaFuture<Void> resultFuture = deleteConsumerGroupsResult.all();
    try {
      resultFuture.get();
    } catch (InterruptedException e) {
      e.printStackTrace();
    } catch (ExecutionException e) {
      e.printStackTrace();
    }

    adminClient.close();
  }
}

ConsumerGroups列表,然后运行上面的代码

$ kafka-consumer-groups --bootstrap-server localhost:9092 --list
console-consumer-65092
console-consumer-53268

运行上面的代码后,

ConsumerGroups列表

ConsumerGroups List after running the code above

$ kafka-consumer-groups --bootstrap-server localhost:9092 --list
console-consumer-53268

这篇关于Kafka:删除闲置的消费者组ID的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-05 00:54