I have a pipeline that runs locally without any errors. I used to get this error when running the pipeline locally:

    'Clients have non-trivial state that is local and unpickleable.'
     PicklingError: Pickling client objects is explicitly not supported.

I believe I fixed this by downgrading to apache-beam==2.3.0; after that, it runs perfectly locally.
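For context, this message is raised by the `google-cloud` client classes themselves (in `google-cloud-core`, the `Client` base class overrides `__getstate__` to raise `PicklingError`). The sketch below is not that library code: `FakeClient`, `EagerDoFn` and `try_pickle` are hypothetical stand-ins that mimic the behaviour. It shows that pickling fails both for the client itself and for any object that stores it as an attribute, such as a DoFn that creates its client in `__init__`.

```python
import pickle
from pickle import PicklingError


class FakeClient:
    """Hypothetical stand-in mimicking how google-cloud clients refuse pickling."""

    def __getstate__(self):
        # pickle asks __getstate__ for the object's state; raising here aborts
        # serialization of the client and of anything that references it.
        raise PicklingError(
            "Pickling client objects is explicitly not supported.\n"
            "Clients have non-trivial state that is local and unpickleable."
        )


class EagerDoFn:
    """Hypothetical DoFn-like class that builds its client at construction time."""

    def __init__(self):
        self.client = FakeClient()


def try_pickle(obj):
    """Return None if obj pickles cleanly, otherwise the error message."""
    try:
        pickle.dumps(obj)
        return None
    except PicklingError as exc:
        return str(exc)


print(try_pickle(FakeClient()) is not None)  # True
print(try_pickle(EagerDoFn()) is not None)   # True
```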

Now I'm using the DataflowRunner, and my requirements.txt file has the following dependencies:

    apache-beam==2.3.0
    google-cloud-bigquery==1.1.0
    google-cloud-core==0.28.1
    google-cloud-datastore==1.6.0
    google-cloud-storage==1.10.0
    protobuf==3.5.2.post1
    pytz==2013.7

But I get this dreaded error again:

    'Clients have non-trivial state that is local and unpickleable.'
     PicklingError: Pickling client objects is explicitly not supported.

Why does the DataflowRunner give me this error when the DirectRunner doesn't? Shouldn't they be using the same dependencies/environment?
Any help would be appreciated.
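On Dataflow, `--requirements_file` makes the workers pip-install the listed packages into their own environment, so they only match your local setup if the locally installed versions match the pins. One way to check the local side is to compare each `name==version` line with what is installed. A minimal sketch, assuming a recent Python 3 (for `importlib.metadata`) and a requirements file made only of simple `name==version` pins; `parse_pins` and `find_mismatches` are hypothetical helper names:

```python
from importlib.metadata import PackageNotFoundError, version


def parse_pins(text):
    """Parse simple 'name==version' lines; anything else is skipped."""
    pins = {}
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()  # drop comments and surrounding whitespace
        if "==" in line:
            name, pinned = line.split("==", 1)
            pins[name.strip()] = pinned.strip()
    return pins


def find_mismatches(pins):
    """Map each pin that differs locally to (pinned, installed).

    installed is None when the package is not installed at all.
    """
    mismatches = {}
    for name, pinned in pins.items():
        try:
            installed = version(name)
        except PackageNotFoundError:
            installed = None
        if installed != pinned:
            mismatches[name] = (pinned, installed)
    return mismatches
```

For example, `find_mismatches(parse_pins(open("requirements.txt").read()))` lists every pinned package whose local version differs from the pin.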

I've read that this is the way to solve the problem, but when I try it, I still get the same error:

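    import apache_beam as beam
    from google.cloud import datastore

    class MyDoFn(beam.DoFn):

        def start_bundle(self):
            # Beam 2.x calls start_bundle() with no extra arguments; the client
            # is created on the worker instead of being pickled with the DoFn.
            self._dsclient = datastore.Client()

        def process(self, element):
            # do stuff with self._dsclient
            yield element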

From https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3191

The earlier post I followed to fix this locally:

Using start_bundle() in apache-beam job not working. Unpickleable storage.Client()

Thanks in advance!

Best answer

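Initializing unpickleable clients in the start_bundle method is the correct approach, and Beam IOs often follow it; see datastoreio.py for an example. Here is a pipeline that performs a simple operation with the GCS Python client inside a DoFn. I ran it on Apache Beam 2.16.0 without any problems. If you can still reproduce the issue, please provide more details.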
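Creating the client in `start_bundle` works because the DoFn is pickled when the pipeline is submitted and unpickled on the workers before any bundle is processed, so an attribute that is only assigned in `start_bundle` is never part of the pickled state. A minimal standard-library sketch of that sequence, using hypothetical `FakeClient` and `LazyDoFn` classes in place of `storage.Client` and `beam.DoFn` (it illustrates the pickling behaviour, not Beam's own API):

```python
import pickle
from pickle import PicklingError


class FakeClient:
    """Hypothetical stand-in for a google-cloud client: refuses to be pickled."""

    def __getstate__(self):
        raise PicklingError("Pickling client objects is explicitly not supported.")


class LazyDoFn:
    """Hypothetical DoFn-like class that creates its client per bundle."""

    def __init__(self):
        self._client = None  # nothing unpicklable exists at construction time

    def start_bundle(self):
        # Runs on the worker, after the object has been unpickled.
        self._client = FakeClient()

    def process(self, element):
        # A real DoFn would call the client here; this just reports it exists.
        yield (element, self._client is not None)


# Simulate job submission (pickle) and worker start-up (unpickle).
shipped = pickle.dumps(LazyDoFn())
worker_fn = pickle.loads(shipped)
worker_fn.start_bundle()
print(list(worker_fn.process("x")))  # [('x', True)]
```

The same sequence is also a caveat: once `start_bundle` has run, that instance holds a live client and can no longer be pickled.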

gcs_client.py file:

    import argparse
    import logging
    import time

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from google.cloud import storage


    class MyDoFn(beam.DoFn):
      def start_bundle(self):
        # Create the unpickleable client on the worker, once per bundle,
        # instead of pickling it together with the DoFn.
        self.storage_client = storage.Client()

      def process(self, element):
        bucket = self.storage_client.get_bucket("existing-gcs-bucket")
        blob = bucket.blob(str(int(time.time())))
        blob.upload_from_string("payload")
        # process() should produce an iterable of outputs, so yield the element.
        yield element


    logging.getLogger().setLevel(logging.INFO)
    # Forward all command-line flags (--runner, --project, ...) to PipelineOptions.
    _, options = argparse.ArgumentParser().parse_known_args()

    pipeline_options = PipelineOptions(options)
    p = beam.Pipeline(options=pipeline_options)
    _ = p | beam.Create([None]) | beam.ParDo(MyDoFn())

    p.run().wait_until_finish()

requirements.txt file:

    google-cloud-storage==1.23.0

Command line:

    python -m gcs_client \
        --project=insert_your_project \
        --runner=DataflowRunner \
        --temp_location gs://existing-gcs-bucket/temp/ \
        --requirements_file=requirements.txt \
        --save_main_session
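The script forwards its flags through `argparse.ArgumentParser().parse_known_args()`: since no arguments are registered on the parser, every flag lands in the second element of the returned tuple, which is what gets passed to `PipelineOptions`. A standard-library sketch of that split, using an explicit argument list with the placeholder values from the command above (written in `--flag=value` form; `argv` is just an illustrative name):

```python
import argparse

# Placeholder flags mirroring the command line above.
argv = [
    "--project=insert_your_project",
    "--runner=DataflowRunner",
    "--temp_location=gs://existing-gcs-bucket/temp/",
    "--requirements_file=requirements.txt",
    "--save_main_session",
]

# No arguments are registered, so argparse consumes none of the flags.
parser = argparse.ArgumentParser()
known, unknown = parser.parse_known_args(argv)

print(vars(known))      # {}
print(unknown == argv)  # True: every flag is left over for PipelineOptions
```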

10-07 19:13