问题描述
有什么办法可以在kafka-console-producer中产生一个空值的消息(即标记它以便compactor用tombstone删除它)?
Is there any way to produce a message in the kafka-console-producer with a null value (ie. mark it for the compactor to delete it with a tombstone)?
我试过生成mykey"和mykey|".前者产生错误,后者使值成为空字符串.像这样运行生产者:
I've tried producing "mykey" and "mykey|". The former produces an error and the later makes the value the empty string. Running producer like this:
$KAFKA_HOME/bin/kafka-console-producer --broker-list localhost:9092 --topic mytopic --property "parse.key=true" --property "key.separator=|"
推荐答案
不幸的是,使用 console-producer 无法做到这一点
Unfortunately, there is no way to do that using console-producer
这是来自 ConsoleProducer 类的代码片段(它如何读取数据).Kafka 0.11.0(不要以为不同版本之间变化很大)
this is a code snippet from ConsoleProducer class (how it reads the data). Kafka 0.11.0 (don't think that it was changed significantly between different versions).
override def readMessage() = {
lineNumber += 1
print(">")
(reader.readLine(), parseKey) match {
case (null, _) => null
case (line, true) =>
line.indexOf(keySeparator) match {
case -1 =>
if (ignoreError) new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
else throw new KafkaException(s"No key found on line $lineNumber: $line")
case n =>
val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes(StandardCharsets.UTF_8)
new ProducerRecord(topic, line.substring(0, n).getBytes(StandardCharsets.UTF_8), value)
}
case (line, false) =>
new ProducerRecord(topic, line.getBytes(StandardCharsets.UTF_8))
}
}
如您所见,该值始终是一个不可为空的字节数组
as you can see, the value is always an non-nullable array of bytes
这篇关于从控制台生成具有空值(墓碑)的 Kafka 消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!