I am still new to using Apache Beam in Python with the Dataflow runner. I was interested in creating a batch pipeline that publishes to Google Cloud Pub/Sub; I dug around the Beam Python APIs and found a solution that works. However, during my exploration I ran into some interesting problems that made me curious.
1. Successful pipeline
Currently, my working Beam pipeline for publishing data in batch from GCS looks like this:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        from google.cloud import pubsub_v1
        publisher = pubsub_v1.PublisherClient()
        future = publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()
def run_gcs_to_pubsub(argv):
    options = PipelineOptions(flags=argv)

    from datapipes.common.dataflow_utils import CsvFileSource
    from datapipes.protos import proto_schemas_pb2
    from google.protobuf.json_format import MessageToJson

    with beam.Pipeline(options=options) as p:
        normalized_data = (
            p |
            "Read CSV from GCS" >> beam.io.Read(CsvFileSource(
                "gs://bucket/path/to/file.csv")) |
            "Normalize to Proto Schema" >> beam.Map(
                lambda data: MessageToJson(
                    proto_schemas_pb2(data, proto_schemas_pb2.MySchema()),
                    indent=0,
                    preserving_proto_field_name=True)
            )
        )
        (normalized_data |
            "Write to PubSub" >> beam.ParDo(
                PublishFn(topic_path="projects/my-gcp-project/topics/mytopic"))
        )
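For anyone who wants to try the publishing step in isolation, a minimal local sketch that pushes a few in-memory strings through the same PublishFn should work along these lines (DirectRunner, placeholder topic name):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run_local_smoke_test():
    # Hypothetical smoke test: publish a handful of in-memory strings through the
    # same PublishFn on the DirectRunner instead of reading a CSV from GCS.
    options = PipelineOptions(["--runner=DirectRunner"])
    with beam.Pipeline(options=options) as p:
        (p
         | "Create test rows" >> beam.Create(['{"id": 1}', '{"id": 2}'])
         | "Write to PubSub" >> beam.ParDo(
             PublishFn(topic_path="projects/my-gcp-project/topics/mytopic")))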
2. Unsuccessful pipelines
Here, I attempted to make the publisher shared across the DoFn. I tried the following approaches.

a. Initializing the publisher in the DoFn's __init__
class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        from google.cloud import pubsub_v1
        batch_settings = pubsub_v1.types.BatchSettings(
            max_bytes=1024,  # One kilobyte
            max_latency=1,  # One second
        )
        self.publisher = pubsub_v1.PublisherClient(batch_settings)
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()


def run_gcs_to_pubsub(argv):
    ...  ## same as 1
b. Initializing the publisher outside the DoFn and passing it to the DoFn
class PublishFn(beam.DoFn):
    def __init__(self, publisher, topic_path):
        self.publisher = publisher
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()


def run_gcs_to_pubsub(argv):
    ...  ## same as 1

    batch_settings = pubsub_v1.types.BatchSettings(
        max_bytes=1024,  # One kilobyte
        max_latency=1,  # One second
    )
    publisher = pubsub_v1.PublisherClient(batch_settings)

    with beam.Pipeline(options=options) as p:
        ...  # same as 1

        (normalized_data |
            "Write to PubSub" >> beam.ParDo(
                PublishFn(publisher=publisher, topic_path="projects/my-gcp-project/topics/mytopic"))
        )
Both attempts at sharing the publisher across DoFn methods failed with the following error messages:

  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__

and

  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__
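For what it's worth, I suspect the same failure can be reproduced outside Beam by pickling the client directly; a sketch (assuming google-cloud-pubsub is installed, and the exact traceback may differ):

import pickle

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
# Expected to fail: the client wraps a grpc Channel, which cannot be pickled and
# raises "TypeError: no default __reduce__ due to non-trivial __cinit__".
pickle.dumps(publisher)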
My questions are:

1. Would a shared publisher implementation improve Beam pipeline performance? If yes, then I would like to explore that solution.

2. Why do the errors occur on my failing pipelines? Is it because of initializing a custom class object and passing it to the DoFn outside of the process function? If that is the cause, how can I implement a pipeline so that I am able to reuse a custom object inside a DoFn?

Thank you, your help would be greatly appreciated.
EDIT: Solution
OK, so Ankur has explained why this problem occurs and discussed how serialization is done on a DoFn. Based on this knowledge, I now understand that there are two solutions for sharing/reusing a custom object in a DoFn:

1. Make the custom object serializable: this allows the object to be initialized/made available during DoFn object creation (under __init__). The object must be serializable since it gets serialized during pipeline submission, when the DoFn object is created (which calls __init__). How you can do this is shown in my answer below; a rough sketch of the idea also follows the references. Additionally, I found that this requirement is actually documented in the Beam programming guide [1][2].

2. Initialize non-serializable objects in the DoFn's functions other than __init__ to avoid serialization, since functions other than __init__ are not called during pipeline submission. Ankur's answer explains how to do this.

References:
[1] https://beam.apache.org/documentation/programming-guide/#core-beam-transforms
[2] https://beam.apache.org/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms
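As a rough illustration of solution 1 (a sketch of the idea, not necessarily the exact code in my answer), one can subclass the client and tell pickle how to rebuild it from its picklable settings instead of pickling the underlying grpc channel; this assumes the client keeps its batch settings on a batch_settings attribute:

from google.cloud import pubsub_v1


class SerializablePublisherClient(pubsub_v1.PublisherClient):
    # Sketch: rebuild the client from its (picklable) batch settings on unpickling,
    # instead of trying to pickle the underlying grpc channel.
    def __reduce__(self):
        return self.__class__, (self.batch_settings,)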
Best answer
The PublisherClient cannot be pickled correctly. More on pickling here.
Initializing the PublisherClient in the process method avoids pickling the PublisherClient.

If the intent is to reuse the PublisherClient, I would recommend initializing the PublisherClient in the process method and storing it in self, using the following code.
class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        if not hasattr(self, 'publisher'):
            from google.cloud import pubsub_v1
            # Create the client lazily on the worker so it is never pickled, and keep
            # it on self so subsequent elements reuse the same client.
            self.publisher = pubsub_v1.PublisherClient()
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()
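A variant of the same idea, assuming the Beam SDK version in use provides DoFn.setup(), is to create the client there so it is built once per DoFn instance on the worker and never pickled:

class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def setup(self):
        # Runs on the worker after deserialization, so the client is never pickled.
        from google.cloud import pubsub_v1
        self.publisher = pubsub_v1.PublisherClient()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()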