假设我有一个KV<String, Integer>类型的有界PCollection p。假定p无法容纳在内存中,因此不能是DoFn的侧面输入。

示例p:

("foo", 0)
("bar", 1)
("baz", 2)


我该如何取p及其自身的笛卡尔积?

例如,p x p可能如下所示:

("foo+foo", [("foo", 0), ("foo", 0)])
("foo+bar", [("foo", 0), ("bar", 1)])
("foo+baz", [("foo", 0), ("baz", 2)])
("bar+foo", [("bar", 1), ("foo", 0)])
("bar+bar", [("bar", 1), ("bar", 1)])
("bar+baz", [("bar", 1), ("baz", 2)])
("baz+foo", [("baz", 2), ("foo", 0)])
("baz+bar", [("baz", 2), ("bar", 1)])
("baz+baz", [("baz", 2), ("baz", 2)])

最佳答案

如您所料,最简单的方法是拥有一个将您的PCollection作为主输入和副输入处理的DoFn。

如果由于PCollection太大而无法放入内存而无法解决问题,则可以将其划分为N个不相交的PCollection,将其传递给每个PCollection,然后将结果展平。例如,您可以编写类似

class CrossProduct(beam.PTransform):
  def expand(self, pcoll):
    N = 10
    parts = pcoll | beam.Partition(lambda element, n: hash(element) % n, N)
    cross_parts = [
        pcoll | str(ix) >> beam.FlatMap(
            lambda x, side: [(x, s) for s in side],
            beam.pvalue.AsIter(part))
        for ix, part in enumerate(parts)]
    return cross_parts | beam.Flatten()

output = input | CrossProduct()


但是请注意,除非PCollection的元素特别大,否则如果PCollection无法容纳到内存中,则其叉积可能无法生产(和处理)。

10-02 11:01