假设我有一个KV<String, Integer>
类型的有界PCollection p。假定p无法容纳在内存中,因此不能是DoFn
的侧面输入。
示例p:
("foo", 0)
("bar", 1)
("baz", 2)
我该如何取p及其自身的笛卡尔积?
例如,
p x p
可能如下所示:("foo+foo", [("foo", 0), ("foo", 0)])
("foo+bar", [("foo", 0), ("bar", 1)])
("foo+baz", [("foo", 0), ("baz", 2)])
("bar+foo", [("bar", 1), ("foo", 0)])
("bar+bar", [("bar", 1), ("bar", 1)])
("bar+baz", [("bar", 1), ("baz", 2)])
("baz+foo", [("baz", 2), ("foo", 0)])
("baz+bar", [("baz", 2), ("bar", 1)])
("baz+baz", [("baz", 2), ("baz", 2)])
最佳答案
如您所料,最简单的方法是拥有一个将您的PCollection作为主输入和副输入处理的DoFn。
如果由于PCollection太大而无法放入内存而无法解决问题,则可以将其划分为N个不相交的PCollection,将其传递给每个PCollection,然后将结果展平。例如,您可以编写类似
class CrossProduct(beam.PTransform):
def expand(self, pcoll):
N = 10
parts = pcoll | beam.Partition(lambda element, n: hash(element) % n, N)
cross_parts = [
pcoll | str(ix) >> beam.FlatMap(
lambda x, side: [(x, s) for s in side],
beam.pvalue.AsIter(part))
for ix, part in enumerate(parts)]
return cross_parts | beam.Flatten()
output = input | CrossProduct()
但是请注意,除非PCollection的元素特别大,否则如果PCollection无法容纳到内存中,则其叉积可能无法生产(和处理)。