Problem Description
While using the Processor API of Kafka Streams, I use something like this:
context.forward(key,value)
context.commit()
Actually, what I'm doing here is forwarding a state from the state store to the sink every minute (using context.schedule() in the init() method). What I don't understand here is:

The [key,value] pair that I forward and then commit() is taken from the state store. It is aggregated, according to my specific logic, from many non-sequential input [key,value] pairs. Each such output [key,value] pair is an aggregation of a few unordered [key,value] pairs from the input (Kafka topic). So I don't understand how the Kafka cluster and the Kafka Streams library can know the correlation between the original input [key,value] pairs and the eventual output [key,value] that is sent out. How can it be wrapped in a transaction (fail-safe) if Kafka doesn't know the connection between the input pairs and the output pair? And what is actually being committed when I call context.commit()?
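For context, a minimal sketch of the setup described above might look like the following. The class name, the store name "agg-store", and the aggregation logic are placeholders, and it uses the older Processor interface that matches the context.forward()/context.commit() calls shown:

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical processor: aggregates input records into a state store and
// forwards the aggregated state downstream once per minute.
public class AggregatingProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        // "agg-store" is a placeholder store name; the store must be attached
        // to this processor when the topology is built.
        this.store = (KeyValueStore<String, String>) context.getStateStore("agg-store");

        // Wall-clock punctuation every minute: forward the current state,
        // then request a commit.
        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            try (KeyValueIterator<String, String> it = store.all()) {
                while (it.hasNext()) {
                    KeyValue<String, String> entry = it.next();
                    context.forward(entry.key, entry.value);
                }
            }
            context.commit();
        });
    }

    @Override
    public void process(String key, String value) {
        // Placeholder aggregation logic: append incoming values per key.
        String current = store.get(key);
        store.put(key, current == null ? value : current + "," + value);
    }

    @Override
    public void close() { }
}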
Thanks!
Recommended Answer
Explaining all of this in detail goes beyond what I can write here in an answer.
Basically, when a transaction is committed, the current input topic offsets and all writes to Kafka topics are committed atomically. This implies that all pending writes are flushed before the commit is done.
Transactions don't need to know about your actual business logic. They just "synchronize" the progress tracking on the input topics with writes to output topics.
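In other words, exactly-once is enabled purely through configuration, and the processing logic does not have to express any correlation between inputs and outputs itself. A minimal sketch (the application id and broker address are placeholders):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ExactlyOnceConfigExample {
    public static Properties streamsProps() {
        // With exactly-once enabled, Kafka Streams commits the consumed input
        // offsets and all records written to output/changelog topics atomically,
        // as part of a single Kafka transaction.
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return props;
    }
}

With this setting, the consumed offsets and the forwarded records are committed together: either both become visible or neither does.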
I would recommend reading the corresponding blog posts and watching the talks about exactly-once in Kafka to get more details:
- Blog: https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
- Blog: https://www.confluent.io/blog/enabling-exactly-kafka-streams/
- Talk: https://www.confluent.io/kafka-summit-nyc17/resource/#exactly-once-semantics_slide
- Talk: https://www.confluent.io/kafka-summit-sf17/resource/#Exactly-once-Stream-Processing-with-Kafka-Streams_slide
Btw: this is a related question about manual commits in the Streams API that you should consider: How to commit manually with Kafka Stream?