我在kafka消费者文档中看到了此注释-


由于分区很多,因此仍然可以平衡许多负载
消费者实例。但是请注意,不能有更多的消费者
实例而不是分区。


一个主题有50个分区。如果我给a_numThreads值设置为50,则从每个分区中提取1条消息?以上消息是否表示我在任何时候都不能创建超过50个线程?

public void run(int a_numThreads) {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(a_numThreads));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    // now launch all the threads
    //
    executor = Executors.newFixedThreadPool(a_numThreads);

    // now create an object to consume the messages
    //
    int threadNumber = 0;
    for (final KafkaStream stream : streams) {
        executor.submit(new ConsumerTest(stream, threadNumber));
        threadNumber++;
    }
}

最佳答案

您先执行a_numThreads = 50然后执行Executors.newFixedThreadPool(a_numThreads);的事实,这意味着您至少在任何时间点都不能使用该执行程序创建50个以上的线程。

文档说的是,一个分区只能分配给1个流,如果您不是创建50个流,而是创建51个流,则后者将一无所获,如here所述

10-05 22:28
查看更多