I have two unbounded (KafkaIO) PCollections, to which I am applying a tag-based CoGroupByKey with a fixed window of 1 minute. However, at join time, for some test data with the same keys, the grouped result is missing one of the tagged inputs most of the time. Please see the snippet below.

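    import java.io.IOException;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.SimpleFunction;
    import org.apache.beam.sdk.transforms.Values;
    import org.apache.beam.sdk.transforms.join.CoGbkResult;
    import org.apache.beam.sdk.transforms.join.CoGroupByKey;
    import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TupleTag;
    import org.joda.time.Duration;

    KafkaIO.Read<Integer, String> event1 = ... ;

    KafkaIO.Read<Integer, String> event2 = ...;

    // withoutMetadata() yields KV<Integer, String>; Values keeps only the record value
    PCollection<KV<String, String>> event1Data = p.apply(event1.withoutMetadata())
            .apply(Values.<String>create())
            .apply(MapElements.via(new SimpleFunction<String, KV<String, String>>() {
                @Override public KV<String, String> apply(String input) {
                    log.info("Extracting Data");
                    . . . .//Some processing
                    return KV.of(record.get("myKey"), record.get("myValue"));
                }
            }))
            // the 1-minute fixed window described above
            .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardMinutes(1))));

    PCollection<KV<String, String>> event2Data = p.apply(event2.withoutMetadata())
            .apply(Values.<String>create())
            .apply(MapElements.via(new SimpleFunction<String, KV<String, String>>() {
                @Override public KV<String, String> apply(String input) {
                    log.info("Extracting Data");
                    . . . .//Some processing
                    return KV.of(record.get("myKey"), record.get("myValue"));
                }
            }))
            .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardMinutes(1))));

    final TupleTag<String> event1Tag = new TupleTag<>();
    final TupleTag<String> event2Tag = new TupleTag<>();

    PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
            .of(event1Tag, event1Data)
            .and(event2Tag, event2Data)
            .apply(CoGroupByKey.create());

    PCollection<String> finalResultCollection =
            kvpCollection.apply("Join", ParDo.of(
                    new DoFn<KV<String, CoGbkResult>, String>() {
                        @ProcessElement
                        public void processElement(ProcessContext c) throws IOException {
                            KV<String, CoGbkResult> e = c.element();
                            Iterable<String> event1Values = e.getValue().getAll(event1Tag);
                            Iterable<String> event2Values = e.getValue().getAll(event2Tag);
                            // check the grouped values, not the KafkaIO.Read transforms
                            if (event1Values.iterator().hasNext() && event2Values.iterator().hasNext()) {
                                // Process event1 and event2 data and write to c.output
                            } else {
                                System.out.println("Unable to join event1 and event2");
                            }
                        }
                    }));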

With the above code, when I start pumping data with a common key into the two Kafka topics, it never gets joined, i.e. the output is always "Unable to join event1 and event2". Please let me know if I am doing anything wrong, or whether there is a better way to join two unbounded PCollections on a common key.


I had a similar issue recently. Per the Beam documentation, to use the CoGroupByKey transform on unbounded PCollections (key-value PCollections, specifically), all of the input PCollections should have the same windowing and trigger strategy. Since you are working with streaming/unbounded collections, you will have to use a trigger that fires and emits the window's output after a certain interval, according to your triggering strategy. Because this is streaming data, the trigger should fire continuously, i.e. wrap it in `Repeatedly.forever`. You also need to set an accumulation mode (accumulating or discarding fired panes) on the windowed PCollection to tell Beam what to do after the trigger fires, i.e. whether to accumulate the pane's results or discard them. After applying this windowing, trigger, and accumulation strategy, use the CoGroupByKey transform to group the unbounded PCollections on their common key.
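The difference between the two accumulation modes is easiest to see on a single window whose trigger fires twice. Below is a plain-Java simulation, not Beam code: `PaneModesSketch` and `emitPanes` are hypothetical names, and each inner list stands for the elements that arrived before one firing. In Beam itself the choice is made with `accumulatingFiredPanes()` or `discardingFiredPanes()` on the `Window` transform.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation (not Beam) of the panes a repeatedly firing trigger
// emits for ONE key and window, under the two accumulation modes.
public class PaneModesSketch {

    // arrivalsPerFiring.get(i) holds the elements that arrived before firing i.
    // accumulating == true:  each pane holds every element seen so far.
    // accumulating == false: each pane holds only the elements new since the last firing.
    static List<List<String>> emitPanes(List<List<String>> arrivalsPerFiring, boolean accumulating) {
        List<List<String>> panes = new ArrayList<>();
        List<String> buffer = new ArrayList<>();
        for (List<String> arrivals : arrivalsPerFiring) {
            buffer.addAll(arrivals);
            panes.add(new ArrayList<>(buffer)); // the trigger fires: emit a copy of the pane
            if (!accumulating) {
                buffer.clear(); // discarding mode: drop what was just emitted
            }
        }
        return panes;
    }

    public static void main(String[] args) {
        // an event1 record arrives, the trigger fires, then the matching event2 record arrives
        List<List<String>> arrivals = List.of(List.of("event1:k"), List.of("event2:k"));
        System.out.println(emitPanes(arrivals, true));  // [[event1:k], [event1:k, event2:k]]
        System.out.println(emitPanes(arrivals, false)); // [[event1:k], [event2:k]]
    }
}
```

With discarding panes, each firing after the first only carries what arrived since the previous firing, so if the two sides of a key arrive in different panes, every emitted CoGbkResult for that key has one tag empty; accumulating panes avoid this at the cost of re-emitting earlier elements.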


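    // Both inputs use the same windowing and triggering; this trigger choice is illustrative
    PCollection<KV<String, Employee>> windowedCollection1 = collection1.apply(
            Window.<KV<String, Employee>>into(FixedWindows.of(Duration.standardMinutes(5)))
                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))))
                    .withAllowedLateness(Duration.ZERO).discardingFiredPanes());

    PCollection<KV<String, Department>> windowedCollection2 = collection2.apply(
            Window.<KV<String, Department>>into(FixedWindows.of(Duration.standardMinutes(5)))
                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))))
                    .withAllowedLateness(Duration.ZERO).discardingFiredPanes());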

Then use CoGroupByKey:

    final TupleTag<Employee> t1 = new TupleTag<>();
    final TupleTag<Department> t2 = new TupleTag<>();

    PCollection<KV<String, CoGbkResult>> groupByKeyResult = KeyedPCollectionTuple
            .of(t1, windowedCollection1)
            .and(t2, windowedCollection2)
            .apply("Join Streams", CoGroupByKey.create());
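CoGroupByKey groups by key within each window, so two records with the same key only meet if their timestamps fall into the same window. The plain-Java sketch below (no Beam dependency; `FixedWindowSketch`, `windowStart` and `sameWindow` are hypothetical helpers, and a zero window offset is assumed) shows how fixed windows map a timestamp to a window start, and how two records only 2 seconds apart can still land in different 1-minute windows.

```java
// Plain-Java sketch (no Beam) of fixed, non-overlapping window assignment.
public class FixedWindowSketch {

    // Start of the fixed window containing timestampMillis, for windows of
    // sizeMillis with zero offset. floorMod keeps negative timestamps correct.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Two same-key records can only be co-grouped if they share a window.
    static boolean sameWindow(long ts1, long ts2, long sizeMillis) {
        return windowStart(ts1, sizeMillis) == windowStart(ts2, sizeMillis);
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        // same key, 2 seconds apart, but on either side of a window boundary
        long event1Ts = 59_000L;
        long event2Ts = 61_000L;
        System.out.println(windowStart(event1Ts, oneMinute));          // 0
        System.out.println(windowStart(event2Ts, oneMinute));          // 60000
        System.out.println(sameWindow(event1Ts, event2Ts, oneMinute)); // false
    }
}
```

This is one reason to check which timestamps the Kafka records actually carry before reasoning about why a join comes back with an empty tag.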

Now you can process the grouped PCollection in a ParDo transform.
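The per-key logic inside that ParDo is usually an inner join: emit every combination of the two tagged values, or nothing when one tag is empty for that key and window. The sketch below shows only that logic, in plain Java so it runs without Beam; in the pipeline, `left` and `right` would be the iterables returned by `getAll(t1)` and `getAll(t2)` on the `CoGbkResult`, and plain strings stand in for `Employee` and `Department`. The class name, method name and output format are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the per-key inner join a ParDo over CoGbkResult might run.
public class CoGbkJoinSketch {

    // Inner join for one key in one window: every (left, right) pair,
    // or an empty list if either side has no values.
    static List<String> innerJoin(String key, Iterable<String> left, Iterable<String> right) {
        List<String> out = new ArrayList<>();
        if (!left.iterator().hasNext() || !right.iterator().hasNext()) {
            return out; // one tag had no data for this key in this window
        }
        for (String l : left) {
            for (String r : right) {
                out.add(key + ":" + l + "|" + r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(innerJoin("k1", List.of("e1"), List.of("d1", "d2"))); // [k1:e1|d1, k1:e1|d2]
        System.out.println(innerJoin("k2", List.of("e1"), List.of()));          // []
    }
}
```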


09-23 02:48