本文介绍了如何用APACHE BEAM计算运行总数的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

使用用于Google数据流的ApacheBeam SDK,我想计算一组交易的每日余额。

示例数据集可能如下所示:收款人名称、交易日期和金额:

John, 2017-12-01, 100
John, 2017-12-01, 200
Jane, 2017-12-01, 150
John, 2017-12-02, -100
John, 2017-12-02, 300

所需的结果集如下所示:

John, 2017-12-01, 300  (100 + 200)
Jane, 2017-12-01, 150
John, 2017-12-02, 500  (300 + -100 + 300)

我尝试将KV<Pair, BigDecimal>Combine.perKey函数一起使用,该函数计算BigDecimal的总和,但这不会将前一天的期末余额作为第二天的期初余额。

推荐答案

beam的窗口接口在这里使用是正确的:

https://beam.apache.org/documentation/programming-guide/#windowing

具体而言,您必须回答以下问题:

  1. 您希望在事件时间的哪个位置执行聚合?
  2. 在处理时间,您是否希望得到您的答复?
你的帖子中没有足够的信息来回答这些问题--你必须提供更多的细节--你是在批处理模式还是流模式下运行?您是想在每天结束时得到一个答案,还是想要一个每次有新交易时都会更新的汇总?或者介于两者之间的东西?如果非要我猜的话,听起来您似乎想要保留全局总计(全局事件时间窗口),并每天更新一次当前值。

回答完上述问题后,我们可以编写一些伪代码:

PCollection<KV<String, Double>> transactionsByName = .... ;     // Read input

PCollection<KV<String, Double> dailyTotalsByName = transactionsByName
  // Group by name
  .apply(GroupByKey.<String, Double>create())   
  // 1-day windows       
  .apply(Window.<KV<String, Iterable<Double>>>into(    
      FixedWindows.of(Duration.standardDays(1))))    
  // Combine each window (see combiners guide [here][1])  
  .apply(Combine.<String, Iterable<Double>, Double>perKey(new SumTotals())); 

PCollection<KV<String, Double> globalTotalsByName = dailyTotalsByName
  // Global windows allow you to combine a running total. Triggering condition
  // specifies 'when' in processing time the answers are materialized. Here we
  // have chosen to output the answer each time a new daily total arrives.
  .apply(Window.<KV<String, Iterable<Double>>>into(new GlobalWindows()))
     .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
  // Combine daily totals
  .apply(Combine.<String, Iterable<Double>, Double>perKey(new SumTotals()))
上面的代码可能不会完全按原样构建,但至少概述了一种合理的方法。当然,根据你的输入和问题的具体情况,你可能需要调整触发的频率,等等。如前所述,这只会在每天结束时给你结果。如果需要实时运行合计,可以使用更复杂的触发条件输出当前值。

这篇关于如何用APACHE BEAM计算运行总数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-25 01:09