This article explains how Apache Beam's CombineValues operates on elements when performing arithmetic operations.

Problem description

This is a bit of a contrived example, but I have been exploring the docs for CombineValues and wish to understand what I'm seeing.

If I combine values and perform some arithmetic operations on them (the goal is to calculate the percentage of each key present in a bounded stream), then I need to use the AverageFn (as defined in Example 8 in the docs and provided in the source code example snippets).

However, this (based on Example 5) does not work:

import apache_beam as beam

with beam.Pipeline() as pipeline:

  counts = ( pipeline
             | 'create' >>  beam.Create(['xxx'])
             | 'key it' >> beam.Map(lambda elem: (elem, 1))
             | 'combine' >> beam.CombinePerKey(
                 lambda values: sum(values)/2
                 )
             | 'print' >> beam.Map(print)
           )

which produces

('xxx', 0.25)

Ultimately, I want to compute the total with

  totals = pipeline | 'Count elements' >> beam.combiners.Count.Globally()

and then use the singleton approach they suggest (where I provide beam.pvalue.AsSingleton(totals) to beam.CombineValues).
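A minimal sketch of the percentage computation the question is aiming for, written in plain Python so it runs without Beam. It assumes the per-key counts come from an associative combiner (summing the 1s) and that the division by the total happens afterwards, outside the combiner. In a pipeline, the hypothetical `to_percentage` function below would be applied with something like `beam.Map(to_percentage, total=beam.pvalue.AsSingleton(totals))`; the local `Counter` and `len` calls only stand in for `CombinePerKey(sum)` and `Count.Globally()`.

```python
from collections import Counter


def to_percentage(kv, total):
    """Hypothetical helper: turn a (key, count) pair into (key, percent of total)."""
    key, count = kv
    return key, 100.0 * count / total


# Stand-in for the bounded input collection (illustrative data only).
elements = ['xxx', 'yyy', 'xxx', 'zzz']

# Associative step: count occurrences per key, as CombinePerKey(sum) would do
# over the (elem, 1) pairs. Splitting this work into bundles cannot change it.
per_key_counts = Counter(elements)

# Stand-in for the singleton side input produced by Count.Globally().
total = len(elements)

# Non-associative step, applied once per key after all combining is finished.
percentages = dict(to_percentage(kv, total) for kv in per_key_counts.items())
print(percentages)  # {'xxx': 50.0, 'yyy': 25.0, 'zzz': 25.0}
```

Keeping the division out of the combiner means the runner is free to split and merge the counting step however it likes.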

My question is, why does CombineValues appear to execute twice (probably going to be some facepalming)?

Recommended answer

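The reason the combiner is being called twice is the MapReduce-style phases of a combine: the function is applied to partial groups of values before the shuffle and then again when those partial results are merged. Since the function you are using (halving the sum of the values) is not associative, applying it twice changes the result, so you'd need an "advanced combiner", i.e. a full CombineFn, as in the Example 8 you mention.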

What is happening in your current code is that, from (xxx, 1), it first calculates half the sum, (xxx, 0.5), and then, when merging the partial values, it halves that result again, giving (xxx, 0.25).
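To see the double application without a runner, here is a plain-Python model of a lifted combine: the callable runs once on each pre-shuffle partial group and then again on the list of partial results. This assumes a single partial-combine stage and is not Beam's actual implementation; the helper name `lifted_combine` is hypothetical.

```python
def lifted_combine(fn, partitions):
    """Hypothetical model of combiner lifting: combine each partition,
    then combine the partial results again."""
    partials = [fn(part) for part in partitions]  # pre-shuffle, per bundle
    return fn(partials)                            # post-shuffle merge


def half_sum(values):
    # The callable from the question: sum(values) / 2
    return sum(values) / 2


# One key with one value, as in the question: ('xxx', 1)
print(lifted_combine(half_sum, [[1]]))       # 0.25, not 0.5

# An associative function such as sum gives the same answer however the input is split.
print(lifted_combine(sum, [[1, 2], [3]]))    # 6
print(lifted_combine(sum, [[1], [2], [3]]))  # 6
```

Because `sum` of partial sums equals the full sum, it survives the extra application; `sum(...) / 2` does not.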

In this answer I explain a similar concept.

For your particular case, as mentioned, you need an "advanced combiner" (a CombineFn):

import apache_beam as beam


class HalfMean(beam.CombineFn):
    def create_accumulator(self):
        # Tuple of (sum, count)
        return (0, 0)

    def add_input(self, accumulator, element):
        # Add the current element to the sum and one to the count
        return (accumulator[0] + element, accumulator[1] + 1)

    def merge_accumulators(self, accumulators):
        # Join all partial (sum, count) accumulators; materialize first,
        # since the runner may pass a one-shot iterator
        accumulators = list(accumulators)
        return (sum(x[0] for x in accumulators), sum(x[1] for x in accumulators))

    def extract_output(self, sum_count_tuple):
        # Only now, after all merging, output half of the mean
        total, count = sum_count_tuple
        if count == 0:
            return float('nan')  # guard against empty input
        return total / count / 2


def combiner(elements):
    # The old callable combiner, with a print to show each invocation
    print(elements)
    return sum(elements) / 2


with beam.Pipeline() as pipeline:
    counts = (pipeline
              | 'create' >> beam.Create(['xxx'])
              | 'key it' >> beam.Map(lambda elem: (elem, 1))
              # | 'combine' >> beam.CombinePerKey(combiner)
              | 'advance combine' >> beam.CombinePerKey(HalfMean())
              | 'print' >> beam.Map(print)  # prints ('xxx', 0.5)
              )

I'm leaving your old combiner in, with a print, so you can see what's happening.
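Why the `HalfMean` CombineFn is safe: the non-associative step (dividing by the count and halving) happens only in `extract_output`, while everything before it works on `(sum, count)` accumulators, which merge associatively. The plain-Python driver below imitates how a runner might call those methods on arbitrarily split input. It mirrors the `HalfMean` logic without subclassing `beam.CombineFn` so it runs without Beam; `PlainHalfMean` and `run_combine_fn` are hypothetical names, and a real runner chooses the splitting itself.

```python
class PlainHalfMean:
    """Mirrors HalfMean's methods, minus the beam.CombineFn base class."""

    def create_accumulator(self):
        return (0, 0)  # (sum, count)

    def add_input(self, accumulator, element):
        return (accumulator[0] + element, accumulator[1] + 1)

    def merge_accumulators(self, accumulators):
        accumulators = list(accumulators)
        return (sum(a[0] for a in accumulators), sum(a[1] for a in accumulators))

    def extract_output(self, accumulator):
        total, count = accumulator
        if count == 0:
            return float('nan')  # guard for empty input
        return total / count / 2


def run_combine_fn(fn, bundles):
    """Hypothetical driver: one accumulator per bundle, merge them, extract once."""
    accumulators = []
    for bundle in bundles:
        acc = fn.create_accumulator()
        for element in bundle:
            acc = fn.add_input(acc, element)
        accumulators.append(acc)
    return fn.extract_output(fn.merge_accumulators(accumulators))


fn = PlainHalfMean()
print(run_combine_fn(fn, [[1]]))            # 0.5
print(run_combine_fn(fn, [[1, 2], [3]]))    # 1.0
print(run_combine_fn(fn, [[1], [2], [3]]))  # 1.0
```

However the input is split, the result is the same, which is exactly the property the plain `sum(values)/2` callable lacks.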

Anyhow, that is still not CombineValues but CombinePerKey. CombineValues takes key-value pairs whose value is an iterable and applies the combiner to each such iterable. In the following case, the elements it receives are ('a', [1, 2, 3]) and ('b', [10]). Here is the example:

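    # Continuing inside the same `with beam.Pipeline() as pipeline:` block,
    # reusing HalfMean from above
    kvs = [('a', 1),
           ('a', 2),
           ('a', 3),
           ('b', 10),
           ]

    combine_values = (pipeline
                      | 'create_kvs' >> beam.Create(kvs)
                      # GroupByKey groups into ('a', [1, 2, 3]) and ('b', [10])
                      | 'gbk' >> beam.GroupByKey()
                      | 'combine values' >> beam.CombineValues(HalfMean())
                      # prints ('a', 1.0) and ('b', 5.0), in no guaranteed order
                      | 'print cv' >> beam.Map(print)
                      )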
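For comparison, the same GroupByKey then CombineValues flow can be mimicked in plain Python: group the pairs into `(key, [values])`, then apply the combining logic to each value list independently. This is only a local model of the semantics, assuming a simple half-mean function in place of Beam's CombineFn machinery; `group_by_key`, `half_mean`, and `combine_values` are hypothetical helpers.

```python
from collections import defaultdict


def group_by_key(pairs):
    """Hypothetical local stand-in for GroupByKey: key -> list of values."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return list(grouped.items())


def half_mean(values):
    # Same result HalfMean.extract_output gives for one complete value list.
    values = list(values)
    return sum(values) / len(values) / 2


def combine_values(grouped, fn):
    """Local stand-in for CombineValues: apply fn to each (key, iterable) value."""
    return [(key, fn(values)) for key, values in grouped]


kvs = [('a', 1), ('a', 2), ('a', 3), ('b', 10)]
grouped = group_by_key(kvs)
print(grouped)                             # [('a', [1, 2, 3]), ('b', [10])]
print(combine_values(grouped, half_mean))  # [('a', 1.0), ('b', 5.0)]
```

Since each key's values are already fully grouped here, the function sees the whole list at once; inside Beam the CombineFn form is still the safe choice because the runner may split work differently.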

08-11 18:16