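Is there any way to programmatically extract the final values of aggregators after a Dataflow batch job has finished executing?
Based on the DirectPipelineRunner class, I wrote the following method. It seems to work, but for dynamically created counters it returns values that differ from those shown in the console output.
P.S. If it helps, I am assuming the aggregators are based on Long values and use a sum combine function.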
    public static Map<String, Object> extractAllCounters(Pipeline p, PipelineResult pr)
    {
        AggregatorPipelineExtractor aggregatorExtractor = new AggregatorPipelineExtractor(p);
        Map<String, Object> results = new HashMap<>();
        for (Map.Entry<Aggregator<?, ?>, Collection<PTransform<?, ?>>> e :
                aggregatorExtractor.getAggregatorSteps().entrySet()) {
            Aggregator agg = e.getKey();
            try {
                results.put(agg.getName(), pr.getAggregatorValues(agg).getTotalValue(agg.getCombineFn()));
            } catch (AggregatorRetrievalException | IllegalArgumentException aggEx) {
                //System.err.println("Can't extract " + agg.getName() + ": " + aggEx.getMessage());
            }
        }
        return results;
    }
Best answer
The aggregator values should be available in the PipelineResult. For example:
    CountOddsFn countOdds = new CountOddsFn();
    pipeline
        .apply(Create.of(1, 3, 5, 7, 2, 4, 6, 8, 10, 12, 14, 20, 42, 68, 100))
        .apply(ParDo.of(countOdds));
    PipelineResult result = pipeline.run();
    // Here you may need to use the BlockingDataflowPipelineRunner
    AggregatorValues<Integer> values =
        result.getAggregatorValues(countOdds.aggregator);
    Map<String, Integer> valuesAtSteps = values.getValuesAtSteps();
    // Now read the values from the step...
An example DoFn that reports to an aggregator:
    private static class CountOddsFn extends DoFn<Integer, Void> {
        Aggregator<Integer, Integer> aggregator =
            createAggregator("odds", new SumIntegerFn());
        @Override
        public void processElement(ProcessContext c) throws Exception {
            if (c.element() % 2 == 1) {
                aggregator.addValue(1);
            }
        }
    }
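Once the per-step map from getValuesAtSteps() is in hand, a single total for a sum-based aggregator can be obtained by adding up the entries. The following is only a minimal sketch in plain Java with no Dataflow SDK dependency: the class name StepTotals, the step names, and the sample values are hypothetical, and it assumes Long values combined by summation, as stated in the question.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class StepTotals {
    /**
     * Adds up the per-step values of a sum-based aggregator.
     * Assumption: the map has the shape returned by getValuesAtSteps(),
     * i.e. step name -> value reported at that step.
     */
    public static long total(Map<String, Long> valuesAtSteps) {
        long sum = 0L;
        for (long value : valuesAtSteps.values()) {
            sum += value;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Hypothetical step names and values, for illustration only.
        Map<String, Long> valuesAtSteps = new LinkedHashMap<>();
        valuesAtSteps.put("stepA", 4L);
        valuesAtSteps.put("stepB", 3L);
        System.out.println("total = " + total(valuesAtSteps));
    }
}
```

For aggregators that use a combine function other than a sum, the same loop would need to apply that function instead of addition.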
This question about extracting aggregator values in batch processing (Java) comes from Stack Overflow: https://stackoverflow.com/questions/38646195/