eadAllFromText从DoFn接收字符串或列表的不同行为

eadAllFromText从DoFn接收字符串或列表的不同行为

本文介绍了Beam:ReadAllFromText从DoFn接收字符串或列表的不同行为?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个从GCSPub\Sub的管道读取文件,

I have one pipeline read file from GCS through Pub\Sub,

class ExtractFileNameFn(beam.DoFn):
    def process(self, element):
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        yield file_name

class LogFn(beam.DoFn):
    def process(self, element):
        logging.info(element)
        return [element]

class LogPassThroughFn(beam.DoFn):
    def process(self, element):
        logging.info(element)
        return element

...

    p
    | "Read Sub Message" >> beam.io.ReadFromPubSub(topic=args.topic)
    | "Convert Message to JSON" >> beam.Map(lambda message: json.loads(message))
    | "Extract File Name" >> beam.ParDo(ExtractFileNameFn())
    | 'Log Results' >> beam.ParDo(LogFn())
    # | 'Log Results' >> beam.ParDo(LogPassThroughFn())
    | "Read File from GCS" >> beam.io.ReadAllFromText()

LogPassThroughFnLogPassThroughFn的差是返回值的类型,一个是string,另一个是list.并且LogFn在测试代码中运行良好,但是LogPassThroughFn使管道无法运行.根据此问题的答案

The difference of LogPassThroughFn and LogPassThroughFn is the type of return value, one the string, the other is list. And the LogFn works well in test codes, but LogPassThroughFn make the pipeline failed to run. Per this issue answer

我们知道LogFn应该可以正常工作.

We know LogFn should work correctly.

但是,我注意到ExtractFileNameFn返回string而不是list.那是对的吗?然后我如下测试,在ExtractFileNameFn1

However, I notice the ExtractFileNameFn return string rather than list. Is that correct? Then I test it as below, return list in ExtractFileNameFn1

class ExtractFileNameFn1(beam.DoFn):
    def process(self, element):
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        yield [file_name]

...

    p
    | "Read Sub Message" >> beam.io.ReadFromPubSub(topic=args.topic)
    | "Convert Message to JSON" >> beam.Map(lambda message: json.loads(message))
    | "Extract File Name" >> beam.ParDo(ExtractFileNameFn1())
    | "Read File from GCS" >> beam.io.ReadAllFromText()

现在,管道无法运行...

Now, the pipeline failed to run...

我的问题是在DoFn中return string和return list有什么区别?为什么ReadAllFromText可以从ExtractFileNameFn接收string,但是从LogFn接收list?

My question is What the difference between return string and return list in DoFn? Why ReadAllFromText could receive string from ExtractFileNameFn, but receive list from LogFn?

光束版本:2.14.0

beam version: 2.14.0

推荐答案

ParDo的文档说:

https ://beam.apache.org/releases/pydoc/2.6.0/apache_beam.transforms.core.html#apache_beam.transforms.core.ParDo

返回可迭代对象的目的是您的输入元素可能不与您的输出元素映射1-1.一个输入可能会产生多个输出.

The purpose of returning an iterable is that your input elements may not map 1-1 with your output elements. A single input may produce multiple outputs.

您可以随时进行yield个操作,也可以将它们收集到一个列表中,并在最后将return个操作

you are able to yield them as you go, or you can gather them up into a list and return them at the end

所以这个:

class ExtractFileNameFn(beam.DoFn):
    def process(self, element):
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        yield file_name

将与此相同:

class ExtractFileNameFn(beam.DoFn):
    def process(self, element):
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info("Load file: " + file_name)
        return [file_name]

两者的输出元素都是字符串,每个输出元素都是文件名

the output elements for both are strings, each output element being a filename

执行yield [file_name]时,每个输出元素实际上是一个包含字符串的列表

When you do yield [file_name], each output element is actually a list containing a string

这篇关于Beam:ReadAllFromText从DoFn接收字符串或列表的不同行为?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-02 17:31