Problem description
We want to use add_value_provider_argument so that the argument values do not have to be supplied when the pipeline template is built:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# GroupWindowsIntoBatches and WriteBatchesToGCS are custom transforms
# defined elsewhere in the file.

class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--input_topic',
            help='The Cloud Pub/Sub topic to read from.\n'
                 '"projects/<PROJECT_NAME>/topics/<TOPIC_NAME>".'
        )
        parser.add_value_provider_argument(
            '--window_size',
            type=float,
            default=1.0,
            help="Output file's window size in number of minutes."
        )
        parser.add_value_provider_argument(
            '--output_path',
            help='GCS Path of the output file including filename prefix.'
        )

def run():
    pipeline_options = PipelineOptions(streaming=True, save_main_session=True)
    custom_options = pipeline_options.view_as(UserOptions)
    with beam.Pipeline(options=custom_options) as pipeline:
        print("ceci est un test", custom_options.input_topic)
        (pipeline
         | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(topic=custom_options.input_topic.get())
         | 'Window into' >> GroupWindowsIntoBatches(custom_options.window_size.get())
         | 'Write to GCS' >> beam.ParDo(WriteBatchesToGCS(custom_options.output_path.get()))
        )

if __name__ == '__main__':
    run()
I execute this file with:
python luckycart_check.py \
--runner DataflowRunner \
--project $PROJECT_NAME \
--staging_location gs://$BUCKET_NAME/staging \
--temp_location gs://$BUCKET_NAME/temp \
--template_location gs://$BUCKET_NAME/templates/luckycartTEMPLATE \
and I get the following error:
File "/home/jupyter/env/local/lib/python2.7/site-packages/apache_beam/options/value_provider.py", line 106, in get
'%s.get() not called from a runtime context' % self)
apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: input_topic, type: str, default_value: None).get() not called from a runtime context
(env) jupyter@luckykart:~/clement/terraform/basics$
Recommended answer
If you don't specify --input_topic when creating the pipeline, it will be of type RuntimeValueProvider, meaning you can only get() its value while the Dataflow job is running. This is normal.
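The two behaviors can be sketched with simplified stand-ins for Beam's value-provider classes. These are illustrative mimics, not the real apache_beam.options.value_provider API, but they reproduce the error message seen above:

```python
# Simplified mimics of Beam's ValueProvider semantics: a StaticValueProvider
# resolves immediately, while a RuntimeValueProvider raises until the runner
# supplies a value at job-execution time.

class StaticValueProvider:
    def __init__(self, value):
        self.value = value

    def get(self):
        return self.value


class RuntimeValueProvider:
    def __init__(self, option_name):
        self.option_name = option_name
        self.runtime_value = None  # filled in by the runner at execution time

    def get(self):
        if self.runtime_value is None:
            raise RuntimeError(
                '%s.get() not called from a runtime context' % self.option_name)
        return self.runtime_value


topic = RuntimeValueProvider('input_topic')
try:
    topic.get()  # pipeline-construction time: no value yet, so this raises
except RuntimeError as e:
    print(e)

topic.runtime_value = 'projects/p/topics/t'  # what the runner does at job start
print(topic.get())  # now succeeds
```

When the option is supplied on the command line at template-creation time, Beam gives you the static flavor instead, which is why the error only appears for options left unset.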
Some transforms, like WriteToBigQuery, accept ValueProvider arguments directly (without the .get()). However, ReadFromPubSub does not currently accept ValueProvider arguments, since it is implemented as a native transform in Dataflow.
See this documentation for more on creating templates with ValueProviders: https://cloud.google.com/dataflow/docs/guides/templates/creating-templates