我还是Apache Beam / Cloud Dataflow的新手,如果我的理解不正确,我深表歉意。

我正在尝试通过管道读取大约30,000行的数据文件。我的简单管道首先从GCS打开了csv,从数据中提取了标头,通过ParDo / DoFn函数运行了数据,然后将所有输出写回到csv中回到GCS。该管道有效,这是我的第一个测试。

然后,我编辑管道以读取csv,拉出标题,从数据中删除标题,通过ParDo / DoFn函数以标题作为侧面输入来运行数据,然后将所有输出写入csv。唯一的新代码是将标头作为侧面输入传递并从数据中过滤出来。

python - Python Apache Beam侧面输入断言错误-LMLPHP
python - Python Apache Beam侧面输入断言错误-LMLPHP

ParDo / DoFn函数build_rows只是产生context.element,因此我可以确保侧面输入正常工作。

我得到的错误如下:
python - Python Apache Beam侧面输入断言错误-LMLPHP

我不确定是什么问题,但我认为可能是由于内存限制。我将示例数据从30,000行缩减为100行,我的代码终于可以工作了。

没有侧面输入的管道确实读取/写入了所有30,000行,但最后,我将需要侧面输入对数据进行转换。

如何修复管道,以便可以处理来自GCS的大型csv文件,并且仍将侧面输入用作文件的伪全局变量?

最佳答案

我最近为Apache Beam编写了CSV file source的代码,并将其添加到beam_utils PiPy包中。具体来说,您可以按以下方式使用它:


安装Beam utils:pip install beam_utils
导入:from beam_utils.sources import CsvFileSource
将其用作来源:beam.io.Read(CsvFileSource(input_file))


CsvFileSource的默认行为是返回按标头索引的字典-但您可以查看文档来确定要使用的选项。

另外,如果要实现自己的自定义CsvFileSource,则需要将Beam的FileBasedSource子类化:

import csv
class CsvFileSource(beam.io.filebasedsource.FileBasedSource):
  def read_records(self, file_name, range_tracker):
    self._file = self.open_file(file_name)
    reader = csv.reader(self._file)
    for i, rec in enumerate(reader):
      yield res


您可以扩展此逻辑以分析标头和其他特殊行为。

另外,请注意,由于需要顺序分析此源,因此无法拆分,因此在处理数据时它可能表示瓶颈(尽管可以)。

关于python - Python Apache Beam侧面输入断言错误,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/42399971/

10-11 08:32