This article describes how to handle NameErrors when launching Dataflow jobs from an Apache Beam notebook, and may be a useful reference if you run into the same problem.
Problem description
import re

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

class ReadWordsFromText(beam.PTransform):
    def __init__(self, file_pattern):
        self._file_pattern = file_pattern

    def expand(self, pcoll):
        return (pcoll.pipeline
                | beam.io.ReadFromText(self._file_pattern)
                | beam.FlatMap(lambda line: re.findall(r'[\w\']+', line.strip(), re.UNICODE)))

p = beam.Pipeline(InteractiveRunner())

words = p | 'read' >> ReadWordsFromText('gs://apache-beam-samples/shakespeare/kinglear.txt')

counts = (words
          | 'count' >> beam.combiners.Count.PerElement())

lower_counts = (words
                | "lower" >> beam.Map(lambda word: word.lower())
                | "lower_count" >> beam.combiners.Count.PerElement())
If I refactor the part that extracts the words into a new function as follows
def extract_words(line):
    return re.findall(r'[\w\']+', line.strip(), re.UNICODE)

class ReadWordsFromText(beam.PTransform):
    def __init__(self, file_pattern):
        self._file_pattern = file_pattern

    def expand(self, pcoll):
        return (pcoll.pipeline
                | beam.io.ReadFromText(self._file_pattern)
                | beam.FlatMap(lambda line: extract_words(line)))
and run the notebook, I get the following error message:
DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 570, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "<ipython-input-3-d48b3d7d5e4f>", line 12, in <lambda>
NameError: name 'extract_words' is not defined
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 638, in do_work
work_executor.execute()
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
op.start()
File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
raise exc.with_traceback(traceback)
File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 570, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "<ipython-input-3-d48b3d7d5e4f>", line 12, in <lambda>
NameError: name 'extract_words' is not defined [while running '[3]: read/FlatMap(<lambda at <ipython-input-3-d48b3d7d5e4f>:12>)']
Note: imports, functions and other variables defined in the global context of your __main__ file of your Dataflow pipeline are, by default, not available in the worker execution environment, and such references will cause a NameError, unless the --save_main_session pipeline option is set to True. Please see https://cloud.google.com/dataflow/faq#how-do-i-handle-nameerrors for additional documentation on configuring your worker execution environment.
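The note points at the root cause: the lambda does not contain `extract_words`, it only refers to it by name, and that name is looked up in the function's globals at call time on the worker. The following plain-Python sketch (no Beam needed; rebuilding the lambda with empty globals just simulates a worker process that never recreated the notebook's `__main__` namespace) shows the mechanism:

```python
import re
import types

def extract_words(line):
    return re.findall(r"[\w']+", line.strip(), re.UNICODE)

fn = lambda line: extract_words(line)

# The lambda's code object stores only the *name* it references,
# not the function itself; the name is resolved when fn is called.
print('extract_words' in fn.__code__.co_names)  # True

# Rebuild the lambda with empty globals, as a stand-in for a worker
# where the notebook's module namespace was never restored.
worker_fn = types.FunctionType(fn.__code__, {})
try:
    worker_fn("O, reason not the need!")
except NameError as e:
    print(e)  # name 'extract_words' is not defined
```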
To handle the NameError I followed the instructions and added the following line

options.view_as(SetupOptions).save_main_session = True
but when I run the notebook I get the following error:
DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 760, in run
self._load_main_session(self.local_staging_directory)
File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 501, in _load_main_session
pickler.load_session(session_file)
File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 307, in load_session
return dill.load_session(file_path)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 368, in load_session
module = unpickler.load()
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
obj = StockUnpickler.load(self)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 462, in find_class
return StockUnpickler.find_class(self, module, name)
ModuleNotFoundError: No module named 'IPython'
Is there an easy way to fix this issue?
Answer
Instead of using save_main_session, move the word extraction outside the ReadWordsFromText composite transform. (In a notebook, save_main_session pickles the entire __main__ session, which drags in IPython objects that the Dataflow worker image cannot import — hence the ModuleNotFoundError above.) Here is an example:
def extract_words(line):
    return re.findall(r'[\w\']+', line.strip(), re.UNICODE)

class ReadWordsFromText(beam.PTransform):
    def __init__(self, file_pattern):
        self._file_pattern = file_pattern

    def expand(self, pcoll):
        return (pcoll.pipeline
                | beam.io.ReadFromText(self._file_pattern))

words = (p | 'read' >> ReadWordsFromText('gs://apache-beam-samples/shakespeare/kinglear.txt')
           | 'Extract' >> beam.FlatMap(extract_words))

counts = (words
          | 'count' >> beam.combiners.Count.PerElement())
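As a quick sanity check (plain Python, no Beam required; the sample line is just an illustration), the extracted helper behaves like the original lambda, and because FlatMap now receives the function object itself rather than a lambda that merely names it, the serialized DoFn carries its own code:

```python
import re

def extract_words(line):
    return re.findall(r"[\w']+", line.strip(), re.UNICODE)

print(extract_words("  O, reason not the need! "))
# ['O', 'reason', 'not', 'the', 'need']

# Unlike the lambda, extract_words resolves no user-defined globals:
# apart from the importable `re` module, everything it needs is in its
# own code object, so no notebook-session state has to reach the worker.
assert 'extract_words' not in extract_words.__code__.co_names
```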