Is it possible to get the final window result in Kafka Streams by suppressing the intermediate results?
I cannot achieve this goal. What is wrong with my code?
val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic)
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
    .count()
    .suppress(Suppressed.untilWindowCloses(unbounded())) // not working
    .toStream()
    .print(Printed.toSysOut())
It leads to this error:
Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000001:
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
Code / Error details: https://gist.github.com/robie2011/1caa4772b60b5a6f993e6f98e792a380
The problem is a confusing asymmetry: during windowing, Streams automatically wraps an explicitly provided key serde in a windowed serde, but it does not wrap the default key serde. IMHO, this is an oversight that should be corrected, so I've filed: https://issues.apache.org/jira/browse/KAFKA-7806
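The failure mode behind the ClassCastException above can be illustrated with a stdlib-only Kotlin toy. All names here (ToyWindowed, ToySerializer, windowedSerializer, defaultKeySerializer) are hypothetical and deliberately simplified. This is not how Kafka Streams is implemented, and the byte layout is arbitrary. The toy only shows what happens when a serializer that expects a String is handed a windowed key object.

```kotlin
import java.nio.ByteBuffer

// Hypothetical toy types, NOT the Kafka Streams API.
data class ToyWindowed<K>(val key: K, val windowStartMs: Long)

typealias ToySerializer<T> = (T) -> ByteArray

// Explicit String serializer, playing the role of a serde passed via Materialized or Consumed.
val stringSerializer: ToySerializer<String> = { it.toByteArray(Charsets.UTF_8) }

// Stand-in for a default key serde configured as String: it receives whatever
// key object arrives and must cast it to String first.
val defaultKeySerializer: ToySerializer<Any> = { (it as String).toByteArray(Charsets.UTF_8) }

// Stand-in for wrapping an explicit serde during windowing: serialize the inner
// key, then append the window start (illustrative layout only).
fun <K> windowedSerializer(inner: ToySerializer<K>): ToySerializer<ToyWindowed<K>> = { w ->
    val keyBytes = inner(w.key)
    ByteBuffer.allocate(keyBytes.size + 8).put(keyBytes).putLong(w.windowStartMs).array()
}

fun main() {
    val key = ToyWindowed("sensor-1", 0L)
    // Wrapped explicit serializer: the inner serializer only ever sees the String key.
    println(windowedSerializer(stringSerializer)(key).size) // 16 (8 key bytes + 8 timestamp bytes)
    // Unwrapped default serializer: it receives the windowed key itself and the cast fails.
    try {
        defaultKeySerializer(key)
    } catch (e: ClassCastException) {
        println("ClassCastException: ${e.message}")
    }
}
```

In the toy, the explicit serializer works because the wrapper unpacks the windowed key before delegating.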
As others have noted, the solution is to explicitly set the key serde upstream and not rely on the default key serde. You can either:
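Set the serdes on the windowed aggregation with Materialized:
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.*
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded
import java.time.Duration

val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic)
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
    .count(Materialized.with(Serdes.String(), Serdes.Long()))
    .suppress(Suppressed.untilWindowCloses(unbounded()))
    .toStream()
    .print(Printed.toSysOut())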
(as Nishu recommended)
(note that you don't need to name the count operation; naming it has the side effect of making its state store queryable)
Or set the serdes further upstream, for example on the input:
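import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.*
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded
import java.time.Duration

val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic, Consumed.with(Serdes.String(), Serdes.Double()))
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
    .count()
    .suppress(Suppressed.untilWindowCloses(unbounded()))
    .toStream()
    .print(Printed.toSysOut())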
(as wardziniak recommended)
The choice is yours; in this case, I don't think there's much difference between the two. If you were doing an aggregation other than count, you'd probably be setting the value serde via Materialized anyway, so the former might be the more uniform style.
I also noticed that your window definition doesn't set a grace period. The window close time is defined as window end + grace period, and the default grace period is 24 hours, so you wouldn't see anything emitted from the suppression until 24 hours' worth of data has run through the app.
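To make the close-time rule concrete, here is a small stdlib-only Kotlin sketch. It is not Kafka Streams code, and the function name windowCloseMs is hypothetical. It computes when the 15-second window containing a record closes, assuming epoch-aligned tumbling windows like those TimeWindows produces. Suppression compares this value against stream time (the largest record timestamp seen so far), not wall-clock time.

```kotlin
import java.time.Duration

// Hypothetical helper: close time of the tumbling window containing timestampMs,
// using close = window end + grace. Assumes windows are aligned to the epoch.
fun windowCloseMs(timestampMs: Long, size: Duration, grace: Duration): Long {
    val sizeMs = size.toMillis()
    val windowStartMs = timestampMs - Math.floorMod(timestampMs, sizeMs) // inclusive start
    val windowEndMs = windowStartMs + sizeMs                              // exclusive end
    return windowEndMs + grace.toMillis()
}

fun main() {
    val size = Duration.ofSeconds(15)
    // A record at t = 7 s belongs to the window [0 s, 15 s).
    println(windowCloseMs(7_000, size, Duration.ofHours(24))) // 86415000 (24 h + 15 s)
    println(windowCloseMs(7_000, size, Duration.ZERO))        // 15000
}
```

With a 24-hour grace, stream time must pass roughly a day beyond the window before anything is emitted. With zero grace, the window closes as soon as stream time reaches its end.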
For your testing effort, I'd recommend trying:
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)).grace(Duration.ZERO))
In production, you'll want to select a grace period that balances the amount of event lateness you expect in your stream with the amount of emission promptness you wish to see from the suppression.
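The trade-off can be seen in a toy, single-partition Kotlin model of a windowed count followed by suppress-until-close. This is a simplified sketch, not Kafka Streams code, and all names are hypothetical. It assumes stream time is the largest timestamp seen so far, ignores caching, and drops a record as too late once stream time has reached its window's close time.

```kotlin
// Hypothetical toy model, NOT Kafka Streams: windowed count + emit-only-final-results.
data class Event(val key: String, val timestampMs: Long)

data class FinalResult(val key: String, val windowStartMs: Long, val count: Long, val emittedAtMs: Long)

class ToyFinalWindowCounter(private val sizeMs: Long, private val graceMs: Long) {
    private var streamTimeMs = Long.MIN_VALUE                      // max timestamp seen so far
    private val pending = mutableMapOf<Pair<String, Long>, Long>() // (key, windowStart) -> count
    val emitted = mutableListOf<FinalResult>()                     // one final result per window
    var droppedLate = 0                                            // records whose window had closed
        private set

    fun process(e: Event) {
        streamTimeMs = maxOf(streamTimeMs, e.timestampMs)
        val windowStartMs = e.timestampMs - Math.floorMod(e.timestampMs, sizeMs)
        if (streamTimeMs >= windowStartMs + sizeMs + graceMs) {
            droppedLate++                                          // window already closed
        } else {
            val k = e.key to windowStartMs
            pending[k] = (pending[k] ?: 0L) + 1
        }
        emitClosedWindows()
    }

    // Emit, then forget, every window whose close time (end + grace) has been reached.
    private fun emitClosedWindows() {
        val iter = pending.entries.iterator()
        while (iter.hasNext()) {
            val (k, count) = iter.next()
            if (streamTimeMs >= k.second + sizeMs + graceMs) {
                emitted += FinalResult(k.first, k.second, count, streamTimeMs)
                iter.remove()
            }
        }
    }
}

fun sampleEvents() = listOf(
    Event("a", 1_000),  // window [0 s, 15 s)
    Event("a", 16_000), // window [15 s, 30 s); stream time -> 16 s
    Event("a", 5_000),  // out of order: belongs to [0 s, 15 s)
    Event("a", 31_000)  // window [30 s, 45 s); stream time -> 31 s
)

fun main() {
    for (grace in listOf(0L, 5_000L)) {
        val counter = ToyFinalWindowCounter(sizeMs = 15_000, graceMs = grace)
        sampleEvents().forEach(counter::process)
        println("grace=$grace ms emitted=${counter.emitted} droppedLate=${counter.droppedLate}")
    }
}
```

With zero grace, window [0 s, 15 s) is emitted as soon as stream time reaches 16 s. The out-of-order record at 5 s then arrives after the window has closed and is dropped, so the final count is 1. With a 5-second grace, that record is counted (final count 2). However, the result is not emitted until stream time reaches 31 s, because 16 s is still short of the 20 s close time.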
One final note: I noticed in your gist that you haven't changed the default caching or commit interval. As a result, the count operator itself will buffer updates for the default 30 seconds before passing them on to suppression. This is a good configuration for production, since it keeps you from creating a bottleneck on your local disk or on the Kafka broker, but it might surprise you while you're testing.
Typically for tests (or interactively trying stuff out), I'll disable caching and set the commit interval short for maximum developer sanity:
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
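The effect of these two settings can be sketched with a toy Kotlin model of a cached count that forwards downstream only on commit. This is a hypothetical simplification, not the real Kafka Streams record cache: it ignores cache size limits and eviction, and models commits as happening only when a record arrives after the interval has elapsed.

```kotlin
// Hypothetical toy model, NOT the Kafka Streams record cache.
class ToyCachingCounter(private val commitIntervalMs: Long, private val cachingEnabled: Boolean) {
    private val counts = mutableMapOf<String, Long>()
    private val dirtyKeys = linkedSetOf<String>()       // keys updated since the last flush
    private var lastCommitMs = 0L
    val forwarded = mutableListOf<Pair<String, Long>>() // what downstream (e.g. suppress) sees

    fun process(key: String, wallClockMs: Long) {
        val updated = (counts[key] ?: 0L) + 1
        counts[key] = updated
        if (cachingEnabled) {
            dirtyKeys += key                            // coalesce: only the latest value is kept
        } else {
            forwarded += key to updated                 // no cache: every update goes downstream
        }
        if (wallClockMs - lastCommitMs >= commitIntervalMs) commit(wallClockMs)
    }

    private fun commit(nowMs: Long) {
        for (key in dirtyKeys) forwarded += key to counts.getValue(key)
        dirtyKeys.clear()
        lastCommitMs = nowMs
    }
}

fun main() {
    val cached = ToyCachingCounter(commitIntervalMs = 30_000, cachingEnabled = true)
    for (t in 1..5) cached.process("a", t * 1_000L)
    println(cached.forwarded)   // []
    cached.process("a", 31_000)
    println(cached.forwarded)   // [(a, 6)]

    val uncached = ToyCachingCounter(commitIntervalMs = 100, cachingEnabled = false)
    for (t in 1..3) uncached.process("a", t * 1_000L)
    println(uncached.forwarded) // [(a, 1), (a, 2), (a, 3)]
}
```

With caching enabled, downstream sees nothing for the first 30 seconds and then a single coalesced update. With caching disabled, every intermediate count is forwarded immediately.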
Sorry about the serde oversight. I hope we get KAFKA-7806 addressed soon.
I hope this helps!