Question
This is a bit of a contrived example, but I have been exploring the docs for CombineValues and wish to understand what I'm seeing.
If I combine values and perform some arithmetic operations on them (the goal is to calculate the percentage of keys present in a bounded stream), then I need to use the AverageFn (as defined in Example 8 in the docs and provided in the source code example snippets).
However, this (based on Example 5) does not work:
import apache_beam as beam

with beam.Pipeline() as pipeline:
    counts = (
        pipeline
        | 'create' >> beam.Create(['xxx'])
        | 'key it' >> beam.Map(lambda elem: (elem, 1))
        # Intended result for ('xxx', 1) is 0.5
        | 'combine' >> beam.CombinePerKey(lambda values: sum(values) / 2)
        | 'print' >> beam.Map(print)
    )
as it yields
('xxx', 0.25)
Ultimately, I want to compute the total via
totals = pipeline | 'Count elements' >> beam.combiners.Count.Globally()
and then use the singleton approach they suggest (where I provide beam.pvalue.AsSingleton(totals) to beam.CombineValues).
My question is, why does CombineValues appear to execute twice (this is probably going to involve some facepalming)?
Recommended answer
The combiner is called twice because of the MapReduce phases: it runs once on the partial groups of values and once more when the partial results are merged. Since the function you are using (halving the mean) is not associative, you need an "advanced combiner" (a CombineFn), as in the Example 8 you mention.
What is happening in your current code is that, from (xxx, 1), it calculates the half mean (xxx, 0.5), and then, when merging the values, it halves it again, producing (xxx, 0.25).
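The double halving can be reproduced without Beam. The following is a minimal plain-Python sketch, assuming a simplified two-phase model in which the same callable is applied first to each bundle and then to the list of partial results. The helper names half_sum and lifted_combine are hypothetical and this is not Beam's actual implementation.

```python
def half_sum(values):
    """Same logic as the question's lambda: sum(values) / 2."""
    return sum(values) / 2


def lifted_combine(fn, bundles):
    """Simplified two-phase combine (an assumption, not Beam's code).

    Phase 1 applies fn to each bundle; phase 2 applies fn again to the
    list of partial results.
    """
    partials = [fn(bundle) for bundle in bundles]
    return fn(partials)


# Applied once, the function gives the intended result.
print(half_sum([1]))                      # 0.5

# Applied in two phases, the value is halved twice.
print(lifted_combine(half_sum, [[1]]))    # 0.25

# A plain sum survives the two phases unchanged, whatever the split.
print(lifted_combine(sum, [[1, 2], [3]]))  # 6
```

A function such as sum can safely be reapplied to its own partial results, whereas sum(values)/2 cannot, which is why the final division has to be deferred to a separate step.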
In this answer I explain a similar concept.
For your particular case, as mentioned, you need an "advanced combiner":
import apache_beam as beam

with beam.Pipeline() as pipeline:

    def combiner(elements):
        # Original combiner from the question, with a print for debugging
        print(elements)
        return sum(elements) / 2

    class HalfMean(beam.CombineFn):
        def create_accumulator(self):
            # Tuple of sum, count
            return (0, 0)

        def add_input(self, accumulator, input):
            # Add the current element to sum, add one to count
            new_tuple = (accumulator[0] + input, accumulator[1] + 1)
            return new_tuple

        def merge_accumulators(self, accumulators):
            # Join all accumulators
            partial_sums = [x[0] for x in accumulators]
            partial_counts = [x[1] for x in accumulators]
            merged_tuple = (sum(partial_sums), sum(partial_counts))
            return merged_tuple

        def extract_output(self, sum_count_tuple):
            # Now we can output half of the mean
            mean = sum_count_tuple[0] / sum_count_tuple[1]
            return mean / 2

    counts = (
        pipeline
        | 'create' >> beam.Create(['xxx'])
        | 'key it' >> beam.Map(lambda elem: (elem, 1))
        # | 'combine' >> beam.CombinePerKey(combiner)
        | 'advance combine' >> beam.CombinePerKey(HalfMean())
        | 'print' >> beam.Map(print)
    )
I'm leaving your old combiner in, with a print, so you can see what's happening.
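To see why the accumulator-based version is safe under any partitioning, here is a rough plain-Python walk-through of the four CombineFn steps. HalfMeanSketch mirrors the methods of HalfMean without inheriting from Beam, and run is a hypothetical driver that only approximates what a runner does.

```python
class HalfMeanSketch:
    """Plain-Python stand-in with the same four methods as HalfMean."""

    def create_accumulator(self):
        return (0, 0)  # (sum, count)

    def add_input(self, accumulator, value):
        total, count = accumulator
        return (total + value, count + 1)

    def merge_accumulators(self, accumulators):
        totals, counts = zip(*accumulators)
        return (sum(totals), sum(counts))

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count / 2


def run(fn, bundles):
    """Hypothetical driver: accumulate per bundle, merge, extract once."""
    accumulators = []
    for bundle in bundles:
        acc = fn.create_accumulator()
        for value in bundle:
            acc = fn.add_input(acc, value)
        accumulators.append(acc)
    return fn.extract_output(fn.merge_accumulators(accumulators))


fn = HalfMeanSketch()
print(run(fn, [[1]]))            # 0.5
print(run(fn, [[1, 2], [3]]))    # 1.0
print(run(fn, [[1], [2], [3]]))  # 1.0
```

Only sums and counts travel between phases, and the division happens exactly once in extract_output, so the result does not depend on how the values were split.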
Anyhow, that is still not CombineValues but CombinePerKey. CombineValues takes a key-value pair whose value is an iterable and applies the combiner to it. In the following case, the elements it takes are ('a', [1, 2, 3]) and ('b', [10]). Here is the example:
    # Inside the same `with beam.Pipeline() as pipeline:` block as HalfMean
    kvs = [
        ('a', 1),
        ('a', 2),
        ('a', 3),
        ('b', 10),
    ]

    combine_values = (
        pipeline
        | 'create_kvs' >> beam.Create(kvs)
        | 'gbk' >> beam.GroupByKey()
        | 'combine values' >> beam.CombineValues(HalfMean())
        | 'print cv' >> beam.Map(print)
    )
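As a rough illustration of the data shapes involved, the grouping step followed by the per-key combine can be imitated in plain Python. The helpers group_by_key and half_mean are hypothetical stand-ins for illustration only; they are not Beam APIs.

```python
from collections import defaultdict


def group_by_key(pairs):
    """Collect the values of each key into a list, keeping first-seen key order."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return list(grouped.items())


def half_mean(values):
    """Half of the mean of an iterable of numbers."""
    values = list(values)
    return sum(values) / len(values) / 2


kvs = [('a', 1), ('a', 2), ('a', 3), ('b', 10)]

# Shape of the elements that reach the combine step.
grouped = group_by_key(kvs)
print(grouped)   # [('a', [1, 2, 3]), ('b', [10])]

# One output per key, computed from that key's values.
combined = [(key, half_mean(values)) for key, values in grouped]
print(combined)  # [('a', 1.0), ('b', 5.0)]
```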