This article describes how to synchronize multiple logs in Kafka. The recommended answer below should be a useful reference for anyone facing the same problem.

Problem description

Suppose I have 2 types of logs that share a common field 'uid'. When both logs carrying the same uid have arrived, I want to output a combined record, like a join. Is this possible with Kafka?

Recommended answer

Yes, absolutely. Check out Kafka Streams, specifically the DSL API. It goes something like this:

 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;

 StreamsBuilder builder = new StreamsBuilder();

 // One stream per input topic
 KStream<byte[], Foo> fooStream = builder.stream("foo");
 KStream<byte[], Bar> barStream = builder.stream("bar");

 // Inner join on the record key; the two records must arrive within
 // 1000 ms of each other to be joined
 fooStream.join(barStream,
                (foo, bar) -> {
                    foo.baz = bar.baz;
                    return foo;
                },
                JoinWindows.of(1000))
          .to("buzz");

This simple application consumes two input topics ("foo" and "bar"), joins them, and writes the result to the topic "buzz". Since streams are unbounded, when joining two streams you need to specify a join window (1000 milliseconds in the example above), which is the maximum relative time difference between two messages on the respective streams for them to be eligible for joining.
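For completeness, here is a minimal sketch of how such a topology might be started. The application id and broker address below are placeholders, not part of the original answer:

 import java.util.Properties;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsConfig;

 Properties props = new Properties();
 props.put(StreamsConfig.APPLICATION_ID_CONFIG, "log-join-app");      // hypothetical application id
 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker address
 // Default (or per-stream) serdes for the Foo and Bar types would also need to be configured

 KafkaStreams streams = new KafkaStreams(builder.build(), props);
 streams.start();

 // Close the application cleanly on JVM shutdown
 Runtime.getRuntime().addShutdownHook(new Thread(streams::close));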

Here is a more complete example: https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java

And here is the documentation: https://docs.confluent.io/current/streams/developer-guide/dsl-api.html. You'll find there are several different kinds of joins you can perform: KStream-KStream, KStream-KTable, KStream-GlobalKTable, and KTable-KTable.
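As a small illustration of one of the other join types, here is a rough sketch of a KStream-KTable join, reusing the builder from the snippet above; the topic names and the String value types are made up for this example:

 import org.apache.kafka.streams.kstream.KTable;

 // A stream of page views joined against a table of user profiles,
 // similar in spirit to the PageViewRegionLambdaExample linked above
 KStream<String, String> views = builder.stream("page-views");      // hypothetical topic
 KTable<String, String> profiles = builder.table("user-profiles");  // hypothetical topic

 // No JoinWindows needed: a KTable is a changelog, so each view is joined
 // against the latest profile value for its key
 views.join(profiles, (view, profile) -> view + " / " + profile)
      .to("views-by-profile");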

It is important to note that although the above example synchronizes the streams deterministically (if you reset and reprocess the topology, you will get the same result each time), not all join operations in Kafka Streams are deterministic. As of version 1.0.0 and earlier, roughly half of them are not deterministic and may depend on the order in which data is consumed from the underlying topic partitions. Specifically, inner KStream-KStream joins and all KTable-KTable joins are deterministic. Other joins, such as all KStream-KTable joins and left/outer KStream-KStream joins, are non-deterministic and depend on the order in which the consumer consumes the data. Keep this in mind if you are designing your topology to be reprocessable: if you use these non-deterministic operations, the order in which events arrive while the topology runs live will produce one result, but reprocessing the topology may produce another. Note also that operations like KStream#merge() do not produce deterministic results either. For more on this problem, see "Why does my Kafka Streams topology not replay/reprocess correctly?" and this mailing list post.
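For reference, a left KStream-KStream join, one of the non-deterministic operations mentioned above, would look roughly like this (a sketch only; the output topic name is a placeholder):

 // A foo with no matching bar inside the window still produces output
 // (with bar == null), so the result can depend on the order in which
 // the two topics happen to be consumed
 fooStream.leftJoin(barStream,
                    (foo, bar) -> {
                        if (bar != null) {
                            foo.baz = bar.baz;
                        }
                        return foo;
                    },
                    JoinWindows.of(1000))
          .to("buzz-left");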

That concludes this article on how to synchronize multiple logs in Kafka. We hope the recommended answer is helpful.
