本文介绍了AttributeError: '_DoFnParam' 对象没有属性 'start' [在运行 'Write to GCS-ptransform-146']的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

当我运行 Beam 程序时,出现以下错误.

2021-05-20T17:04:42.166994441Z 来自工人的错误消息:generic::unknown:回溯(最近一次调用最后一次):文件apache_beam/runners/common.py",第 1233 行,在 apache_beam.runners.common.DoFnRunner.process文件apache_beam/runners/common.py",第 762 行,在 apache_beam.runners.common.PerWindowInvoker.invoke_process文件apache_beam/runners/common.py",第 887 行,位于 apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window文件first.py",第 68 行,正在处理 AttributeError: '_DoFnParam' object has no attribute 'start'在处理上述异常的过程中,又发生了一个异常:回溯(最近一次调用最后一次):文件/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",第289行,_execute response = task()文件/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",第362行,在<lambda>lambda: self.create_worker().do_instruction(request), request)文件/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",第607行,在do_instruction getattr(request, request_type), request.instruction_id)文件/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",第 644 行,在 process_bundle bundle_processor.process_bundle(instruction_id))文件/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",第 1001 行,在 process_bundle element.data 中)文件/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",第229行,在process_encoded self.output(decoded_value)文件apache_beam/runners/worker/operations.py",第 356 行,在 apache_beam.runners.worker.operations.Operation.output 中文件apache_beam/runners/worker/operations.py",第 358 行,在 apache_beam.runners.worker.operations.Operation.output 中文件apache_beam/runners/worker/operations.py",第 220 行,在 apache_beam.runners.worker.operations.SingletonConsumerSet.receive文件apache_beam/runners/worker/operations.py",第 717 行,位于 apache_beam.runners.worker.operations.DoOperation.process文件apache_beam/runners/worker/operations.py",第 718 行,在 apache_beam.runners.worker.operations.DoOperation.process文件apache_beam/runners/common.py",第 1235 行,在 apache_beam.runners.common.DoFnRunner.process文件apache_beam/runners/common.py",第 1315 行,位于 apache_beam.runners.common.DoFnRunner._reraise_augmented文件/usr/local/lib/python3.7/site-packages/future/utils/__init__.py",第446行,在 raise_with_traceback raise exc.with_traceback(traceback)文件apache_beam/runners/common.py",第 1233 行,在 apache_beam.runners.common.DoFnRunner.process文件apache_beam/runners/common.py",第 762 行,在 apache_beam.runners.common.PerWindowInvoker.invoke_process文件apache_beam/runners/common.py",第 887 行,位于 apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window文件first.py",第 68 行,正在处理 AttributeError: '_DoFnParam' 对象没有属性 'start' [同时运行 'Write to GCS-ptransform-146']

代码:

导入 argparse导入日志随机导入从日期时间导入日期时间导入 apache_beam 作为梁从 apache_beam 导入 DoFn、GroupByKey、io、ParDo、Pipeline、PTransform、WindowInto、WithKeys从 apache_beam.options.pipeline_options 导入 PipelineOptions从 apache_beam.transforms.window 导入 FixedWindows类 CustomPipelineOptions(PipelineOptions):@类方法def _add_argparse_args(cls, 解析器):parser.add_value_provider_argument(--output_path",类型=字符串,help=包含前缀的输出 GCS 文件的路径.",)类 WriteToGCS(DoFn):def __init__(self, output_path):self.output_path = output_path定义过程(self,custom_options,output_path,window=DoFn.WindowParam):""""将消息批量写入 Google Cloud Storage."""ts_format = "%H:%M";window_start = window.start.to_utc_datetime().strftime(ts_format)window_end = window.end.to_utc_datetime().strftime(ts_format)output_path = custom_options.output_path.get()文件名 = "-".join([output_path, window_start, window_end, str(shard_id)])使用 io.gcsio.GcsIO().open(filename=filename, mode=w") 作为 f:批量处理 message_body:f.write("{}\n".format(message_body).encode("utf-8"))def run(input_topic, num_shards, window_size):全局 custom_options# 将 `save_main_session` 设置为 True,以便 DoFns 可以访问全局导入的模块.管道选项 = 管道选项(管道参数,流=真,save_main_session=真)custom_options = pipeline_options.view_as(CustomPipelineOptions)使用 Pipeline(options=pipeline_options) 作为管道:(管道|从 Pub/Sub 读取">>io.ReadFromPubSub(topic=input_topic)|写入 GCS">>ParDo(WriteToGCS(custom_options.output_path)))如果 __name__ == '__main__':解析器 = argparse.ArgumentParser()parser.add_argument(--input_topic",help=要读取的 Cloud Pub/Sub 主题."'"projects//topics/".',)parser.add_argument(--num_shards",默认值=5,类型=整数,help=将窗口元素写入 GCS 时使用的分片数.",)parser.add_argument(--window_size",默认值=1,类型=整数,help="输出文件的窗口大小,以分钟为单位.",)known_args, pipeline_args = parser.parse_known_args()跑(known_args.input_topic,known_args.num_shards,known_args.window_size)
解决方案

在你的 WriteToGCS DoFn 中,你在 process 方法中声明 DoFn 将采用 arg custom_optionsoutput_path,但是它们没有默认值,而且似乎参数转换会将 WindowParam 映射到错误的 arg 上.

您需要从 process 方法中删除未使用的参数以使 DoFn 参数转换正确,可以在 process fn 中从 **kwargs 传递和检索其他参数.


2021-05-20T17:04:42.166994441ZError message from worker: generic::unknown: 
Traceback (most recent call last): 
File "apache_beam/runners/common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process 
File "apache_beam/runners/common.py", line 762, in apache_beam.runners.common.PerWindowInvoker.invoke_process 
File "apache_beam/runners/common.py", line 887, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window 
File "first.py", line 68, in process AttributeError: '_DoFnParam' object has no attribute 'start' 
During handling of the above exception, another exception occurred: 
Traceback (most recent call last): 
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute response = task() 
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda> lambda: self.create_worker().do_instruction(request), request) 
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction getattr(request, request_type), request.instruction_id) 
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle bundle_processor.process_bundle(instruction_id)) 
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1001, in process_bundle element.data) 
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 229, in process_encoded self.output(decoded_value) 
File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.output 
File "apache_beam/runners/worker/operations.py", line 358, in apache_beam.runners.worker.operations.Operation.output 
File "apache_beam/runners/worker/operations.py", line 220, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive 
File "apache_beam/runners/worker/operations.py", line 717, in apache_beam.runners.worker.operations.DoOperation.process 
File "apache_beam/runners/worker/operations.py", line 718, in apache_beam.runners.worker.operations.DoOperation.process 
File "apache_beam/runners/common.py", line 1235, in apache_beam.runners.common.DoFnRunner.process 
File "apache_beam/runners/common.py", line 1315, in apache_beam.runners.common.DoFnRunner._reraise_augmented 
File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback raise exc.with_traceback(traceback) 
File "apache_beam/runners/common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process 
File "apache_beam/runners/common.py", line 762, in apache_beam.runners.common.PerWindowInvoker.invoke_process 
File "apache_beam/runners/common.py", line 887, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window 
File "first.py", line 68, in process AttributeError: '_DoFnParam' object has no attribute 'start' [while running 'Write to GCS-ptransform-146']

Code:

import argparse
import logging
import random
from datetime import datetime

import apache_beam as beam
from apache_beam import DoFn, GroupByKey, io, ParDo, Pipeline, PTransform, WindowInto, WithKeys
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

class CustomPipelineOptions(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            "--output_path",
            type=str,
            help="Path of the output GCS file including the prefix.",
        )
class WriteToGCS(DoFn):
    def __init__(self, output_path):
        self.output_path = output_path
    def process(self, custom_options, output_path, window=DoFn.WindowParam):
        """Write messages in a batch to Google Cloud Storage."""

        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        output_path = custom_options.output_path.get()
        filename = "-".join([output_path, window_start, window_end, str(shard_id)])

        with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for message_body in batch:
                f.write("{}\n".format(message_body).encode("utf-8"))

def run(input_topic, num_shards, window_size):

    global custom_options
    # Set `save_main_session` to True so DoFns can access globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )

    custom_options = pipeline_options.view_as(CustomPipelineOptions)
    
    with Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
            | "Write to GCS" >> ParDo(WriteToGCS(custom_options.output_path))
        )

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    
    parser.add_argument(
            "--input_topic",
            help="The Cloud Pub/Sub topic to read from."
            '"projects/<PROJECT_ID>/topics/<TOPIC_ID>".',
        )
    parser.add_argument(
            "--num_shards",
            default=5,
            type=int,
            help="Number of shards to use when writing windowed elements to GCS.",
        )
    parser.add_argument(
            "--window_size",
            default=1,
            type=int,
            help="Output file's window size in minutes.",
        )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_topic,
        known_args.num_shards,
        known_args.window_size
        )
解决方案

In your WriteToGCS DoFn, you declared in the process method that the DoFn will take arg custom_options and output_path, however the they do not have default values and it seems the parameter translation will map the WindowParam onto the wrong arg.

you'll need to remove the unused parameters from the process method to make the DoFn parameter translation correct, additional parameters can be passed and retrieved from **kwargs in process fn.

这篇关于AttributeError: '_DoFnParam' 对象没有属性 'start' [在运行 'Write to GCS-ptransform-146']的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-24 02:00