Question
I was able to create a Dataflow pipeline that reads data from Pub/Sub and, after processing, writes it to BigQuery in streaming mode.
Now, instead of streaming mode, I would like to run my pipeline in batch mode to reduce costs.
Currently my pipeline performs streaming inserts into BigQuery with dynamic destinations. I would like to know if there is a way to perform a batch insert operation with dynamic destinations.
Below is my code:
import java.net.SocketTimeoutException;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;

public class StarterPipeline {

  private static final Logger log = LoggerFactory.getLogger(StarterPipeline.class);

  public interface StarterPipelineOption extends PipelineOptions {
    /** Set this required option to specify where to read the input. */
    @Description("Path of the file to read from")
    @Default.String(Constants.pubsub_event_pipeline_url)
    String getInputFile();

    void setInputFile(String value);
  }

  @SuppressWarnings("serial")
  public static void main(String[] args) throws SocketTimeoutException {
    StarterPipelineOption options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(StarterPipelineOption.class);

    Pipeline p = Pipeline.create(options);

    PCollection<String> datastream = p.apply("Read Events From Pubsub",
        PubsubIO.readStrings().fromSubscription(Constants.pubsub_event_pipeline_url));

    // Fire a pane every 300 seconds of processing time; allow late data for up
    // to 10 days and discard panes that have already fired.
    PCollection<String> windowed_items = datastream.apply(Window.<String>into(new GlobalWindows())
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(300))))
        .withAllowedLateness(Duration.standardDays(10))
        .discardingFiredPanes());

    // Write into BigQuery, routing each row to a table chosen per event type.
    windowed_items
        .apply("Read and make event table row", new ReadEventJson_bigquery())
        .apply("Write_events_to_BQ",
            BigQueryIO.writeTableRows().to(new DynamicDestinations<TableRow, String>() {
              @Override
              public String getDestination(ValueInSingleWindow<TableRow> element) {
                return EventSchemaBuilder
                    .fetch_destination_based_on_event(element.getValue().get("event").toString());
              }

              @Override
              public TableDestination getTable(String table) {
                String destination = EventSchemaBuilder.fetch_table_name_based_on_event(table);
                return new TableDestination(destination, destination);
              }

              @Override
              public TableSchema getSchema(String table) {
                return EventSchemaBuilder.fetch_table_schema_based_on_event(table);
              }
            })
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

    p.run().waitUntilFinish();
    log.info("Events Pipeline Job Stopped");
  }
}
Answer
You can limit the costs by using file loads for streaming jobs. The Insertion Method section of the BigQueryIO documentation states that BigQueryIO.Write supports two methods of inserting data into BigQuery, specified via BigQueryIO.Write.withMethod(org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method). If no method is supplied, a default method is chosen based on the input PCollection. See BigQueryIO.Write.Method for more information about the available methods.
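As a minimal, untested sketch, the write step from the question could be switched to load jobs roughly as follows. Here eventDestinations is a hypothetical variable holding the same DynamicDestinations implementation shown in the question, and the 10-minute triggering frequency and single file shard are illustrative values only:

windowed_items
    .apply("Read and make event table row", new ReadEventJson_bigquery())
    .apply("Write_events_to_BQ",
        BigQueryIO.writeTableRows()
            // Hypothetical variable: the DynamicDestinations from the question.
            .to(eventDestinations)
            // Use periodic load jobs instead of streaming inserts.
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            // Required with FILE_LOADS on an unbounded input: how often to
            // start a load job per destination.
            .withTriggeringFrequency(Duration.standardMinutes(10))
            // File shards buffered per destination before each load; required
            // together with withTriggeringFrequency on unbounded input.
            .withNumFileShards(1)
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND));

A lower triggering frequency means fewer load jobs and lower cost, at the price of higher latency before rows appear in BigQuery. Note that withFailedInsertRetryPolicy is dropped here, since insert retry policies apply only to streaming inserts, not to load jobs.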
The different insertion methods provide different tradeoffs of cost, quota, and data consistency. See the BigQuery documentation for more information about these tradeoffs.