假设我有一个PCollection<Foo>,我想将其写入多个BigQuery表,并为每个Foo选择一个可能不同的表。

如何使用Apache Beam BigQueryIO API做到这一点?

最佳答案

使用最近在Apache Beam中添加到BigQueryIO的功能,可以做到这一点。

PCollection<Foo> foos = ...;
foos.apply(BigQueryIO.write().to(new SerializableFunction<ValueInSingleWindow<Foo>, TableDestination>() {
  @Override
  public TableDestination apply(ValueInSingleWindow<Foo> value) {
    Foo foo = value.getValue();
    // Also available: value.getWindow(), getTimestamp(), getPane()
    String tableSpec = ...;
    String tableDescription = ...;
    return new TableDestination(tableSpec, tableDescription);
  }
}).withFormatFunction(new SerializableFunction<Foo, TableRow>() {
  @Override
  public TableRow apply(Foo foo) {
    return ...;
  }
}).withSchema(...));

根据输入的PCollection<Foo>是有界还是无界,这将在后台创建多个BigQuery导入作业(每个表一个或多个,具体取决于数据量),或者将使用BigQuery流插入API。

API的最灵活版本使用DynamicDestinations,它允许您将不同的值写入具有不同模式的不同表,甚至允许您在所有这些计算中使用来自管道其余部分的侧面输入。

此外,BigQueryIO已重构为许多可重用的转换,您可以自己组合它们以实现更复杂的用例-请参见files in the source directory

此功能将包含在Apache Beam的第一个稳定版本和Dataflow SDK的下一个版本(它将基于Apache Beam的第一个稳定版本)中。现在,您可以通过针对来自HEAD的Beam快照在Beam上运行管道来使用此功能。

关于google-bigquery - 在Apache Beam中将不同的值写入不同的BigQuery表,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/43505534/

10-09 20:32