I'm still new to using Apache Beam in Python with the Dataflow runner. I was interested in creating a batch pipeline that publishes to Google Cloud PubSub, so I dug into the Beam Python API and found a solution. However, during my exploration I ran into some interesting issues that made me curious.
1. Successful pipeline
My Beam pipeline that currently succeeds at publishing data from GCS in batch mode looks like this:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        # Import and create the client inside process, so it is never pickled.
        from google.cloud import pubsub_v1
        publisher = pubsub_v1.PublisherClient()
        future = publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()


def run_gcs_to_pubsub(argv):
    options = PipelineOptions(flags=argv)

    from datapipes.common.dataflow_utils import CsvFileSource
    from datapipes.protos import proto_schemas_pb2
    from google.protobuf.json_format import MessageToJson

    with beam.Pipeline(options=options) as p:
        normalized_data = (
                p |
                "Read CSV from GCS" >> beam.io.Read(CsvFileSource(
                    "gs://bucket/path/to/file.csv")) |
                "Normalize to Proto Schema" >> beam.Map(
                        # Project-specific conversion: CSV row -> proto message -> JSON.
                        lambda data: MessageToJson(
                            proto_schemas_pb2(data, proto_schemas_pb2.MySchema()),
                            indent=0,
                            preserving_proto_field_name=True)
                    )
        )
        (normalized_data |
            "Write to PubSub" >> beam.ParDo(
                    PublishFn(topic_path="projects/my-gcp-project/topics/mytopic"))
            )
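For completeness, a pipeline like this is typically launched by passing runner flags through to PipelineOptions. A minimal sketch, where the project, region, and bucket values are placeholders, not part of the original post:

if __name__ == "__main__":
    import sys

    # Flag values below are hypothetical; adjust to your own project.
    run_gcs_to_pubsub(sys.argv[1:] + [
        "--runner=DataflowRunner",
        "--project=my-gcp-project",
        "--region=us-central1",
        "--temp_location=gs://bucket/tmp",
    ])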

2. Unsuccessful pipelines
Here, I tried to share the publisher across the DoFn's methods. I attempted the following approaches.
a. Initialize the publisher inside the DoFn's __init__
class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        from google.cloud import pubsub_v1

        batch_settings = pubsub_v1.types.BatchSettings(
            max_bytes=1024,  # One kilobyte
            max_latency=1,   # One second
        )
        self.publisher = pubsub_v1.PublisherClient(batch_settings)
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()

def run_gcs_to_pubsub(argv):
    ... ## same as 1

b. Initialize the publisher outside the DoFn and pass it in
class PublishFn(beam.DoFn):
    def __init__(self, publisher, topic_path):
        self.publisher = publisher
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()


def run_gcs_to_pubsub(argv):
    ...  ## same as 1

    batch_settings = pubsub_v1.types.BatchSettings(
        max_bytes=1024,  # One kilobyte
        max_latency=1,  # One second
    )
    publisher = pubsub_v1.PublisherClient(batch_settings)

    with beam.Pipeline(options=options) as p:
        ... # same as 1
        (normalized_data |
            "Write to PubSub" >> beam.ParDo(
                PublishFn(publisher=publisher, topic_path="projects/my-gcp-project/topics/mytopic"))
        )

Both attempts at sharing the publisher across the DoFn's methods failed with the following error message:
  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__
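For reference, this failure can be reproduced outside Beam: Beam pickles the DoFn (and everything stored on it) at pipeline submission, and the client wraps a gRPC channel that has no pickling support. A minimal check, assuming google-cloud-pubsub is installed, should raise the same TypeError:

import pickle

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
# The client holds a grpc._cython.cygrpc.Channel, which cannot be pickled,
# so this is expected to raise:
# TypeError: no default __reduce__ due to non-trivial __cinit__
pickle.dumps(publisher)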

My questions are:
Would a shared-publisher implementation improve Beam pipeline performance? If so, I would like to explore that solution.
Why does the error occur in my failed pipelines? Is it because I initialize a custom class object outside the process function and pass it to the DoFn? If so, how can I implement a pipeline that is able to reuse a custom object in a DoFn?
Thanks, your help would be greatly appreciated.
EDIT: The solution
OK, so Ankur has explained why my problem occurs and discussed how serialization is done on a DoFn. Based on this knowledge, I now understand that there are two solutions for sharing/reusing a custom object in a DoFn:
1. Make the custom object serializable: this allows the object to be initialized/made available during DoFn object creation (under __init__). The object must be serializable because it gets serialized during pipeline submission, when the DoFn object is created (which calls __init__). My answer below explains how to do this, and a minimal sketch also follows this list. I also found that this requirement is actually related to the Beam documentation in [1] and [2].
2. Initialize the non-serializable object in the DoFn's functions other than __init__ to avoid serialization, since functions other than __init__ are not called during pipeline submission. Ankur's answer explains how to do this.
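To illustrate solution 1, here is a minimal sketch of a DoFn that stays picklable by dropping the client from its pickled state and recreating it lazily on the worker. The class and helper names are made up for illustration:

import apache_beam as beam


class SerializablePublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path
        self._publisher = None  # recreated lazily; never pickled alive

    def __getstate__(self):
        # Drop the unpicklable client before Beam serializes this DoFn.
        state = self.__dict__.copy()
        state["_publisher"] = None
        return state

    def _get_publisher(self):
        if self._publisher is None:
            from google.cloud import pubsub_v1
            self._publisher = pubsub_v1.PublisherClient()
        return self._publisher

    def process(self, element, **kwargs):
        future = self._get_publisher().publish(
            self.topic_path, data=element.encode("utf-8"))
        yield future.result()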
References:
[1]https://beam.apache.org/documentation/programming-guide/#core-beam-transforms
[2]https://beam.apache.org/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms

Best answer

The PublisherClient cannot be pickled correctly. More on pickling here.
Initializing the PublisherClient in the process method avoids the pickling of the PublisherClient.
If the intent is to reuse the PublisherClient, I would recommend initializing the PublisherClient in the process method and storing it in self, using the following code.

class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path
        super(self.__class__, self).__init__()

    def process(self, element, **kwargs):
        if not hasattr(self, 'publisher'):
            # Create the client lazily on the worker, so it is never pickled.
            from google.cloud import pubsub_v1
            self.publisher = pubsub_v1.PublisherClient()
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()
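As a note on the same idea: newer Beam Python SDKs also provide a DoFn.setup() lifecycle method, which runs once per DoFn instance on the worker after deserialization, making it a natural place for this initialization. A sketch, assuming an SDK version that supports setup():

import apache_beam as beam


class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path

    def setup(self):
        # Runs on the worker after the DoFn has been deserialized,
        # so the client is never pickled.
        from google.cloud import pubsub_v1
        self.publisher = pubsub_v1.PublisherClient()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        yield future.result()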
