我正在尝试使用python数据流将一些数据从Google PubSub流式传输到BigQuery。
为了进行测试,我通过设置以下代码将https://github.com/GoogleCloudPlatform/DataflowSDK-examples/blob/master/python/dataflow_examples/cookbook/bigquery_schema.py修改为流传输管道
options.view_as(StandardOptions).streaming = True
所以然后我更改了record_ids管道以从Pub/Sub中读取
# ADDED THIS
lines = p | 'Read PubSub' >> beam.io.ReadStringsFromPubSub(INPUT_TOPIC) | beam.WindowInto(window.FixedWindows(15))
# CHANGED THIS # record_ids = p | 'CreateIDs' >> beam.Create(['1', '2', '3', '4', '5'])
record_ids = lines | 'Split' >> (beam.FlatMap(split_fn).with_output_types(unicode))
records = record_ids | 'CreateRecords' >> beam.Map(create_random_record)
records | 'Write' >> beam.io.Write(
beam.io.BigQuerySink(
OUTPUT,
schema=table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
注意:我已被Google列入白名单以运行代码(以Alpha版本显示)
现在,当我尝试它时,我有一个错误
工作流程失败。原因:(f215df7c8fcdbb00):未知流接收器:bigquery
您可以在此处找到完整的代码:https://github.com/marcorigodanzo/gcp_streaming_test/blob/master/my_bigquery_schema.py
我认为这与流式类型的管道有关,有人可以告诉我如何在流式管道中编写bigQuery吗?
最佳答案
Beam Python不支持从流传输管道写入BigQuery。现在,您将需要使用Beam Java-您可以分别使用PubsubIO.readStrings()
和BigQueryIO.writeTableRows()
。