Problem description
I have a pipeline that reads files from GCS, triggered by messages from Pub/Sub:
import json
import logging

import apache_beam as beam


class ExtractFileNameFn(beam.DoFn):
    def process(self, element):
        # Drop the last segment of the id and prefix the GCS scheme.
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        yield file_name


class LogFn(beam.DoFn):
    def process(self, element):
        logging.info(element)
        # Returns a list holding the element.
        return [element]


class LogPassThroughFn(beam.DoFn):
    def process(self, element):
        logging.info(element)
        # Returns the bare element (a string), not a list.
        return element

...
(
    p
    | "Read Sub Message" >> beam.io.ReadFromPubSub(topic=args.topic)
    | "Convert Message to JSON" >> beam.Map(lambda message: json.loads(message))
    | "Extract File Name" >> beam.ParDo(ExtractFileNameFn())
    | 'Log Results' >> beam.ParDo(LogFn())
    # | 'Log Results' >> beam.ParDo(LogPassThroughFn())
    | "Read File from GCS" >> beam.io.ReadAllFromText()
)
The difference between LogFn and LogPassThroughFn is the type of the return value: LogFn returns a list, while LogPassThroughFn returns a string. LogFn works well in test code, but LogPassThroughFn makes the pipeline fail to run. Per the answer to this issue, we know LogFn should work correctly.
However, I notice that ExtractFileNameFn returns (via yield) a string rather than a list. Is that correct? I then tested it as below, returning a list in ExtractFileNameFn1:
class ExtractFileNameFn1(beam.DoFn):
    def process(self, element):
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        # Yields a list holding the file name instead of the file name itself.
        yield [file_name]

...
(
    p
    | "Read Sub Message" >> beam.io.ReadFromPubSub(topic=args.topic)
    | "Convert Message to JSON" >> beam.Map(lambda message: json.loads(message))
    | "Extract File Name" >> beam.ParDo(ExtractFileNameFn1())
    | "Read File from GCS" >> beam.io.ReadAllFromText()
)
Now the pipeline fails to run...
My question: what is the difference between returning a string and returning a list in a DoFn? Why can ReadAllFromText receive a string from ExtractFileNameFn, but a list from LogFn?
Beam version: 2.14.0
Recommended answer
The documentation for ParDo says:
The purpose of returning an iterable is that your input elements may not map 1-1 to your output elements. A single input may produce multiple outputs.
You can yield them as you go, or you can gather them up into a list and return them at the end.
So this:

class ExtractFileNameFn(beam.DoFn):
    def process(self, element):
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        yield file_name

is the same as this:

class ExtractFileNameFn(beam.DoFn):
    def process(self, element):
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        return [file_name]
The output elements of both are strings, each output element being a file name.
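The equivalence of the two styles can be checked in plain Python, without Beam. This is a minimal sketch under a simplifying assumption: the runner is modelled as something that just iterates over whatever process returns. The helper emit_all and the sample message are hypothetical and exist only for illustration; they are not part of the Beam API.

```python
def process_yield(element):
    # Generator style: emit each output as it is produced.
    file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
    yield file_name


def process_return_list(element):
    # List style: gather the outputs and return them at the end.
    file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
    return [file_name]


def emit_all(process, elements):
    """Hypothetical stand-in for a runner: flatten the iterable from each call."""
    outputs = []
    for element in elements:
        outputs.extend(process(element))
    return outputs


# Made-up message payload, for illustration only.
messages = [{'id': 'my-bucket/data/input.txt/1565000000000000'}]

print(emit_all(process_yield, messages))
print(emit_all(process_return_list, messages))
```

Under this model both calls produce the same list of plain strings, which is what a downstream step that expects file names needs.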
When you do yield [file_name], each output element is actually a list containing a string.
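The same kind of plain-Python model shows how the shape of each output element changes. It is a rough sketch, assuming the runner simply iterates over the result of process; flatten_outputs and the sample path are hypothetical names, and what Beam itself does with a bare string return may differ between versions.

```python
def flatten_outputs(result):
    """Hypothetical model of a runner consuming one process() result."""
    return [output for output in result]


def yield_string(name):
    # One output element: the string itself.
    yield name


def yield_list(name):
    # One output element: a list that wraps the string.
    yield [name]


def return_string(name):
    # Not wrapped at all: the string itself becomes the iterable.
    return name


name = 'gs://my-bucket/data/input.txt'

print(flatten_outputs(yield_string(name)))
print(flatten_outputs(yield_list(name)))
print(flatten_outputs(return_string(name)))
```

In this model only yield_string hands a file name string to the next step. yield_list hands over a list, and return_string is consumed character by character because a Python string is itself iterable, so neither gives a step that expects file name strings what it needs.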