本文介绍了如何用APACHE BEAM计算运行总数的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
使用用于Google数据流的ApacheBeam SDK,我想计算一组交易的每日余额。
示例数据集可能如下所示:收款人名称、交易日期和金额:
John, 2017-12-01, 100
John, 2017-12-01, 200
Jane, 2017-12-01, 150
John, 2017-12-02, -100
John, 2017-12-02, 300
所需的结果集如下所示:
John, 2017-12-01, 300 (100 + 200)
Jane, 2017-12-01, 150
John, 2017-12-02, 500 (300 + -100 + 300)
我尝试将KV<Pair, BigDecimal>
与Combine.perKey
函数一起使用,该函数计算BigDecimal
的总和,但这不会将前一天的期末余额作为第二天的期初余额。
推荐答案
beam的窗口接口在这里使用是正确的:
https://beam.apache.org/documentation/programming-guide/#windowing
具体而言,您必须回答以下问题:
- 您希望在事件时间的哪个位置执行聚合?
- 在处理时间,您是否希望得到您的答复?
回答完上述问题后,我们可以编写一些伪代码:
PCollection<KV<String, Double>> transactionsByName = .... ; // Read input
PCollection<KV<String, Double> dailyTotalsByName = transactionsByName
// Group by name
.apply(GroupByKey.<String, Double>create())
// 1-day windows
.apply(Window.<KV<String, Iterable<Double>>>into(
FixedWindows.of(Duration.standardDays(1))))
// Combine each window (see combiners guide [here][1])
.apply(Combine.<String, Iterable<Double>, Double>perKey(new SumTotals()));
PCollection<KV<String, Double> globalTotalsByName = dailyTotalsByName
// Global windows allow you to combine a running total. Triggering condition
// specifies 'when' in processing time the answers are materialized. Here we
// have chosen to output the answer each time a new daily total arrives.
.apply(Window.<KV<String, Iterable<Double>>>into(new GlobalWindows()))
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
// Combine daily totals
.apply(Combine.<String, Iterable<Double>, Double>perKey(new SumTotals()))
上面的代码可能不会完全按原样构建,但至少概述了一种合理的方法。当然,根据你的输入和问题的具体情况,你可能需要调整触发的频率,等等。如前所述,这只会在每天结束时给你结果。如果需要实时运行合计,可以使用更复杂的触发条件输出当前值。 这篇关于如何用APACHE BEAM计算运行总数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!