本文介绍了流媒体流(Apache Beam/PYTHON)上的使用问题add_value_provider_argument的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们要使用功能参数add_value_provider_argument

在不输入add_value_provider_argument ()

class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):     
        parser.add_value_provider_argument(
            '--input_topic',
            help='The Cloud Pub/Sub topic to read from.\n'
                 '"projects/<PROJECT_NAME>/topics/<TOPIC_NAME>".'
        )
        parser.add_value_provider_argument(
            '--window_size',
            type=float,
            default=1.0,
            help='Output file\'s window size in number of minutes.'
        )
        parser.add_value_provider_argument(
            '--output_path',
            help='GCS Path of the output file including filename prefix.'
        )

def run():
    pipeline_options = PipelineOptions(streaming=True, save_main_session=True)
    custom_options = pipeline_options.view_as(UserOptions)

    with beam.Pipeline(options=custom_options)as pipeline:
        print ("cecei est un test", custom_options.input_topic)
        (pipeline 
         | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(topic=custom_options.input_topic.get())
         | 'Window into' >> GroupWindowsIntoBatches(custom_options.window_size.get())
         | 'Write to GCS' >> beam.ParDo(WriteBatchesToGCS(custom_options.output_path.get()))

        )               

if __name__ == '__main__':
    run()

我使用以下命令执行该文件

I execute this file with

python luckycart_check.py \
    --runner DataflowRunner \
    --project $PROJECT_NAME \
    --staging_location gs://$BUCKET_NAME/staging \
    --temp_location gs://$BUCKET_NAME/temp \
    --template_location gs://$BUCKET_NAME/templates/luckycartTEMPLATE \

,我收到以下错误消息:

and I get the following error:

 File "/home/jupyter/env/local/lib/python2.7/site-packages/apache_beam/options/value_provider.py", line 106, in get
    '%s.get() not called from a runtime context' % self)
apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: input_topic, type: str, default_value: None).get() not called from a runtime context
(env) jupyter@luckykart:~/clement/terraform/basics$ 

推荐答案

如果在创建管道时未指定--input_topic,则该管道的类型将为RuntimeValueProvider,这意味着您只能在get()值时数据流作业正在运行.这是正常的.

If you don't specify --input_topic when creating the pipeline, it will be of type RuntimeValueProvider, meaning you can only get() its value when the Dataflow job is running. This is normal.

某些转换,例如WriteToBigQuery接受ValueProvider自变量(不包含.get()).但是,ReadFromPubSub当前不接受ValueProvider参数,因为它已作为Dataflow中的本机转换实现.

Some transforms like WriteToBigQuery accept ValueProvider arguments (without the .get()). However, ReadFromPubSub does not currently accept ValueProvider arguments since it is implemented as a native transform in Dataflow.

有关使用ValueProvider创建模板的更多信息,请参阅此文档: https: //cloud.google.com/dataflow/docs/guides/templates/creating-templates

See this documentation for more on creating templates with ValueProviders: https://cloud.google.com/dataflow/docs/guides/templates/creating-templates

这篇关于流媒体流(Apache Beam/PYTHON)上的使用问题add_value_provider_argument的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-24 02:01