我正在尝试使用DataFlow(Java)将数据从Cloud Storage插入到Big Query。我可以批量上传数据;但是,我想设置一个流媒体上传。因此,随着将新对象添加到我的存储桶中,它们将被推送到BigQuery。

我已将PipelineOptions设置为Streaming,它在GCP控制台UI中显示了数据流管道属于流式类型。存储桶中我最初的文件/对象集被推送到BigQuery。

但是,当我向存储桶中添加新对象时,这些对象不会被推送到BigQuery。这是为什么?如何使用蒸腾的数据流管道将添加到Cloud Storage中的对象推送到BigQuery?

//Specify PipelineOptions
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);


  options.setProject(<project-name>);
  options.setStagingLocation(<bucket/staging folder>);
  options.setStreaming(true);
  options.setRunner(DataflowRunner.class);


我的解释是,因为这是一条流传输管道,所以当我将对象添加到Cloud Storage时,它们将被推送到BigQuery。

请提出建议。

最佳答案

您如何创建输入集合?您需要无限制的输入,流传输管道才能继续运行,否则它将只是临时的(但将使用流传输插入)。
您可以通过读取包含存储桶中所有更改的订阅来实现此目的,有关详细信息,请参见https://cloud.google.com/storage/docs/pubsub-notifications

10-07 18:46