Trident是基于Storm进行实时留处理的高级抽象,提供了对实时流4的聚集,投影,过滤等操作,从而大大减少了开发Storm程序的工作量。Trident还提供了针对数据库或则其他持久化存储的有状态的,增量的更新操作的原语。

  若我们要开发一个对文本中的词频进行统计的程序,使用Storm框架的话我们需要开发三个Storm组件:

    1.一个Spout负责收集文本信息并分段,做为sentence字段发送给下游的Bolt

    2.一个Bolt将将每段文本粉刺,将分词结果以word字段发送给下游的Bolt

    3.一个Bolt对词频进行统计,把统计结果记录在count字段并存储

  如果使用Trident我们可以使用一下代码完成上述操作:

 FixedBatchSpout spout = new FixedBatchSpout(new Fields("setence"),,
new Values("the cow jump over the moon"),
new Values("the man went to the store and bought some candy"),
new Values("four score and seven years ago"),
new Values("how many apples can you eat"));
spout.setCycle(true);
TridentTopology topology = new TridentTopology();
TridentState workcount = topology.newStream("spout",spout)
.each(new Fields("setence"),new Split(),new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(),new Count(),new Fields("count"))
.parallelismHint();

  上述这段代码会被Trident框架转为为使用Storm开发时的三个步骤

  代码的前两行使用FixedBatchSpout不断循环生成参数里列出的四个句子,第7行声明了TridentTopology对象,并在第8行的newStream方法中引用了FixedBatchSpout。Trident是按批处理数据的,FixedBatchSpout生成的数据是按照下图的方式一批一批的发送到下一个处理单元的,后续处理单元也是按照这种方式把数据发送到其他节点。

  Storm Trident详解-LMLPHP

  在上述的第9行使用Split对文本分词,并发分词结果存储到Word字段中Split的定义如下:

 public class Split extends BaseFunction {

     @Override
public void execute(TridentTuple tuple, TridentCollector collector) {
for(String word: tuple.getString().split(" ")) {
if(word.length() > ) {
collector.emit(new Values(word));
}
}
} }

  在each方法宏也可以实现过滤,如只统计单词长度超过10个字母长度的单词的过滤可以定义如下:

.each(new Fields("word"), new BaseFilter() {
public boolean isKeep(TridentTuple tuple) {
return tuple.getString().length() > ;
}
})

  代码gropuBy(new Fields("word"))对word进行聚集操作,并在其后使用Count对象进行计数。之后将得到的结果储存到内存中。Trident不仅支持将结果存储到内存中,也支持将结果存储到其他的介质,如数据库,Memcached。如要将最终结果以key-value的方式存储到Memcached,可以使用下面的方式:

persistentAggregate(new Memcached.transactional(local),new Count(),new Fields("count"))

  实时任务的关键问题是如何处理对数据更新的幂等问题,任务可能失败或则重启,因此更新操作可能被重复执行。以上述为例,发送到Count的数据可能因为节点的重启或则网络故障导致的其他原因致使被重复发送,从而引起数据的重复统计,为了避免这个问题Trident提供了事物支持,由于数据是按批发送到Count节点的,Trident对每批单词都分配一个Transaction id。上面的代码中,每完成一批单词的统计,就将这批数据的统计结果连同Transaction id一起存储到Memcached中。数据更新的时候,Trident会比较Memcached中的Transaction id和新到达数据的Transaction id,如果同一批数据被重复发送,其Transaction id就会等于Memcached存储的Transaction id,新数据将会被忽略。另外每批数据的Transaction id是有严格的顺序的Transaction id 为2的数据没有处理完的情况下,绝对不会处理Transaction id为3的数据。

  有时,一个任务有多个数据源,每一个数据源都是以TridentState的形式出现在任务定义中的,比如上面提到的wordcount任务生成的数据就可以被其他的任务所使用,可以使用stateQuery方法引用别的TridentState,stateQuery的定义如下:

Stream stateQuery(TridentState state, QueryFunction function, Fields functionFields)

  Trident的数据模型称作"TridentTuple"---带名字的Values列表。在Topology中,tuple是在顺序的操作集合中增量生成的。Operation通常包含一组输入字段和提交的功能字段。Operation的输入字段通常是将tuple中的一个子集作为操作集合的输入,而功能字段则是命名提交的字段。

  例如,声明一个名为"students"的Stream,可能包含名字,性别,学号,分数等字段。添加一个按分数过滤的过滤器ScoreFilter,使得tuples只过滤分数大于60的学生,定义一个分数过滤器,当选择输入字段的时候Trident会自动过滤出一个子集,该操作十分的高效。

class ScoreFilter extends BaseFilter{

        public boolean isKeep(TridentTuple tuple) {
return tuple.getInteger() >= ;
}
}

  如果我们相对字段进行计算,并且提交给TridentTuple,可以模拟一下计算。

class AddAndSubFuction extends BaseFunction{

        public void execute(TridentTuple tuple, TridentCollector collector) {
int res1 = tuple.getInteger();
int res2 = tuple.getInteger();
int sub = res1 > res2 ? res1 - res2 : res2 - res1;
collector.emit(new Values(res1+res2,sub));
}
}

  此函数接收两个整数作为参数,并计算两个数的和以及差,作为两个新的Fields提交。

05-17 18:28