我正在学习Trident框架。在Trident Stream
s上有几种方法可用于批处理中的聚合元组,包括this one,它允许使用 Aggregator
接口(interface)对元组进行有状态映射。但不幸的是,不存在内置的副本来额外保留 map 状态,就像其他9个persistentAggregate()
重载(仅以Aggregator
作为参数)一样。
因此,如何通过结合较低级别的Trident和Storm抽象和工具来实现所需的功能?探索API相当困难,因为几乎没有Javadoc文档。
换句话说,persistentAggregate()
方法允许通过更新某些持久状态来结束流处理:
stream of tuples ---> persistent state
我想通过以下方式更新持久状态并发出不同的元组:
stream of tuples ------> stream of different tuples
with
persistent state
Stream.aggregate(Fields, Aggregator, Fields)
不提供容错功能:stream of tuples ------> stream of different tuples
with
simple in-memory state
最佳答案
您可以使用TridentState#newValuesStream()方法从状态创建新的流。
这将允许您检索汇总值流。
出于说明目的,我们可以通过添加此方法和Debug Filter来改进example in Trident documentation:
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
new Values("the cow jumped over the moon"),
new Values("the man went to the store and bought some candy"),
new Values("four score and seven years ago"),
new Values("how many apples can you eat"));
spout.setCycle(true);
TridentTopology topology = new TridentTopology();
topology.newStream("spout1", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
.newValuesStream().each(new Fields("count"), new Debug());
运行此拓扑将输出(到控制台)汇总计数。
希望能帮助到你
关于stream - 如何在三叉戟中映射具有持久状态的元组?,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/19859566/