问题描述
我正在Google Cloud数据流中构建一个流程,该流程将使用发布/订阅中的消息,并基于一个键的值将它们写入BQ或GCS。我能够拆分消息,但我不确定如何将数据写入BigQuery。我已尝试使用beam.io.gcp.bigquery.WriteToBigQuery
,但没有成功。
我的完整代码在这里:https://pastebin.com/4W9Vu4Km
基本上我的问题是我不知道如何在WriteBatchesToBQ
(第73行)中指定变量element
应该写入BQ。
我还尝试在管道中直接使用beam.io.gcp.bigquery.WriteToBigQuery
(第128行),但随后收到错误AttributeError: 'list' object has no attribute 'items' [while running 'Write to BQ/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']
。这可能是因为我向它提供的不是词典,而是词典列表(我希望使用1分钟窗口)。
有什么想法吗?(另外,如果代码中有一些太愚蠢的地方,请告诉我-我只使用了很短的时间,我可能忽略了一些明显的问题)。
推荐答案
第二种方法是解决这个问题,您需要直接在管道中使用WriteToBigQuery函数。但是,需要包括beam.FlatMap步骤,以便WriteToBigQuery能够正确处理词典列表。
因此,完整的管道拆分数据按时间分组并写入BQ的定义如下:
accepted_messages = tagged_lines_result[Split.OUTPUT_TAG_BQ] | "Window into BQ" >> GroupWindowsIntoBatches(
window_size) | "FlatMap" >> beam.FlatMap(
lambda elements: elements) | "Write to BQ" >> beam.io.gcp.bigquery.WriteToBigQuery(table=output_table_bq,
schema=(
output_table_bq_schema),
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
完整的工作代码在此处:https://pastebin.com/WFwBvPcU
这篇关于阿帕奇光束到BigQuery的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!