我正在尝试使用DataFlow(Java)将数据从Cloud Storage插入到Big Query。我可以批量上传数据;但是,我想设置一个流媒体上传。因此,随着将新对象添加到我的存储桶中,它们将被推送到BigQuery。
我已将PipelineOptions设置为Streaming,它在GCP控制台UI中显示了数据流管道属于流式类型。存储桶中我最初的文件/对象集被推送到BigQuery。
但是,当我向存储桶中添加新对象时,这些对象不会被推送到BigQuery。这是为什么?如何使用蒸腾的数据流管道将添加到Cloud Storage中的对象推送到BigQuery?
//Specify PipelineOptions
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setProject(<project-name>);
options.setStagingLocation(<bucket/staging folder>);
options.setStreaming(true);
options.setRunner(DataflowRunner.class);
我的解释是,因为这是一条流传输管道,所以当我将对象添加到Cloud Storage时,它们将被推送到BigQuery。
请提出建议。
最佳答案
您如何创建输入集合?您需要无限制的输入,流传输管道才能继续运行,否则它将只是临时的(但将使用流传输插入)。
您可以通过读取包含存储桶中所有更改的订阅来实现此目的,有关详细信息,请参见https://cloud.google.com/storage/docs/pubsub-notifications。