TypeError: 'PCollection' object does not support indexing
上面的错误是由于尝试将Pcollection转换为list而导致的:
filesList = (files | beam.combiners.ToList())
lines = (p | 'read' >> beam.Create(ReadSHP().ReadSHP(filesList))
| 'map' >> beam.Map(_to_dictionary))
和:
def ReadSHP(self, filesList):
"""
"""
sf = shp.Reader(shp=filesList[1], dbf=filesList[2])
如何解决这个问题?任何帮助表示赞赏。
最佳答案
通常,您不能将PCollection
转换为列表。PCollection
是可能无界且无序的项目的集合。 Beam允许您将转换应用于PCollection
。将PTransform
应用于PCollection
会产生另一个PCollection
。转换的应用过程可能会分布在一组机器上。因此,一般情况下不可能将此类事物转换为本地内存中的元素集合。
组合器只是PTransforms
的特殊类。他们要做的是累积所有看到的元素,对元素应用一些合并逻辑,然后输出合并结果。例如,组合器可以查看传入的元素,将它们求和,然后输出总和作为结果。这种组合器将元素的PCollection
转换为这些元素的和的PCollection
。beam.combiners.ToList
只是应用于PCollection
的另一种转换,可能会在一组工作机上进行转换,并产生另一个PCollection
。但是在产生输出元素之前,它实际上并没有进行任何复杂的组合,它只将所有可见元素累积到一个列表中,然后输出可见元素的列表。因此,它将作为键值对的元素(在多台计算机上)放入列表中,然后输出这些列表。
缺少从潜在多台计算机中获取这些列表并将它们加载到本地程序(如果需要)的逻辑。不能以通用方式(在所有运行程序,所有可能的IO和管道结构之间)轻松解决(如果有的话)这个问题。
解决方法之一是在流水线中添加另一步,以将合并的输出(例如,总和或列表)写入通用存储(例如,数据库中的表或文件。然后,当管道完成时,您的程序可以从该位置加载管道执行的结果。
有关详细信息,请参见文档:
光束执行模型:https://beam.apache.org/documentation/execution-model/
设计管道:https://beam.apache.org/documentation/pipelines/design-your-pipeline/
光束编程指南:https://beam.apache.org/documentation/programming-guide/