想象一下简单的Google Dataflow管道。在此管道中,您使用apache beam函数从BQ中读取数据,并且必须根据返回的pcollection更新这些行

Journeys = (p
                    | 'Read from BQ' >> beam.io.Read(
                    beam.io.BigQuerySource(query=query, dataset="dataset", use_standard_sql=True)))

Update = ( Journeys
                   | 'Updating Journey Table' >> beam.Map(UpdateBQ))

Write = (Journeys
                    | 'Write transform to BigQuery' >> WriteToBigQuery('table', TABLE_SCHEMA_CANONICAL))


该管道的问题是,当您读取表(beam.Map)时,对返回的pcollection中的每个项目执行UpdateBQ。



哪一种是对BigQuery表执行更新的更好方法?

我想这可以不用使用beam.Map而完成,仅执行并立即更新哪个进程的所有输入过程。



额外

def UpdateBQ(input):
    from google.cloud import bigquery
    import uuid
    import time
    client = bigquery.Client()
    STD = "#standardSQL"
    QUERY = STD + "\n" + """UPDATE table SET Field= 'YYY' WHERE Field2='XXX'"""
    client.use_legacy_sql = False
    query_job = client.run_async_query(query=QUERY, job_name='temp-query-job_{}'.format(uuid.uuid4()))  # API request
    query_job.begin()
    <...>




可能的解决方案

with beam.Pipeline(options=options) as p:
    Journeys = (p
                | 'Read from BQ' >> beam.io.Read(
                beam.io.BigQuerySource(query=query, dataset="dataset", use_standard_sql=True))
                )

    Write = (Journeys
                | 'Write transform to BigQuery' >> WriteToBigQuery('table', TABLE_SCHEMA_CANONICAL))


UpdateBQ();

最佳答案

从BQ读取后,您是否正在使用光束管道做任何进一步的转换?还是只是您在代码中显示的方式,即从BQ读取然后在BQ中触发更新命令?在这种情况下,您根本不需要光束。只需使用BQ查询来更新使用另一个表的表中的数据。 BQ best practices建议避免一次插入/更新单行。

10-02 07:38