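TL;DR:
How can I CoGroupByKey a set of PCollections that all use the same windowing strategy when that strategy is set with CalendarWindows?
LONG VERSION
I am writing a Dataflow pipeline that reads from two different Pub/Sub sources. One of the PCollections is split into a PCollectionTuple, and at the end I try to join them with a CoGroupByKey before saving the result to BigQuery.
While testing the pipeline, the windowing strategy for my PCollections was: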
private static PCollection<KV<String, Long>> applyWindowsAndCount(final PCollection<KV<String, Long>> summary, final String OperationName){
    return summary
        .apply("Apply Windows " + OperationName, Window
            .<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(1)))
            .discardingFiredPanes()
            .withAllowedLateness(Duration.standardDays(1)))
        .apply("Count " + OperationName, Count.perKey());
}
I set them to FixedWindows of 1 minute so that I could get results quickly. My grouping looks like this:
private static PCollection<KV<String, CoGbkResult>> MergeSummary(PCollection<KV<String, Long>> Avail, PCollection<KV<String, Long>> ValuationOK, PCollection<KV<String, Long>> ValuationKO){
    return KeyedPCollectionTuple.of(Util.AVAIL, Avail)
        .and(Util.VALUATION_OK, ValuationOK)
        .and(Util.VALUATION_KO, ValuationKO)
        .apply("Merge Summary", CoGroupByKey.create());
}
It ran smoothly when I tested it both locally and in the cloud. However, when I set the windowing to the real production value, a CalendarWindows of 1 day, as follows:
private static PCollection<KV<String, Long>> applyWindowsAndCount(final PCollection<KV<String, Long>> summary, final String OperationName){
    return summary
        .apply("Apply Windows " + OperationName, Window
            .<KV<String, Long>>into(CalendarWindows.days(1).withTimeZone(DateTimeZone.UTC).withStartingDay(2016,9,20)) // Per-day windowing.
            .discardingFiredPanes()
            .withAllowedLateness(Duration.standardDays(1))) // Accepts data up to 1 day late.
        .apply("Count " + OperationName, Count.perKey());
}
Then I can't even get the code to compile, because I get a message like this:
Exception in thread "main" java.lang.IllegalStateException: Inputs to Flatten had incompatible window windowFns: com.google.cloud.dataflow.sdk.transforms.windowing.CalendarWindows$DaysWindows@6af9fcb2, com.google.cloud.dataflow.sdk.transforms.windowing.CalendarWindows$DaysWindows@6cce16f4
at com.google.cloud.dataflow.sdk.transforms.Flatten$FlattenPCollectionList.apply(Flatten.java:121)
at com.google.cloud.dataflow.sdk.transforms.Flatten$FlattenPCollectionList.apply(Flatten.java:105)
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:413)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:367)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274)
at com.google.cloud.dataflow.sdk.values.PCollectionList.apply(PCollectionList.java:175)
at com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey.apply(CoGroupByKey.java:124)
at com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey.apply(CoGroupByKey.java:74)
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:413)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:367)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:290)
at com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple.apply(KeyedPCollectionTuple.java:116)
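After reading the documentation, it is clear that Dataflow considers my PCollections to have incompatible windows. However, all of them are windowed with the function I pasted above. So how can I CoGroupByKey a set of PCollections that share the same windowing strategy set with CalendarWindows?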
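Best answer
This looks like a bug in CalendarWindows. To work around it, create a single CalendarWindows object and use it as the WindowFn for every PCollection, instead of creating a separate CalendarWindows object for each one.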
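A rough, self-contained illustration of why sharing one instance can help. It does not use the Dataflow SDK: `DayWindowsModel`, `allCompatible` and `newDaily` are hypothetical stand-ins, and the reference comparison inside `isCompatible` is only an assumption about what kind of bug would produce this behavior, not a statement about the SDK's actual code.

```java
import java.util.Arrays;
import java.util.List;

class SharedWindowFnSketch {

    /** Hypothetical stand-in for a per-day WindowFn; NOT the SDK's CalendarWindows. */
    static final class DayWindowsModel {
        private final int number;
        private final int[] startDate; // stand-in for a date object: {year, month, day}

        DayWindowsModel(int number, int[] startDate) {
            this.number = number;
            this.startDate = startDate;
        }

        /** Assumed faulty check: the start date is compared by reference, not by value. */
        boolean isCompatible(DayWindowsModel other) {
            return number == other.number && startDate == other.startDate;
        }
    }

    /** Mimics a check that every input to a join uses a compatible windowing function. */
    static boolean allCompatible(List<DayWindowsModel> fns) {
        DayWindowsModel first = fns.get(0);
        for (DayWindowsModel fn : fns) {
            if (!first.isCompatible(fn)) {
                return false;
            }
        }
        return true;
    }

    /** Builds a fresh one-day windowing object, like calling the factory once per input. */
    static DayWindowsModel newDaily() {
        return new DayWindowsModel(1, new int[] {2016, 9, 20});
    }

    public static void main(String[] args) {
        // One object per input: same settings, but distinct instances.
        List<DayWindowsModel> separate = Arrays.asList(newDaily(), newDaily(), newDaily());
        System.out.println("separate instances compatible: " + allCompatible(separate));

        // Workaround from the answer: build the object once and reuse it for every input.
        DayWindowsModel shared = newDaily();
        List<DayWindowsModel> reused = Arrays.asList(shared, shared, shared);
        System.out.println("shared instance compatible: " + allCompatible(reused));
    }
}
```

In the pipeline from the question, the equivalent change would presumably be to build the `CalendarWindows.days(1)...` object once (for example in a static final field) and pass that same object to every `Window.into(...)` call inside `applyWindowsAndCount`.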