我从Apache Kakfa开始,它带有一个简单的Java生产者,消费者应用程序。我正在使用kafka-clients版本0.10.0.1并在Mac上运行它。

我创建了一个名为replicated_topic_partitioned的主题,该主题具有3个分区,复制因子为3。

我在2181端口启动了Zookeeper。我分别在9092、9093和9094端口启动了三个ID为1、2和3的代理。

这是describe命令的输出

kafka_2.12-2.3.0/bin/kafka-topics.sh --describe --topic replicated_topic_partitioned --bootstrap-server localhost:9092
Topic:replicated_topic_partitioned    PartitionCount:3    ReplicationFactor:3    Configs:segment.bytes=1073741824
     Topic: replicated_topic_partitioned    Partition: 0    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
     Topic: replicated_topic_partitioned    Partition: 1    Leader: 1    Replicas: 1,2,3    Isr: 1,2,3
     Topic: replicated_topic_partitioned    Partition: 2    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1


我写了一个简单的生产者和消费者代码。生产者成功运行并发布了消息。但是,当我启动用户时,轮询呼叫将无限期地等待。在调试时,我发现它继续在ConsumerNetworkClient上的awaitMetadataUpdate方法上循环。

这是生产者和消费者的代码

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> myProducer = new KafkaProducer<>(properties);
DateFormat dtFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss:SSS");
String topic = "replicated_topic_partitioned";

int numberOfRecords = 10;
try {
    for (int i = 0; i < numberOfRecords; i++) {
       String message = String.format("Message: %s  sent at %s", Integer.toString(i), dtFormat.format(new Date()));
       System.out.println("Sending " + message);
       myProducer.send(new ProducerRecord<String, String>(topic, message));

    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    myProducer.close();
}


消费者.java

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("group.id", UUID.randomUUID().toString());
properties.put("auto.offset.reset", "earliest");

KafkaConsumer<String, String> myConsumer = new KafkaConsumer<>(properties);

String topic = "replicated_topic_partitioned";
myConsumer.subscribe(Collections.singletonList(topic));

try {
    while (true){
        ConsumerRecords<String, String> records = myConsumer.poll(1000);
         printRecords(records);
    }
 } finally {
     myConsumer.close();
 }


server.properties添加一些关键字段

broker.id=1
host.name=localhost
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

log.dirs=/tmp/kafka-logs-1
num.partitions=1
num.recovery.threads.per.data.dir=1

transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

zookeeper.connection.timeout.ms=6000

group.initial.rebalance.delay.ms=0


其他两个代理的server.properties是带有broker.id的上述内容的副本,端口和thelog.dirs已更改。

这对我不起作用:
Kafka 0.9.0.1 Java Consumer stuck in awaitMetadataUpdate()



但是,如果我从传递分区的命令行启动使用者,它将成功读取该分区的消息。但是,仅指定主题时,它不会收到任何消息。

作品:

kafka_2.12-2.3.0/bin/kafka-console-consumer.sh --topic replicated_topic_partitioned --bootstrap-server localhost:9092
     --from-beginning --partition 1


不起作用:

kafka_2.12-2.3.0/bin/kafka-console-consumer.sh --topic replicated_topic_partitioned --bootstrap-server localhost:9092
    --from-beginning


注意:上面的使用者可以完美地用于复制因子等于1的主题。

题:


为什么Java Producer不会读取复制因子大于一个的主题消息(即使将其分配给分区)(例如myConsumer.assign(Collections.singletonList(new TopicPartition(topic, 2))?
为什么控制台使用者仅在通过分区时才读取消息(同样适用于复制因子为1的主题)

最佳答案

因此,您发送了10条记录,但是所有10条记录都具有相同的键:

for (int i = 0; i < numberOfRecords; i++) {
   String message = String.format("Message: %s  sent at %s", Integer.toString(i), dtFormat.format(new Date()));
   System.out.println("Sending " + message);
   myProducer.send(new ProducerRecord<String, String>(topic, message)); <--- KEY=topic
}


除非另有说明(通过直接在ProducerRecord上设置分区),否则将通过以下方式确定将记录传送到的分区:


  partition = murmur2(serialize(key))%numPartitions


所以相同的键意味着相同的分区

您是否尝试过在分区0和2上搜索10条记录?

如果您希望在分区之间更好地“分散”记录,请使用null键(循环使用)或可变键。

10-05 21:23
查看更多