Kafka consumer: how do consumer processes and threads relate to topic partitions?

Problem description

I have been working with Kafka lately and have a bit of confusion regarding the consumers in a consumer group. The central question is whether to implement consumers as processes or as threads. For this question, assume I am using the high-level consumer.

Let's consider a scenario that I have experimented with. My topic has 2 partitions (for simplicity, let's assume the replication factor is just 1). I created a consumer (ConsumerConnector) process consumer1 with group group1, then created a topic count map requesting 2 streams for the topic, and spawned 2 consumer threads, consumer1_thread1 and consumer1_thread2, under that process. It looks like consumer1_thread1 is consuming partition 0 and consumer1_thread2 is consuming partition 1. Is this behaviour always deterministic? Below is the code snippet. Class TestConsumer is my consumer thread class.

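    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.Executors;

    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    ...
    // Ask the ConsumerConnector `consumer` for 2 streams on this topic;
    // each stream is then drained by its own thread.
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 2);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    // One worker thread per requested stream.
    executor = Executors.newFixedThreadPool(2);

    int threadNumber = 0;
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        // TestConsumer is my consumer thread class (not shown).
        executor.submit(new TestConsumer(stream, threadNumber));
        threadNumber++;
    }
    ...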

Now, let's consider another scenario (which I haven't tried, but am curious about) where I start 2 consumer processes, consumer1 and consumer2, both in the same group group1, and each of them is a single-threaded process. Now my questions are:

  1. How will the two independent consumer processes (which nevertheless belong to the same group) be related to the partitions in this case? How is it different from the single-process, multi-threaded scenario above?

  2. In general, how are consumer threads or processes mapped / related to partitions in the topic?

  3. The Kafka documentation does say that each consumer under a consumer group will consume one partition. However, does that refer to a consumer thread (like my above code example) or independent consumer processes?

  4. Is there any subtle thing I am missing here regarding implementing consumers as processes vs threads? Thanks in advance.

Solution

A consumer group can have multiple consumer instances running (multiple processes with the same group-id). While consuming, each partition is consumed by exactly one consumer instance in the group.

E.g. if your topic contains 2 partitions and you start a consumer group group-A with 2 consumer instances, then each of them will consume messages from its own partition of the topic.
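To make the "exactly one instance per partition" rule concrete, here is a minimal, Kafka-free sketch (Java 9+ for `List.of`). It models a group as a list of instance names and hands out partitions round-robin; the class name `GroupAssignmentSketch`, the instance names and the round-robin choice are illustrative assumptions, not Kafka's actual assignor code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of partition assignment inside ONE consumer group.
// Kafka's real assignment strategies are configurable; round-robin here is only a stand-in.
public class GroupAssignmentSketch {

    // Returns instance -> partitions it owns. Every partition ends up with exactly one owner.
    public static Map<String, List<Integer>> assign(int numPartitions, List<String> instances) {
        Map<String, List<Integer>> owned = new LinkedHashMap<>();
        for (String instance : instances) {
            owned.put(instance, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            // Partition p is given to a single instance, chosen round-robin.
            String owner = instances.get(p % instances.size());
            owned.get(owner).add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        // group-A with 2 instances consuming a 2-partition topic.
        Map<String, List<Integer>> groupA = assign(2, List.of("instance-1", "instance-2"));
        System.out.println(groupA); // {instance-1=[0], instance-2=[1]}
    }
}
```

Whatever strategy is used, the invariant is the same: no partition is shared by two instances of the same group.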

If you start the same 2 consumers with different group IDs, group-A and group-B, then the messages from both partitions of the topic will be broadcast to each of them. So in that case the consumer instance running under group-A will receive messages from both partitions of the topic, and the same is true for group-B.
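The broadcast across groups follows from Kafka tracking consumption progress (offsets) separately for each group. The following is a toy in-memory model, not Kafka code: a topic is a list of partitions holding string messages, and each group keeps its own offset per partition, so group-A and group-B each read every message. `MultiGroupSketch`, `pollAll` and the message strings are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model: each consumer group keeps its OWN offset per partition,
// so every group independently reads the whole topic.
public class MultiGroupSketch {

    // partitions.get(p) holds the messages of partition p, in order.
    private final List<List<String>> partitions;
    // group id -> (partition -> next offset to read)
    private final Map<String, Map<Integer, Integer>> offsets = new HashMap<>();

    public MultiGroupSketch(List<List<String>> partitions) {
        this.partitions = partitions;
    }

    // Returns all messages this group has not read yet, from every partition
    // (as if the group had a single consumer owning all partitions).
    public List<String> pollAll(String groupId) {
        Map<Integer, Integer> groupOffsets = offsets.computeIfAbsent(groupId, g -> new HashMap<>());
        List<String> out = new ArrayList<>();
        for (int p = 0; p < partitions.size(); p++) {
            List<String> log = partitions.get(p);
            int from = groupOffsets.getOrDefault(p, 0);
            out.addAll(log.subList(from, log.size()));
            groupOffsets.put(p, log.size()); // advance only this group's offset
        }
        return out;
    }

    public static void main(String[] args) {
        MultiGroupSketch topic = new MultiGroupSketch(List.of(
                List.of("p0-m0", "p0-m1"),   // partition 0
                List.of("p1-m0")));          // partition 1
        System.out.println(topic.pollAll("group-A")); // [p0-m0, p0-m1, p1-m0]
        System.out.println(topic.pollAll("group-B")); // [p0-m0, p0-m1, p1-m0]
        System.out.println(topic.pollAll("group-A")); // [] : group-A has already read everything
    }
}
```

Because group-B's offsets are untouched by group-A's reads, adding a new group never "steals" messages from an existing one.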

Read more about this in the Kafka documentation.

EDIT: Based on your comment which says,

The consumer group-id is the same (global) across the cluster. Suppose you have started process P1 with 2 threads and then spawn another process P2 (possibly on a different machine) with the same groupId and 2 more threads; Kafka will then add these 2 new threads to consume messages from the topic. So eventually there will be 4 threads responsible for consuming from the same topic. Kafka will then trigger a rebalance to reassign partitions to threads, so a partition that was being consumed by thread T1 of process P1 may be reassigned to thread T2 of process P2. The few lines below are taken from the wiki page
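Separately from the wiki text, the rebalance described in this EDIT can be illustrated with a small simulation. It assumes a range-style strategy (sort the consumer threads, then give each one a contiguous block of partitions, with earlier threads taking any leftover). The thread ids `P1-T1` etc. and the 4-partition topic are hypothetical, picked only so that a partition visibly moves from P1 to P2; this is a sketch, not Kafka's source code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified range-style rebalance: sort the consumer threads, then hand each one a
// contiguous block of partitions (earlier threads take any leftover partition).
public class RebalanceSketch {

    public static Map<String, List<Integer>> rangeAssign(int numPartitions, List<String> threadIds) {
        List<String> sorted = new ArrayList<>(threadIds);
        Collections.sort(sorted);
        int perThread = numPartitions / sorted.size();
        int leftover = numPartitions % sorted.size();

        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = perThread + (i < leftover ? 1 : 0);
            List<Integer> block = new ArrayList<>();
            for (int k = 0; k < count; k++) {
                block.add(next++);
            }
            assignment.put(sorted.get(i), block);
        }
        return assignment;
    }

    public static void main(String[] args) {
        int partitions = 4; // hypothetical topic size

        // Process P1 alone, with 2 threads.
        System.out.println(rangeAssign(partitions, List.of("P1-T1", "P1-T2")));
        // {P1-T1=[0, 1], P1-T2=[2, 3]}

        // P2 joins the same group with 2 more threads, which triggers a rebalance.
        System.out.println(rangeAssign(partitions, List.of("P1-T1", "P1-T2", "P2-T1", "P2-T2")));
        // {P1-T1=[0], P1-T2=[1], P2-T1=[2], P2-T2=[3]}
    }
}
```

In this model, partition 2 moves from P1-T2 to P2-T1 after the rebalance. With only the question's 2 partitions, the same rule would leave two of the four threads without any partition.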


07-28 02:36