问题描述
由于数据流sdk 1.x语法在sdk 2.0中不起作用,我们想出了如何在Beam sdk 2.0中创建自定义合并功能(经过大量的猜测和Beam sdk 2.0代码读取后).
We figured out how to create a custom combine function (after lots of guesswork and beam sdk 2.0 code reading) in beam sdk 2.0, as the dataflow sdk 1.x syntax did not work in sdk 2.0.
但是,我们无法弄清楚如何在 beam sdk 2.0 中创建自定义组合PER KEY 函数.任何帮助或指针(或更好的一个实际示例)将不胜感激. (我们在Internet上搜索了文档或示例,但没有找到;我们还尝试查看Beam sdk 2.0的Combine类中的代码,但无法弄清楚,尤其是因为PerKey类现在具有私有构造函数,因此我们不能再扩展它了.)
However, we can't figure out how to create a custom combine PER KEY function in beam sdk 2.0. Any help or pointers (or better yet an actual example) would be greatly appreciated. (We scoured the internet for documentation or examples and found none; we also attempted to look at the code within beam sdk 2.0's Combine class, but couldn't figure it out, especially since the PerKey class now has a private constructor, so we can't extend it any longer.)
如果有帮助,这是我们在Beam sdk 2.0中正确创建自定义组合器( without )键的方法,但是我们不知道如何使用 >键:
In case it helps, here's how we correctly created a custom combiner (without) keys in beam sdk 2.0, but we can't figure out how to create one with a key:
public class CombineTemplateIntervalsIntoBlocks
extends Combine.AccumulatingCombineFn<ImmutableMySetOfIntervals, TemplateIntervalAccum, ArrayList<ImmutableMySetOfIntervals>>{
public CombineTemplateIntervalsIntoBlocks() {
}
@Override
public TemplateIntervalAccum createAccumulator() {
return new TemplateIntervalAccum()
}
然后
public class TemplateIntervalAccum
implements Combine.AccumulatingCombineFn.Accumulator<ImmutableMySetOfIntervals, TemplateIntervalAccum, ArrayList<ImmutableMySetOfIntervals>>, Serializable {
...
推荐答案
您无需以其他方式创建CombineFn即可使用Combine.PerKey.
You don't need to create your CombineFn differently to use a Combine.PerKey.
您可以扩展AccumulatingCombineFn
(将合并逻辑放入累加器中)或扩展CombineFn
(将合并逻辑放入CombineFn
中).还有其他选项,例如BinaryCombineFn
和IterableCombineFn
.
You can extend either AccumulatingCombineFn
(which puts the merging logic in the accumulator) or extend CombineFn
(which puts the merging logic in the CombineFn
). There are also other options such as BinaryCombineFn
and IterableCombineFn
.
假设您有一个名为combineFn
的CombineFn<InputT, AccumT, OutputT>
:
- 您可以使用
Combine.globally(combineFn)
创建一个采用PCollection<InputT>
并结合所有元素的PTransform
. - 或者,您可以使用
Combine.perKey(combineFn)
创建一个PTransform
,该PTransform
接受一个PCollection<KV<K, InputT>>
并将与每个键关联的所有值组合在一起.这对应于我相信您所指的Combine.PerKey
.
- You can use
Combine.globally(combineFn)
to create aPTransform
that takes aPCollection<InputT>
and combines all the elements. - Or, you can use
Combine.perKey(combineFn)
to create aPTransform
that takes aPCollection<KV<K, InputT>>
and combines all the values associated with a each key and combines them. This corresponds to theCombine.PerKey
I believe you are referring to.
这篇关于如何在Beam sdk 2.0中创建自定义Combine.PerKey的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!