Problem Description
I'm studying Kafka Streams and I have a problem with the first WordCount example in Java 8, taken from the documentation.
I'm using the latest available versions of Kafka Streams and Kafka Connect, and the WordCount lambda expressions example.
I follow these steps: I create an input topic and an output topic in Kafka, start the streaming app, and then load the input topic by inserting some words from a .txt file.
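For reference, below is a minimal sketch of the kind of WordCount topology described in the documentation; the topic names streams-plaintext-input and streams-wordcount-output, the application id, and the bootstrap server are placeholder assumptions, not necessarily the ones used in this setup.

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class WordCountSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); // assumption
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("streams-plaintext-input");

        // Split each line into words, group by word, and keep a running count per word.
        KTable<String, Long> wordCounts = textLines
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word)
            .count();

        // Emit the running counts to the output topic as <word, count> records.
        wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        new KafkaStreams(builder.build(), props).start();
    }
}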
On the first count, the output topic shows the words grouped correctly, but the counts are wrong. If I then reinsert the same words, the subsequent increments on top of the initially incorrect counts are all correct.
If I look at the input topic dump with a console consumer, it is loaded properly and there is no dirty data.
Why is the first count wrong?
Example [FIRST DATA]: (input topic in Kafka) hi hi mike mike test
(The streaming app is running)
(output topic) hi 12 mike 4 test 3 (seemingly random counts)
[SUCCESSIVE DATA - posting the same words to the input topic]
(output topic) hi 14 mike 6 test 4
[NEW ATTEMPT]
(output topic) hi 16 mike 8 test 5
And so on...
Recommended Answer
The WordCount demo in Apache Kafka has the following lines:
// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
// Note: To re-run the demo, you need to use the offset reset tool:
// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
This means that, when you restart the app, it will read its input topic from the very beginning ("earliest") iff there are no existing consumer offsets for the WordCount app stored in Kafka. An app's consumer offsets expire in Kafka after a certain amount of app inactivity; the default is 24 hours (cf. the offsets.retention.minutes broker configuration).
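As a hedged illustration of this config (not part of the demo itself), setting the reset policy to "latest" instead of "earliest" would make a (re)started app with no committed offsets read only records produced after startup, so old test data would not be re-counted; the application id and bootstrap server below are placeholders.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class OffsetResetSketch {
    public static Properties streamsConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // "latest": with no committed offsets, only records produced after startup are read,
        // so previously loaded test data is not re-counted on a restart.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return props;
    }
}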
I could imagine that the following happened:
- You tried out Kafka earlier and fed test data into the input topic.
- Then you took a break of more than 24 hours before continuing your experiments.
- Now, when the app is restarted, it re-reads the input topic all the way from the beginning, thereby picking up the older test input data, which leads to the "inflated" counts.
You wrote: "If I look at the input topic dump with a console consumer, it is loaded properly and there is no dirty data."
You can verify my hypothesis above by looking at the input topic again with the console consumer while adding the CLI option --from-beginning (see https://kafka.apache.org/documentation/#quickstart_consume).
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yourInputTopic --from-beginning
This will show you all the available data in the topic "yourInputTopic" -- minus any data that might have been purged from the Kafka topic in the meantime (the default broker configuration purges data older than 7 days; cf. log.retention.hours).
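If you want to do the same check programmatically, here is a rough Java sketch of a consumer that reads yourInputTopic from the beginning, roughly equivalent to the --from-beginning flag; the throwaway group id and bootstrap server are assumptions for illustration.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class InputTopicDump {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        // A throwaway group id plus "earliest" mimics the console consumer's --from-beginning.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "dump-" + UUID.randomUUID());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("yourInputTopic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }
}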