This article covers how to handle duplicate tuples in a Storm cluster.

Problem description

I am currently working on a project in which I have set up a Storm cluster across four Unix hosts.

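The topology itself is as follows:

  1. The JMS Spout listens to an MQ for new messages
  2. The JMS Spout parses each message and emits the result to the Esper Bolt
  3. The Esper Bolt processes the event and emits a result to the JMS Bolt
  4. The JMS Bolt publishes the message back to the MQ on a different topic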

I realize that Storm is an "at-least-once" framework. However, if I receive 5 events and pass them to the Esper Bolt for counting, then for some reason I receive 5 count results in the JMS Bolt (all with the same value).

Ideally, I want to receive a single result. Is there some way I can tell Storm to ignore duplicate tuples?

I think this has something to do with the parallelism I have set up, because it works as expected when I have just a single thread:

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout(JMS_DATA_SPOUT, new JMSDataSpout(), 2).setNumTasks(2);
    builder.setBolt("esperBolt", new EsperBolt.Builder().build(), 6).setNumTasks(6)
            .fieldsGrouping(JMS_DATA_SPOUT, new Fields("eventGrouping"));
    builder.setBolt("jmsBolt", new JMSBolt(), 2).setNumTasks(2)
            .fieldsGrouping("esperBolt", new Fields("eventName"));

I have also looked at Trident for its "exactly-once" semantics. However, I am not fully convinced it would solve this issue.

Recommended answer

If your Esper Bolt does not explicitly ack() each tuple at the end of its execute() method, or use an IBasicBolt implementation (which acks for you), then each tuple it receives will eventually be replayed by your origin JMS Spout after a timeout.
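
To see why a missing ack leads to repeated results, here is a toy model of at-least-once delivery. It uses only the JDK and is not Storm's API: `ToySpout`, `replayTimedOut`, and `deliveries` are hypothetical names made up for illustration. In a real bolt, the `ack` call below would correspond to calling `ack(tuple)` on the `OutputCollector` at the end of `execute()`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of at-least-once delivery. Hypothetical names, not Storm's API. */
public class ReplayDemo {

    /** Stand-in for a spout that remembers every message it has not seen acked. */
    static class ToySpout {
        private final Map<Integer, String> pending = new LinkedHashMap<>();
        private int nextId = 0;

        /** Emits a payload and tracks it as pending until it is acked. */
        int emit(String payload) {
            int id = nextId++;
            pending.put(id, payload);
            return id;
        }

        /** Marks a message as fully processed, so it will not be replayed. */
        void ack(int id) {
            pending.remove(id);
        }

        /** Simulates the message timeout expiring: everything still pending is re-sent. */
        List<String> replayTimedOut() {
            return new ArrayList<>(pending.values());
        }
    }

    /** Returns how many times the bolt runs for the given number of events, after one timeout. */
    public static int deliveries(boolean boltAcks, int events) {
        ToySpout spout = new ToySpout();
        int executions = 0;
        for (int i = 0; i < events; i++) {
            int id = spout.emit("event-" + i);
            executions++;              // the bolt's execute() runs once per delivery
            if (boltAcks) {
                spout.ack(id);         // acked tuples leave the pending set
            }
        }
        executions += spout.replayTimedOut().size(); // un-acked tuples are delivered again
        return executions;
    }

    public static void main(String[] args) {
        System.out.println(deliveries(true, 5));   // prints 5
        System.out.println(deliveries(false, 5));  // prints 10
    }
}
```

In this model, a bolt that never acks sees every event again after each timeout, so any counting logic downstream keeps producing results for events it has already handled.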

Alternatively, if you are asking your bolt to "only process unique messages", consider adding this behavior to your execute() method. It could first check a local Guava cache to see whether the tuple value has been seen before, then process the tuple accordingly.
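
The uniqueness check could look roughly like the sketch below. To keep it self-contained, it uses a bounded `LinkedHashMap` from the JDK as a stand-in for the Guava cache mentioned in the answer. `SeenCache`, `firstTime`, and `processUnique` are hypothetical names, and the string keys stand in for whatever tuple value identifies a duplicate.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of a per-task duplicate filter. A JDK stand-in for a local Guava cache. */
public class DedupSketch {

    /** Bounded "already seen" set that evicts the least recently used key. */
    static class SeenCache {
        private final Map<String, Boolean> seen;

        SeenCache(final int maxEntries) {
            // accessOrder=true keeps the least recently used key first, so it is evicted first
            this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                    return size() > maxEntries;
                }
            };
        }

        /** Returns true only the first time a key is offered while it is still cached. */
        boolean firstTime(String key) {
            return seen.put(key, Boolean.TRUE) == null;
        }
    }

    /** Counts how many values would be processed once duplicates are skipped. */
    public static int processUnique(String[] tupleValues, int cacheSize) {
        SeenCache cache = new SeenCache(cacheSize);
        int processed = 0;
        for (String value : tupleValues) {
            if (cache.firstTime(value)) {
                processed++;           // the real work of execute() would go here
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        String[] values = {"count=5", "count=5", "count=5", "count=6"};
        System.out.println(processUnique(values, 100)); // prints 2
    }
}
```

A cache like this is local to one bolt task, so it only catches duplicates that reach the same task. With a fields grouping on the deduplication key, tuples with equal key values are routed to the same task. The cache also has to be large enough, or long-lived enough, to cover the replay window. Otherwise an evicted key is treated as new again.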


09-23 18:10