我正在使用Apache-Beam运行一些数据转换,包括从txt,csv和不同数据源提取数据。
我注意到的一件事是使用 beam.Map beam.ParDo 时结果的差异

在下一个示例中:

我正在读取csv数据,在第一种情况下,使用 beam.ParDo 将其传递到DoFn,它将提取第一个元素(即日期),然后进行打印。
在第二种情况下,我直接使用 beam.Map 做同样的事情,然后打印它。

class Printer(beam.DoFn):
    def process(self,data_item):
        print data_item

class DateExtractor(beam.DoFn):
    def process(self,data_item):
        return (str(data_item).split(','))[0]

data_from_source = (p
                    | 'ReadMyFile 01' >> ReadFromText('./input/data.csv')
                    | 'Splitter using beam.ParDo 01' >> beam.ParDo(DateExtractor())
                    | 'Printer the data 01' >> beam.ParDo(Printer())
                    )

copy_of_the_data =  (p
                    | 'ReadMyFile 02' >> ReadFromText('./input/data.csv')
                    | 'Splitter using beam.Map 02' >> beam.Map(lambda record: (record.split(','))[0])
                    | 'Printer the data 02' >> beam.ParDo(Printer())
                    )

我在两个输出中注意到的是下一个:
##With beam.ParDo##
2
0
1
7
-
0
4
-
0
3
2
0
1
7

##With beam.Map##
2017-04-03
2017-04-03
2017-04-10
2017-04-10
2017-04-11
2017-04-12
2017-04-12

我觉得这很奇怪。我想知道打印功能是否有问题?但是在使用不同的转换后,它显示出相同的结果。
作为示例运行:
| 'Group it 01' >> beam.Map(lambda record: (record, 1))

仍然返回相同的问题:
##With beam.ParDo##
('8', 1)
('2', 1)
('0', 1)
('1', 1)

##With beam.Map##
(u'2017-04-08', 1)
(u'2017-04-08', 1)
(u'2017-04-09', 1)
(u'2017-04-09', 1)

知道原因是什么吗?我在 beam.Map beam.ParDo 之间的区别中缺少什么?

最佳答案

简短答案

您需要将ParDo的返回值包装到列表中。

较长版本
ParDos通常可以为单个输入返回任意数量的输出,即对于单个输入字符串,您可以发出零个,一个或多个结果。因此,Beam SDK将ParDo的输出视为单个元素,而不是元素的集合。

在您的情况下,ParDo发出单个字符串而不是集合。 Beam Python SDK仍尝试将ParDo的输出解释为好像是元素的集合。它通过将您发出的字符串解释为字符集合来实现。因此,您的ParDo现在可以有效地产生单个字符流,而不是字符串流。

您需要做的是将返回值包装在一个列表中:

class DateExtractor(beam.DoFn):
    def process(self,data_item):
        return [(str(data_item).split(','))[0]]

注意方括号。有关更多示例,请参见programming guide

另一方面,可以将Map视为ParDo的特例。 Map预期会为每个输入产生一个准确的输出。因此,在这种情况下,您只需从lambda中返回一个值即可,它可以按预期工作。

而且您可能不需要将data_item包装在str中。 According to the docs ReadFromText转换产生字符串。

关于python-2.7 - 输出类型中beam.ParDo和beam.Map之间的区别?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/53912918/

10-15 09:15