接收器流式传输管道

接收器流式传输管道

本文介绍了使用 Python 中的 BigQuery 接收器流式传输管道的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在构建一个 apache 光束流管道,其源是 Pubsub,接收器是 BigQuery.我收到了错误消息:

I'm building an apache beam streaming pipeline whose source is Pubsub and sink is BigQuery. I've gotten the error messsage:

工作流程失败.原因:未知的消息代码."

"Workflow failed. Causes: Unknown message code."

尽管这条消息很神秘,但我现在相信 BigQuery 不支持作为流管道的接收器,它在这里说:从 Pub/Sub 流式传输到 BigQuery

As cryptic as this message is I now believe it to be the case that BigQuery is not supported as a sink for streaming pipelines, it says this here:Streaming from Pub/Sub to BigQuery

我确定这是导致问题的原因吗?或者,如果不是,无论如何它仍然不受支持?

Am I certainly correct that this is what's causing the problem? Or if not is it still not supported in any case?

谁能暗示这个功能什么时候发布?很遗憾,我很高兴能使用它.

Can anyone hint at when this feature will be released? It's a shame, I was pretty excited to get using this.

推荐答案

Python Streaming 管道从 Beam 2.5.0 开始在实验中可用,如 Beam 文档中所述 这里

Python Streaming pipelines are experimentally available since Beam 2.5.0 as documented in beam docs here

因此您需要安装 apache-beam 2.5.0 和 apache-beam[gcp]

Therefore you will need to install apache-beam 2.5.0 and apache-beam[gcp]

pip install apache-beam==2.5.0
pip install apache-beam[gcp]

我运行了这个命令:

python pubsub_to_bq.py --runner DataflowRunner --input_topic=projects/pubsub-public-data/topics/taxirides-realtime --project <my-project> --temp_location gs://<my-bucket>/tmp --staging_location gs://<my-bucket>/staging --streaming

使用下面的代码,它工作正常:

Using the code below, and it works alright:

from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam

def parse_pubsub(line):
    import json
    record = json.loads(line)
    return (record['ride_id']), (record['point_idx']), (record['latitude']), (record['longitude']), (record['timestamp']), (record['meter_increment']), (record['ride_status']), (record['meter_reading']), (record['passenger_count'])

def run(argv=None):
  """Build and run the pipeline."""

  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--input_topic', dest='input_topic', required=True,
      help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
  known_args, pipeline_args = parser.parse_known_args(argv)

  with beam.Pipeline(argv=pipeline_args) as p:

    # Read from PubSub
    lines = p | beam.io.ReadFromPubSub(known_args.input_topic)
    #Adapt messages from PubSub to BQ table
    lines = lines | beam.Map(parse_pubsub)
    lines = lines | beam.Map(lambda (ride_id, point_idx, latitude, longitude, timestamp, meter_increment, ride_status,meter_reading, passenger_count): {'ride_id':ride_id, 'point_idx':point_idx, 'latitude':latitude, 'longitude':longitude, 'timestamp':timestamp, 'meter_increment':meter_increment,'ride_status': ride_status,'meter_reading':meter_reading,'passenger_count': passenger_count})
    #Write to a BQ table
    lines | beam.io.WriteToBigQuery(table ='<my-table>',dataset='<my-dataset>',project='<my-project>' )

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

此代码使用 公开主题 "--topicproject/pubsub-public-data/topics/taxirides-realtime"和我使用正确架构创建的 BQ 表.

This code uses the publicly available topic "--topicprojects/pubsub-public-data/topics/taxirides-realtime" and BQ table that I have created with the right schema.

如果您使用此示例,请小心不要让它一直运行,否则您会产生成本,因为您会收到来自此 PubSub 主题的大量消息.

If you use this example be careful not leaving it running or you will incur into costs as you will receive a lot messages coming from this PubSub topic.

这篇关于使用 Python 中的 BigQuery 接收器流式传输管道的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

07-22 12:28