假设我有一个PCollection<Foo>
,我想将其写入多个BigQuery表,并为每个Foo
选择一个可能不同的表。
如何使用Apache Beam BigQueryIO
API做到这一点?
最佳答案
使用最近在Apache Beam中添加到BigQueryIO
的功能,可以做到这一点。
PCollection<Foo> foos = ...;
foos.apply(BigQueryIO.write().to(new SerializableFunction<ValueInSingleWindow<Foo>, TableDestination>() {
@Override
public TableDestination apply(ValueInSingleWindow<Foo> value) {
Foo foo = value.getValue();
// Also available: value.getWindow(), getTimestamp(), getPane()
String tableSpec = ...;
String tableDescription = ...;
return new TableDestination(tableSpec, tableDescription);
}
}).withFormatFunction(new SerializableFunction<Foo, TableRow>() {
@Override
public TableRow apply(Foo foo) {
return ...;
}
}).withSchema(...));
根据输入的
PCollection<Foo>
是有界还是无界,这将在后台创建多个BigQuery导入作业(每个表一个或多个,具体取决于数据量),或者将使用BigQuery流插入API。API的最灵活版本使用
DynamicDestinations
,它允许您将不同的值写入具有不同模式的不同表,甚至允许您在所有这些计算中使用来自管道其余部分的侧面输入。此外,BigQueryIO已重构为许多可重用的转换,您可以自己组合它们以实现更复杂的用例-请参见files in the source directory。
此功能将包含在Apache Beam的第一个稳定版本和Dataflow SDK的下一个版本(它将基于Apache Beam的第一个稳定版本)中。现在,您可以通过针对来自HEAD的Beam快照在Beam上运行管道来使用此功能。
关于google-bigquery - 在Apache Beam中将不同的值写入不同的BigQuery表,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/43505534/