我正在尝试将PTransform应用于PCollectionTuple,但无法弄清编译器为何抱怨。
我想这样做是为了将连接某些csv行所需的多个步骤抽象为一个PTransform(PCollectionTuple中的每个PCollection都包含要连接的csv行),而我遇到的问题不是连接本身,而是如何将PTransform应用于PCollectionTuple。
这是我的代码:
static class JoinCsvLines extends DoFn<PCollectionTuple, String[]> {
@ProcessElement
public void processElement(ProcessContext context) {
PCollectionTuple element = context.element();
// TODO: Implement the output
}
}
我这样称呼PTransform:
TupleTag<String[]> tag1 = new TupleTag<>();
TupleTag<String[]> tag2 = new TupleTag<>();
PCollectionTuple toJoin = PCollectionTuple.of(tag1, csvLines1).and(tag2, csvLines2);
// Can't compile this line
PCollection<String[]> joinedLines = toJoin.apply("JoinLines", ParDo.of(new JoinCsvLines()));
当我将鼠标悬停在未编译的行上方时,IntelliJ IDEA将输出以下内容:
Required type:
PTransform
<? super PCollectionTuple,
OutputT>
Provided:
SingleOutput
<PCollectionTuple,
String[]>
reason: no instance(s) of type variable(s) InputT exist so that PCollectionTuple conforms to PCollection<? extends InputT>
如何将PTransform应用于PCollectionTuple?
最佳答案
DoFn<PCollectionTuple, String[]>
表示您希望为每个记录应用“ DoFn”,因此您不应使用PCollectionTuple作为输入类型。相反,您应该使用“ csvLines1”和“ csvLines2”的类型。
如果您打算合并两个PCollection,则可以检查Flatten转换:https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java#L41
关于java - 如何在Apache Beam中将DoFn PTransform应用于PCollectionTuple,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/60491804/