如果我的 class 扩展DoFn,如何访问侧面输入的元素?
例如:
假设我有一个ParDo转换,例如:
PCollection<String> data = myData.apply("Get data",
ParDo.of(new MyClass()).withSideInputs(myDataView));
我有一堂课:
static class MyClass extends DoFn<String,String>
{
//How to access side input here
}
c.sideInput()在这种情况下不起作用。
谢谢。
最佳答案
在这种情况下,问题在于DoFn中的processElement
方法无权访问main方法中的PCollectionView实例。
您可以在构造函数中将PCollectionView传递给DoFn:
class MyClass extends DoFn<String,String>
{
private final PCollectionView<..> mySideInput;
public MyClass(PCollectionView<..> mySideInput) {
// List, or Map or anything:
this.mySideInput = mySideInput;
}
@ProcessElement
public void processElement(ProcessContext c) throws IOException
{
// List or Map or any type you need:
List<..> sideInputList = c.sideInput(mySideInput);
}
}
对此的解释是,当您使用匿名DoFn时,该处理方法将具有一个闭包,该闭包可访问封装DoFn的范围内的所有对象(其中包括PCollectionView)。当您不使用匿名DoFn时,没有闭包,并且您需要另一种传递PCollectionView的方法。