Is there any way to programmatically extract the final values of aggregators after a Dataflow batch execution?

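Based on the DirectPipelineRunner class, I wrote the following method. It seems to work, but for dynamically created counters it returns values that differ from those shown in the console output.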

PS. In case it helps: I assume the aggregators are based on Long values and use a sum combine function.

public static Map<String, Object> extractAllCounters(Pipeline p, PipelineResult pr)
{
    AggregatorPipelineExtractor aggregatorExtractor = new AggregatorPipelineExtractor(p);
    Map<String, Object> results = new HashMap<>();

    for (Map.Entry<Aggregator<?, ?>, Collection<PTransform<?, ?>>> e :
            aggregatorExtractor.getAggregatorSteps().entrySet()) {
        Aggregator agg = e.getKey();
        try {
            results.put(agg.getName(), pr.getAggregatorValues(agg).getTotalValue(agg.getCombineFn()));
        } catch(AggregatorRetrievalException|IllegalArgumentException aggEx) {
            //System.err.println("Can't extract " + agg.getName() + ": " + aggEx.getMessage());
        }
    }

    return results;
}

Best answer

The aggregator values should be available in the PipelineResult. For example:

CountOddsFn countOdds = new CountOddsFn();
pipeline
  .apply(Create.of(1, 3, 5, 7, 2, 4, 6, 8, 10, 12, 14, 20, 42, 68, 100))
  .apply(ParDo.of(countOdds));
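PipelineResult result = pipeline.run();
// On the Dataflow service you may need the BlockingDataflowPipelineRunner,
// so that run() returns only after the job has finished.

AggregatorValues<Integer> values =
    result.getAggregatorValues(countOdds.aggregator);
// One entry per step in which the aggregator was used, keyed by step name.
Map<String, Integer> valuesAtSteps = values.getValuesAtSteps();
// Now read the values from the step...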
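
As a follow-up to the "read the values from the step" comment, here is a minimal sketch in plain Java of how the per-step map could be reduced to a single total. It assumes a sum-style aggregator, as described in the question. The class name AggregatorTotals, the helper total, and the step names and values in main are hypothetical stand-ins for whatever getValuesAtSteps() returns; they are not real pipeline output.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AggregatorTotals {

    // Sums the per-step values of one aggregator into a single total.
    // Only meaningful for sum-style aggregators (an assumption taken from the question).
    static long total(Map<String, ? extends Number> valuesAtSteps) {
        long sum = 0L;
        for (Number value : valuesAtSteps.values()) {
            sum += value.longValue();
        }
        return sum;
    }

    public static void main(String[] args) {
        // Hypothetical step names and values standing in for getValuesAtSteps().
        Map<String, Integer> valuesAtSteps = new LinkedHashMap<>();
        valuesAtSteps.put("step-a", 4);
        valuesAtSteps.put("step-b", 3);

        // Print each step's contribution, then the combined total.
        for (Map.Entry<String, Integer> entry : valuesAtSteps.entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
        System.out.println("total = " + total(valuesAtSteps));
    }
}
```

If the aggregator is used in only one step, the map has a single entry and the total equals that entry. For aggregators that do not sum (min, max and so on), the reduction would have to mirror their combine function instead.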


Example DoFn that reports an aggregator:

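private static class CountOddsFn extends DoFn<Integer, Void> {

  // Sum aggregator named "odds"; read after the run via PipelineResult.
  Aggregator<Integer, Integer> aggregator =
    createAggregator("odds", new SumIntegerFn());

  @Override
  public void processElement(ProcessContext c) throws Exception {
    // "!= 0" rather than "== 1": in Java, -3 % 2 is -1, so "== 1" would miss negative odd numbers.
    if (c.element() % 2 != 0) {
      aggregator.addValue(1);
    }
  }
}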

Regarding java - extracting aggregator values in batch processing, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/38646195/
