本文介绍了Apache Beam-在两个无边界的PCollection上通过键进行流连接的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

am具有两个Unbounded(KafkaIO)PCollections,为此他们应用基于标记的CoGroupByKey且窗口固定为1分钟,但是在大多数情况下,加入该集合时似乎错过了具有相同密钥的某些测试数据的标记数据.请找到以下代码段.

am having two Unbounded(KafkaIO) PCollections for which am applying tag based CoGroupByKey with a fixed window of 1 min, however at the time of joining most of the time the collection seem to miss one of the tagged data for some test data having same keys. Please find the below snippet.

    KafkaIO.Read<Integer, String> event1 = ... ;


    KafkaIO.Read<Integer, String> event2 = ...;

    PCollection<KV<String,String>> event1Data = p.apply(event1.withoutMetadata())
            .apply(Values.<String>create())
            .apply(MapElements.via(new SimpleFunction<String, KV<String, String>>() {
                @Override public KV<String, String> apply(String input) {
                    log.info("Extracting Data");
                    . . . .//Some processing
                    return KV.of(record.get("myKey"), record.get("myValue"));
                }
            }))
            .apply(Window.<KV<String,String>>into(
                    FixedWindows.of(Duration.standardMinutes(1))));

    PCollection<KV<String,String>> event2Data = p.apply(event2.withoutMetadata())
            .apply(Values.<String>create())
            .apply(MapElements.via(new SimpleFunction<String, KV<String, String>>() {
                @Override public KV<String, String> apply(String input) {
                    log.info("Extracting Data");
                    . . . .//Some processing
                    return KV.of(record.get("myKey"), record.get("myValue"));
                }
            }))
            .apply(Window.<KV<String,String>>into(
                    FixedWindows.of(Duration.standardMinutes(1))));

   final TupleTag<String> event1Tag = new TupleTag<>();
   final TupleTag<String> event2Tag = new TupleTag<>();

   PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
            .of(event1Tag, event1Data)
            .and(event2Tag, event2Data)
            .apply(CoGroupByKey.<String>create());

   PCollection<String> finalResultCollection =
            kvpCollection.apply("Join", ParDo.of(
                    new DoFn<KV<String, CoGbkResult>, String>() {
                        @ProcessElement
                        public void processElement(ProcessContext c) throws IOException {
                            KV<String, CoGbkResult> e = c.element();
                            Iterable<String> event1Values = e.getValue().getAll(event1Tag);
                            Iterable<String> event2Values = e.getValue().getAll(event2Tag);
                            if( event1.iterator().hasNext() && event2.iterator().hasNext() ){
                               // Process event1 and event2 data and write to c.output
                            }else {
                                System.out.println("Unable to join event1 and event2");
                            }
                        }
                    }));

对于上面的代码,当我开始使用两个kafka主题的通用密钥开始抽取数据时,它永远都不会加入,即Unable to join event1 and event2,请让我知道是否做错了什么,或者是否有更好的方法来合并两个无边界PCollection使用公共密钥.

For the above code when I start pumping data with a common key for the two kafka topics, its never getting joined i.e Unable to join event1 and event2, kindly let me know if am doing anything wrong or is there a better way to join two unbounded PCollection on a common key.

推荐答案

我最近有类似的问题.根据梁文档,要在无边界PCollection(特别是键值PCollection)上使用CoGroupByKey转换,所有PCollection应该具有相同的窗口和触发策略.因此,由于您要使用流式/无界集合,因此必须根据触发策略使用触发"在一定间隔后触发并发出窗口输出.由于您正在此处处理流数据,因此该触发器应连续触发,即永久重复使用触发器.您还需要在窗口化的PCollection上应用累积/丢弃"选项,以告诉Beam在触发触发器后应执行的操作,即累积丢弃窗格的结果.使用此窗口,触发和累加策略后,应使用CoGroupByKey变换使用公共密钥对多个无边界的PCollection进行分组.

I had similar issue recently. As per beam documentation, to use CoGroupByKey transfrom on unbounded PCollections (key-value PCollection, specifically), all the PCollection should have same windowing and trigger strategy. So you will have to use Trigger to fire and emit window output after certain interval based on your Triggering strategy since you are working with streaming/unbounded collections. This trigger should fire contineously since you are dealing with streaming data here i.e. use your Trigger repeatedly forever. You also need to apply accumulating/discarding option on your windowed PCollection to tell beam what should be done after trigger is fired i.e. to accumulate the result of discard the window pane. After using this windowing, trigger and accumulating strategy you should use CoGroupByKey transform to group multiple unbounded PCollection using a common key.

类似这样的东西:

PCollection<KV<String, Employee>> windowedCollection1
                    = collection1.apply(Window.<KV<String, DeliveryTimeWindow>>into(FixedWindows.of(Duration.standardMinutes(5)))
                    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                    .withAllowedLateness(Duration.ZERO).accumulatingFiredPanes());


PCollection<KV<String, Department>> windowedCollection2
                    = collection2.apply(Window.<KV<String, DeliveryTimeWindow>>into(FixedWindows.of(Duration.standardMinutes(5)))
                    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                    .withAllowedLateness(Duration.ZERO).accumulatingFiredPanes());

然后使用CoGroupByKey:

Then use CoGroupByKey :

final TupleTag<Employee> t1 = new TupleTag<>();
final TupleTag<Department> t2 = new TupleTag<>();

PCollection<KV<String, CoGbkResult>> groupByKeyResult =
                    KeyedPCollectionTuple.of(t1,windowedCollection1)
.and(t2,windowedCollection2) 
                            .apply("Join Streams", CoGroupByKey.create());

现在您可以在ParDo转换中处理分组的PCollection.

now you can process your grouped PCollection in ParDo transform.

希望这会有所帮助!

这篇关于Apache Beam-在两个无边界的PCollection上通过键进行流连接的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-23 02:48