我还是Apache Beam / Cloud Dataflow的新手,如果我的理解不正确,我深表歉意。
我正在尝试通过管道读取大约30,000行的数据文件。我的简单管道首先从GCS打开了csv,从数据中提取了标头,通过ParDo / DoFn函数运行了数据,然后将所有输出写回到csv中回到GCS。该管道有效,这是我的第一个测试。
然后,我编辑管道以读取csv,拉出标题,从数据中删除标题,通过ParDo / DoFn函数以标题作为侧面输入来运行数据,然后将所有输出写入csv。唯一的新代码是将标头作为侧面输入传递并从数据中过滤出来。
ParDo / DoFn函数build_rows只是产生context.element,因此我可以确保侧面输入正常工作。
我得到的错误如下:
我不确定是什么问题,但我认为可能是由于内存限制。我将示例数据从30,000行缩减为100行,我的代码终于可以工作了。
没有侧面输入的管道确实读取/写入了所有30,000行,但最后,我将需要侧面输入对数据进行转换。
如何修复管道,以便可以处理来自GCS的大型csv文件,并且仍将侧面输入用作文件的伪全局变量?
最佳答案
我最近为Apache Beam编写了CSV file source的代码,并将其添加到beam_utils
PiPy包中。具体来说,您可以按以下方式使用它:
安装Beam utils:pip install beam_utils
导入:from beam_utils.sources import CsvFileSource
。
将其用作来源:beam.io.Read(CsvFileSource(input_file))
。CsvFileSource
的默认行为是返回按标头索引的字典-但您可以查看文档来确定要使用的选项。
另外,如果要实现自己的自定义CsvFileSource
,则需要将Beam的FileBasedSource
子类化:
import csv
class CsvFileSource(beam.io.filebasedsource.FileBasedSource):
def read_records(self, file_name, range_tracker):
self._file = self.open_file(file_name)
reader = csv.reader(self._file)
for i, rec in enumerate(reader):
yield res
您可以扩展此逻辑以分析标头和其他特殊行为。
另外,请注意,由于需要顺序分析此源,因此无法拆分,因此在处理数据时它可能表示瓶颈(尽管可以)。
关于python - Python Apache Beam侧面输入断言错误,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/42399971/