我试图用Beam / Java编写Dataflow作业,以处理来自Pub / Sub并写入Parquet的一系列事件。 Pub / Sub中的事件采用JSON格式,每个事件都可以生成一个或多个行。我能够编写一个非常简单的示例,编写一个只返回1条记录的ParDo转换。 ParDo看起来像这样
static class GenerateRecords extends DoFn<String, GenericRecord> {
@ProcessElement
public void processElement(ProcessContext context) {
final GenericData.Record record = new GenericData.Record(schema);
String msg = context.element();
com.tsp.de.schema.mschema pRecord = GenerateParquetRecord(msg);
context.output(pRecord);
}
}
和管道的写入部分
.apply("Write to file",
FileIO.<GenericRecord>
write()
.via(
ParquetIO.sink(schema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
)
.to(options.getOutputDirectory())
.withNumShards(options.getNumShards())
.withSuffix("pfile")
);
我的问题是,如何概括此ParDo转换以返回记录列表?我尝试了List,但是不起作用,ParquetIO.sink(schema)吠叫“无法通过以下方法解析方法”。
最佳答案
您可以根据需要多次调用context.output()
。因此,如果您知道在什么情况下需要发出多个记录的业务逻辑,则只需为每个输出记录调用DoFn
。它比拥有context.output(record)
容器要简单得多。
PS:顺便说一句,我对如何用PCollection
和GenericRecord
编写ParquetIO
可能有帮助。