只是想确保我了解Ack-ing在Storm中的工作方式。
我有1个喷嘴和2个 bolt 连接在一起。 Spout向Bolt1发出元组,而Bolt1又向Bolt 2发出元组。我希望Bolt 2确认从Spout发送的初始元组,我不确定如何。
为了保证容错能力(即,重发元组),我想在 bolt 2中确认Spout发出的元组,以防万一在过程中某个地方发生故障,以便可以重发。
考虑以下示例:
喷口:
_collector.emit(new Values(queue.dequeue())
bolt 1:
def execute(tuple: Tuple) {
_collector.emit(tuple, new Values("stuff"))
}
此时,元组是喷嘴发出的元组。我可以在这里确认它,没有问题。现在添加另一个 bolt ,该 bolt 监听由Bolt1发出的元组。
Bolt2:
def execute(tuple2: Tuple) {
_collector.emit(tuple2, new Values("foo"))
}
此时,tuple2中的元组是从Bolt1(其中包含字符串“stuff”的那个)发送的元组。
因此,如果我在Bolt2中发送一个确认,这将确认Bolt1中的元组,而不是从Spout发送的元组。正确?
我该如何确认从喷嘴发送的元组?我是否应该在所有其他喷嘴上都背起初始喷嘴,以便可以在最后一个 bolt 中将其取下并确认?
我读了Nathan的教程,给人的印象是,在发出元组2之后,我可以在其中接收来自Bolt1(来自Spout)的元组。这会将新发出的元组2链接到Spout发送的原始元组,因此,当Bolt2终止元组2时,实际上会从Spout终止原始元组。这是真的?
让我知道我是否在解释中遗漏了一些东西。
最佳答案
对于那些感兴趣的人,我通过询问 Storm 小组找到了解决方案。
我需要的是在Spout中以以下方式发出元组(具有唯一ID):
喷口:
//ties in tuple to this UID
_collector.emit(new Values(queue.dequeue(), *uniqueID*)
然后,Bolt1仅在将其发送给Bolt2之后才确认该元组。
bolt 1:
//emit first then ack
_collector.emit(tuple, new Values("stuff")) //**anchoring** - read below to see what it means
_collector.ack(tuple)
此时,Bolt1中已经确认了来自Spout的元组,但是与此同时,从Spout中将新发射的元组“ Material ”“ anchor 定”到了Bolt2中。这意味着它仍然需要稍后再确认,否则在超时时将被spout重新发送。
bolt 2:
_collector.ack(tuple)
Bolt2需要确认从Bolt1收到的元组,这将发送Spout等待的最后一个确认。如果此时Bolt2发出元组,则必须有一个Bolt3将其获取并确认。如果没有在最后一点确认元组,则Spout会将其超时并重新发送。
每次在一个 bolt 之间的
emit
语句上完成 anchor 定时,都会在“树”结构中构建一个新节点...由于我从未将相同的元组发送到2个或更多的元组,因此更像是一个列表,我有一对一的关系。树中的所有节点都需要确认,然后才将元组标记为完全到达。如果元组没有被确认,并以UID发送并在以后 anchor 定,则它将被永久保存在内存中(直到被确认)。
希望这可以帮助。