docs所指定,我们为ParDo定义了2个BigQuery边输入。当管道在本地执行时(即DirectPipelineRunner),侧面输入可以正常工作。但是,当它在云中执行时,它会陷入:

java.lang.IllegalArgumentException: calling sideInput() with unknown view; did you forget to pass the view in ParDo.withSideInputs()?

为什么它会在本地运行,而在通过云执行时却无法运行?
Connected to the target VM, address: '127.0.0.1:61484', transport: 'socket'
Mar 05, 2015 4:58:26 PM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner fromOptions
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 75 files. Enable logging at DEBUG level to see which files will be staged.
Mar 05, 2015 4:58:27 PM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner run
INFO: Executing pipeline on the Dataflow Service, which will have billing implications related to Google Compute Engine usage and other Google Cloud Services.
Mar 05, 2015 4:58:27 PM com.google.cloud.dataflow.sdk.util.PackageUtil stageClasspathElementsToGcs
INFO: Uploading 75 files from PipelineOptions.filesToStage to GCS to prepare for execution in the cloud.
Mar 05, 2015 4:59:20 PM com.google.cloud.dataflow.sdk.util.PackageUtil stageClasspathElementsToGcs
INFO: Uploading PipelineOptions.filesToStage complete: 4 files newly uploaded, 71 files cached
Dataflow SDK version: 0.3.150227
Mar 05, 2015 4:59:30 PM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner run
Submitted job: 2015-03-04_21_59_29-8181498263178343117
INFO: To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/<removed>/dataflow/job/2015-03-04_21_59_29-8181498263178343117
2015-03-05T06:00:06.431Z: (470854a2051eb53e): Expanding GroupByKey operations into optimizable parts.
2015-03-05T06:00:06.434Z: (470854a2051ebec0): Annotating graph with Autotuner information.
2015-03-05T06:00:38.227Z: (470854a2051ebf4a): Fusing adjacent ParDo, Read, Write, and Flatten operations
2015-03-05T06:00:38.230Z: (470854a2051eb8cc): Fusing consumer Impressions-CPT-transformation into Impressions-GCS-read
2015-03-05T06:00:38.233Z: (470854a2051eb24e): Fusing consumer Impressions-GSC-write into Impressions-CPT-transformation
2015-03-05T06:00:38.236Z: (470854a2051ebbd0): Fusing consumer ActiveViews-CPT-transformation into ActiveViews-GCS-read
2015-03-05T06:00:38.239Z: (470854a2051eb552): Fusing consumer ActiveViews-GSC-write into ActiveViews-CPT-transformation
2015-03-05T06:00:38.241Z: (470854a2051ebed4): Fusing consumer Clicks-CPT-transformation into Clicks-GCS-read
2015-03-05T06:00:38.243Z: (470854a2051eb856): Fusing consumer Clicks-GSC-write into Clicks-CPT-transformation
2015-03-05T06:00:38.291Z: (470854a2051ebcfc): Adding StepResource setup and teardown to workflow graph.
2015-03-05T06:00:38.298Z: (470854a2051eb000): Not adding lease related steps.
2015-03-05T06:00:38.311Z: (470854a2051eb982): Starting the input generators.
2015-03-05T06:00:38.332Z: (91191d48cd0d620e): Adding workflow start and stop steps.
2015-03-05T06:00:38.335Z: (91191d48cd0d6d50): Assigning stage ids.
2015-03-05T06:00:38.796Z: S11: (c3f8963c90f0500f): Executing operation Lineitems2
2015-03-05T06:00:38.839Z: S06: (6090a361d1acd580): Executing operation Advertisers3
2015-03-05T06:00:38.839Z: (b3da167931dd7955): Value "Lineitems2.out" materialized.
2015-03-05T06:00:38.840Z: S01: (d248458030712ffd): Executing operation Advertisers
2015-03-05T06:00:38.861Z: S03: (470854a2051ebfc5): Executing operation Lineitems
2015-03-05T06:00:38.869Z: S08: (84f12b285dcf746b): Executing operation Lineitems3
2015-03-05T06:00:38.874Z: S13: (fd7e694921bb42e4): Executing operation Advertisers2
2015-03-05T06:00:38.876Z: S12: (902bfbb49c2e797d): Executing operation AsIterable4/CreatePCollectionView
2015-03-05T06:00:38.880Z: (19dca2539403f653): Value "Advertisers.out" materialized.
2015-03-05T06:00:38.881Z: (76efcecc96a971f3): Value "Advertisers3.out" materialized.
2015-03-05T06:00:38.905Z: (84f12b285dcf736c): Starting worker pool setup.
2015-03-05T06:00:38.906Z: (19dca2539403f3d2): Value "Lineitems.out" materialized.
2015-03-05T06:00:38.912Z: (84f12b285dcf72a0): Starting 50 workers...
2015-03-05T06:00:38.915Z: S07: (91191d48cd0d6b3b): Executing operation AsIterable5/CreatePCollectionView
2015-03-05T06:00:38.920Z: (fd7e694921bb4b90): Value "Advertisers2.out" materialized.
2015-03-05T06:00:38.926Z: (221dd8de96cdfdbc): Value "Lineitems3.out" materialized.
2015-03-05T06:00:38.929Z: (6e68655249248499): Value "AsIterable4/CreatePCollectionView.out" materialized.
2015-03-05T06:00:38.933Z: S02: (90c9f0b8bf17cb53): Executing operation AsIterable/CreatePCollectionView
2015-03-05T06:00:38.938Z: S04: (b80a39fe501a7928): Executing operation AsIterable2/CreatePCollectionView
2015-03-05T06:00:38.949Z: S14: (221dd8de96cdf7a8): Executing operation AsIterable3/CreatePCollectionView
2015-03-05T06:00:38.954Z: (91191d48cd0d61bf): Value "AsIterable5/CreatePCollectionView.out" materialized.
2015-03-05T06:00:38.959Z: S09: (7001cd4292313be4): Executing operation AsIterable6/CreatePCollectionView
2015-03-05T06:00:38.966Z: (84f12b285dcf716e): Value "AsIterable/CreatePCollectionView.out" materialized.
2015-03-05T06:00:38.974Z: (76efcecc96a9777f): Value "AsIterable2/CreatePCollectionView.out" materialized.
2015-03-05T06:00:38.989Z: (84f12b285dcf70d5): Value "AsIterable3/CreatePCollectionView.out" materialized.
2015-03-05T06:00:38.993Z: (90c9f0b8bf17ca82): Value "AsIterable6/CreatePCollectionView.out" materialized.
2015-03-05T06:00:39.006Z: S05: (76efcecc96a97d0b): Executing operation Impressions-GCS-read+Impressions-CPT-transformation+Impressions-GSC-write
2015-03-05T06:00:39.018Z: S10: (fd7e694921bb4092): Executing operation ActiveViews-GCS-read+ActiveViews-CPT-transformation+ActiveViews-GSC-write
2015-03-05T06:00:39.018Z: S15: (84f12b285dcf7009): Executing operation Clicks-GCS-read+Clicks-CPT-transformation+Clicks-GSC-write
2015-03-05T06:03:02.735Z: (1f5324f21f91a6ba): java.lang.IllegalArgumentException: calling sideInput() with unknown view; did you forget to pass the view in ParDo.withSideInputs()?
    at com.google.cloud.dataflow.sdk.util.DoFnContext.sideInput(DoFnContext.java:107)
    at com.google.cloud.dataflow.sdk.util.DoFnProcessContext.sideInput(DoFnProcessContext.java:81)
    at com.telstra.cpt.engine.DFPDoFn.processElement(DFPDoFn.java:54)

编辑更新1

通过为转换的每个步骤创建一个新的ParDo实例,我们设法解决了IllegalArgumentException的问题。但是,现在在尝试处理侧面输入时,我们得到了一个类强制转换异常(再次是,这在本地执行时运行得很好):
2015-03-16T08:12:29.107Z: (480fb99418f6923b): java.lang.ClassCastException: com.google.api.services.bigquery.model.TableRow cannot be cast to com.google.cloud.dataflow.sdk.util.WindowedValue
    at com.google.cloud.dataflow.sdk.transforms.View$IterablePCollectionView$1.apply(View.java:192)
    at com.google.common.collect.Iterators$8.transform(Iterators.java:799)
    at com.google.common.collect.TransformedIterator.next(TransformedIterator.java:48)
    at com.telstra.cdf.dfp.DFPDoFn.buildAdvertisers(DFPDoFn.java:129)
    at com.telstra.cdf.dfp.DFPDoFn.buildSideInputs(DFPDoFn.java:80)
    at com.telstra.cdf.dfp.DFPDoFn.processElement(DFPDoFn.java:50)

职位ID:2015-03-16_01_10_11-6267129041459219709

编辑更新2

尝试根据Github上的最新资源进行构建,因为延迟将其发布到Maven。现在看来是回归错误。在查询ProcessContext的侧面输入时,它返回null

但是,和以前一样,它可以在本地完美运行。

最佳答案

造成这种问题的一个可能原因是多次使用DoFn并在两次使用之间更改其字段。例如:

MyDoFn doFn = new MyDoFn();

final PCollectionView<Iterable<String>> v1 = createView(...);
doFn.view = v1;
p.apply(ParDo.of(doFn).withSideInputs(v1));

final PCollectionView<Iterable<String>> v2 = createView(...);
doFn.view = v2;
p.apply(ParDo.of(doFn).withSideInputs(v2));

上面应该使用MyDoFn的两个实例。考虑使用构造函数和final字段使此操作更容易:
final PCollectionView<Iterable<String>> v1 = createView(...);
MyDoFn doFn1 = new MyDoFn(v1);
p.apply(ParDo.of(doFn).withSideInputs(v1));

final PCollectionView<Iterable<String>> v2 = createView(...);
MyDoFn doFn2 = new MyDoFn(v2);
p.apply(ParDo.of(doFn).withSideInputs(v2));

07-24 21:47