本文介绍了阿帕奇光束到BigQuery的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在Google Cloud数据流中构建一个流程,该流程将使用发布/订阅中的消息,并基于一个键的值将它们写入BQ或GCS。我能够拆分消息,但我不确定如何将数据写入BigQuery。我已尝试使用beam.io.gcp.bigquery.WriteToBigQuery,但没有成功。

我的完整代码在这里:https://pastebin.com/4W9Vu4Km

基本上我的问题是我不知道如何在WriteBatchesToBQ(第73行)中指定变量element应该写入BQ。

我还尝试在管道中直接使用beam.io.gcp.bigquery.WriteToBigQuery(第128行),但随后收到错误AttributeError: 'list' object has no attribute 'items' [while running 'Write to BQ/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)'] 。这可能是因为我向它提供的不是词典,而是词典列表(我希望使用1分钟窗口)。

有什么想法吗?(另外,如果代码中有一些太愚蠢的地方,请告诉我-我只使用了很短的时间,我可能忽略了一些明显的问题)。

推荐答案

第二种方法是解决这个问题,您需要直接在管道中使用WriteToBigQuery函数。但是,需要包括beam.FlatMap步骤,以便WriteToBigQuery能够正确处理词典列表。

因此,完整的管道拆分数据按时间分组并写入BQ的定义如下:

 accepted_messages = tagged_lines_result[Split.OUTPUT_TAG_BQ] | "Window into BQ" >> GroupWindowsIntoBatches(
            window_size) | "FlatMap" >> beam.FlatMap(
            lambda elements: elements) | "Write to BQ" >> beam.io.gcp.bigquery.WriteToBigQuery(table=output_table_bq,
                                                                                               schema=(
                                                                                                   output_table_bq_schema),
                                                                                               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                                                                                               create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

完整的工作代码在此处:https://pastebin.com/WFwBvPcU

这篇关于阿帕奇光束到BigQuery的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-20 13:54