一 可靠性简单介绍
全然处理的意思是该MessageId绑定的源Tuple以及由该源Tuple衍生的全部Tuple都经过了Topology中每个应该到达的Bolt的处理。
能够通过Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 来指定
watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvemhhbmd6aGVianV0/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="">
默认值是1。 假设你的topology里面的tuple比較多的话, 那么把acker的数量设置多一点。效率会高一点。
每一个tuple知道它的祖宗的id(从spout发出来的那个tuple的id),
每当你新发射一个tuple, 它的祖宗id都会传给这个新的tuple。
所以当一个tuple被ack的时候,它会发一个消息给acker。告诉它这个tuple树发生了怎么样的变化。详细来说就是它告诉acker: 我已经完毕了, 我有这些儿子tuple, 你跟踪一下他们吧。
tmp-ark-val = tuple-id ^ (child-tuple-id1 ^ child-tuple-id2 ... )
task并不显式的跟踪tuple树。
对于那些有成千上万个节点的tuple树,把这么多的tuple信息都跟踪起来会耗费太多的内存。相反, acker用了一种不同的方式, 使得对于每一个spout tuple所须要的内存量是恒定的(20 bytes) . 这个跟踪算法是storm怎样工作的关键,而且也是它的主要突破。
这个对子的第一个值是创建这个tuple的taskid, 这个是用来在完毕处理tuple的时候发送消息用的。 第二个值是一个64位的数字称作:ack val,
ack val是整个tuple树的状态的一个表示,无论这棵树多大。
它仅仅是简单地把这棵树上的全部创建的tupleid/ack的tupleid一起异或(XOR)。
task 发现一个 ack val变成0了。 它知道这棵树已经处理完毕了。
Topology
id映射找到相应的Spout,然后调用该Spout的ack方法。
watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvemhhbmd6aGVianV0/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="">
注:1. Acker (ack bolt)组件由系统自己主动产生。一般来说一个topology仅仅有一个ack bolt(当然能够通过配置參数指定多个),当bolt处理并下发完tuple给下一跳的bolt时,会发送一个ack给ack
bolt。
ack bolt通过简单的异或原理(即同一个数与自己异或结果为零)来判定从spout发出的某一个Tuple是否已经被全然处理完成。
假设结果为真,ack bolt发送消息给spout,spout中的ack函数被调用并运行。
假设超时,则发送fail消息给spout,spout中的fail函数被调用并运行。spout中的ack和fail的处理逻辑由用户自行填写。
就ack一个tuple。那么tuple的id都要跟这个校验值异或一下,而且把得到的值更新为ack-val的新值。
那么如果每一个发射出去的tuple都被ack了, 那么最后ack-val一定是0(由于一个数字跟自己异或得到的值是0)。
就算每秒发生10000个ack, 那么须要50000000万年才可能碰到一个错误。并且就算碰到了一个错误, 也仅仅有在这个tuple失败的时候才会造成数据丢失。
tuple都会超时,也就会被又一次处理。
Spout挂掉了: 在这样的情况下给spout发送消息的消息源负责又一次发送这些消息。比方Kestrel和RabbitMQ在一个client断开之后会把全部”处理中“的消息放回队列。
在Spout中,Storm系统会为用户指定的MessageId生成一个相应的64位的整数。作为整个Tuple Tree的RootId。RootId会被传递给Acker以及兴许的Bolt来作为该消息单元的唯一标识。同一时候,不管Spout还是Bolt每次新生成一个Tuple时,都会赋予该Tuple一个唯一的64位整数的Id。
而当Bolt处理完一个输入Tuple并产生出新的Tuple时,也会告知Acker自己处理的输入Tuple的Id以及新生成的那些Tuple的Id。Acker仅仅须要对这些Id进行异或运算,就能推断出该RootId相应的消息单元是否成功处理完毕了。
以下这个是spout要实现的接口:
public interface ISpout extends Serializable {
void open(Map conf, TopologyContext context,
SpoutOutputCollector collector);
void close();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
}
Spout通过open方法參数里面提供的SpoutOutputCollector来发射新tuple到它的当中一个输出消息流, 发射tuple的时候spout会提供一个message-id, 后面通过这个message-id来追踪这个tuple。
this.collector.emit(new Values("hello world"),msgId);
注:msgId是提供给Acker组件使用的,Acker组件使用msgId来跟踪Tuple树
接下来。 这个发射的tuple被传送到消息处理者bolt那里, storm会跟踪由此所产生的这课tuple树。假设storm检測到一个tuple被全然处理了, 那么storm会以最開始的那个message-id作为參数去调用消息源的ack方法;反之storm会调用spout的fail方法。
值得注意的是。
storm调用ack或者fail的task始终是产生这个tuple的那个task。所以假设一个spout被分成非常多个task来运行。 消息运行的成功失败与否始终会通知最開始发出tuple的那个task。
作为storm的使用者。有两件事情要做以更好的利用storm的可靠性特征。
首先。在你生成一个新的tuple的时候要通知storm;
其次,完毕处理一个tuple之后要通知storm。 这样storm就能够检測整个tuple树有没有完毕处理。而且通知源spout处理结果。storm提供了一些简洁的api来做这些事情。
这个bolt把一个包括一个句子的tuple切割成每一个单词一个tuple。
public class SplitSentence implements IRichBolt {
OutputCollector _collector; public void prepare(Map conf,
TopologyContext context,
OutputCollector collector) {
_collector = collector;
} public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
_collector.emit(tuple,new Values(word));
}
_collector.ack(tuple);
} publicvoid cleanup() {}
publicvoid declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(newFields("word"));
}
}
_collector.emit(new Values(word));
用这样的方法发射会导致新发射的这个tuple脱离原来的tuple树(unanchoring), 假设这个tuple处理失败了, 整个句子不会被又一次处理。一个输出tuple能够被anchoring到多个输入tuple。这样的方式在stream合并或者stream聚合的时候非常实用。一个多入口tuple处理失败的话,那么它相应的全部输入tuple都要又一次运行。
看看以下演示怎么指定多个输入tuple:
new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors,new Values(1,2,3));
SplitSentence
的样例,你能够看到“句子tuple”在全部“单词tuple”被发出之后调用了ack。
你能够调用OutputCollector
的fail方法去立即将从消息源头发出的那个tuple标记为fail。 比方你查询了数据库,发现一个错误,你能够立即fail那个输入tuple。 这样能够让这个tuple被高速的又一次处理,
由于你不须要等那个timeout时间来让它自己主动fail。
每一个你处理的tuple,
必须被ack或者fail。由于storm追踪每个tuple要占用内存。所以假设你不ack/fail每个tuple。 那么终于你会看到OutOfMemory错误。
<pre name="code" class="java"> publicclass SplitSentence implements IBasicBolt {
public void prepare(Map conf,
TopologyContext context) {
} public void execute(Tuple tuple,
BasicOutputCollector collector) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
collector.emit(newValues(word));
}
} publicvoid cleanup() {} publicvoid declareOutputFields(
OutputFieldsDeclarer declarer) {
declarer.declare(newFields("word"));
}
}
这个实现比之前的实现简单多了。 可是功能上是一样的,发送到BasicOutputCollector的tuple会自己主动和输入tuple相关联,而在execute方法结束的时候那个输入tuple会被自己主动ack的。
作为对照,处理聚合和合并的bolt往往要处理一大堆的tuple之后才干被ack, 而这类tuple通常都是多输入的tuple。 所以这个已经不是IBasicBolt能够罩得住的了。
而且它须要更少的id来保存下游的tuple, 降低带宽占用。
这样这些tuple就不在tuple树里面。 也就不会被跟踪了。