我从Apache Kakfa开始,它带有一个简单的Java生产者,消费者应用程序。我正在使用kafka-clients
版本0.10.0.1
并在Mac上运行它。
我创建了一个名为replicated_topic_partitioned
的主题,该主题具有3个分区,复制因子为3。
我在2181端口启动了Zookeeper。我分别在9092、9093和9094端口启动了三个ID为1、2和3的代理。
这是describe命令的输出
kafka_2.12-2.3.0/bin/kafka-topics.sh --describe --topic replicated_topic_partitioned --bootstrap-server localhost:9092
Topic:replicated_topic_partitioned PartitionCount:3 ReplicationFactor:3 Configs:segment.bytes=1073741824
Topic: replicated_topic_partitioned Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: replicated_topic_partitioned Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: replicated_topic_partitioned Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
我写了一个简单的生产者和消费者代码。生产者成功运行并发布了消息。但是,当我启动用户时,轮询呼叫将无限期地等待。在调试时,我发现它继续在ConsumerNetworkClient上的awaitMetadataUpdate方法上循环。
这是生产者和消费者的代码
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> myProducer = new KafkaProducer<>(properties);
DateFormat dtFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss:SSS");
String topic = "replicated_topic_partitioned";
int numberOfRecords = 10;
try {
for (int i = 0; i < numberOfRecords; i++) {
String message = String.format("Message: %s sent at %s", Integer.toString(i), dtFormat.format(new Date()));
System.out.println("Sending " + message);
myProducer.send(new ProducerRecord<String, String>(topic, message));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
myProducer.close();
}
消费者.java
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("group.id", UUID.randomUUID().toString());
properties.put("auto.offset.reset", "earliest");
KafkaConsumer<String, String> myConsumer = new KafkaConsumer<>(properties);
String topic = "replicated_topic_partitioned";
myConsumer.subscribe(Collections.singletonList(topic));
try {
while (true){
ConsumerRecords<String, String> records = myConsumer.poll(1000);
printRecords(records);
}
} finally {
myConsumer.close();
}
从
server.properties
添加一些关键字段broker.id=1
host.name=localhost
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs-1
num.partitions=1
num.recovery.threads.per.data.dir=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
其他两个代理的server.properties是带有broker.id的上述内容的副本,端口和thelog.dirs已更改。
这对我不起作用:
Kafka 0.9.0.1 Java Consumer stuck in awaitMetadataUpdate()
但是,如果我从传递分区的命令行启动使用者,它将成功读取该分区的消息。但是,仅指定主题时,它不会收到任何消息。
作品:
kafka_2.12-2.3.0/bin/kafka-console-consumer.sh --topic replicated_topic_partitioned --bootstrap-server localhost:9092
--from-beginning --partition 1
不起作用:
kafka_2.12-2.3.0/bin/kafka-console-consumer.sh --topic replicated_topic_partitioned --bootstrap-server localhost:9092
--from-beginning
注意:上面的使用者可以完美地用于复制因子等于1的主题。
题:
为什么Java Producer不会读取复制因子大于一个的主题消息(即使将其分配给分区)(例如
myConsumer.assign(Collections.singletonList(new TopicPartition(topic, 2)
)?为什么控制台使用者仅在通过分区时才读取消息(同样适用于复制因子为1的主题)
最佳答案
因此,您发送了10条记录,但是所有10条记录都具有相同的键:
for (int i = 0; i < numberOfRecords; i++) {
String message = String.format("Message: %s sent at %s", Integer.toString(i), dtFormat.format(new Date()));
System.out.println("Sending " + message);
myProducer.send(new ProducerRecord<String, String>(topic, message)); <--- KEY=topic
}
除非另有说明(通过直接在
ProducerRecord
上设置分区),否则将通过以下方式确定将记录传送到的分区:partition = murmur2(serialize(key))%numPartitions
所以相同的键意味着相同的分区
您是否尝试过在分区0和2上搜索10条记录?
如果您希望在分区之间更好地“分散”记录,请使用null键(循环使用)或可变键。