This article looks at how to handle duplicate tuples in a Storm cluster, and should be a useful reference for anyone hitting the same problem.

Question

Currently I am working on a project where I have set up a Storm cluster across four Unix hosts.

The topology itself is as follows:


  1. The JMS Spout listens to an MQ for new messages
  2. The JMS Spout parses each message and then emits the result to an Esper Bolt
  3. The Esper Bolt then processes the event and emits a result to a JMS Bolt
  4. The JMS Bolt then publishes the message back onto the MQ on a different topic

I realize that Storm is an "at-least-once" framework. However, if I receive 5 events and pass them on to the Esper Bolt for counting, then for some reason I receive 5 count results in the JMS Bolt (all with the same value).

Ideally, I want to receive a single result as output. Is there some way I can tell Storm to ignore duplicate tuples?

I think this has something to do with the parallelism I have set up, because the topology works as expected when I run just a single thread:

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout(JMS_DATA_SPOUT, new JMSDataSpout(), 2).setNumTasks(2);
    builder.setBolt("esperBolt", new EsperBolt.Builder().build(), 6).setNumTasks(6)
            .fieldsGrouping(JMS_DATA_SPOUT, new Fields("eventGrouping"));
    builder.setBolt("jmsBolt", new JMSBolt(), 2).setNumTasks(2)
            .fieldsGrouping("esperBolt", new Fields("eventName"));

I have also looked at Trident for "exactly-once" semantics. I am not fully convinced it would solve this issue, however.

Answer

If your Esper Bolt does not explicitly ack() each tuple at the end of its execute() method, or does not use an IBasicBolt implementation, then each tuple it receives will eventually be replayed by the originating JMS Spout after a timeout.
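The ack pattern described above can be sketched as follows. This is a minimal, self-contained illustration: the `Tuple` interface and `Collector` class below are hypothetical local stand-ins for Storm's real `Tuple` and `OutputCollector` types, so the flow can run without storm-core on the classpath; in a real bolt you would extend `BaseRichBolt` and call `collector.ack(input)` on Storm's collector.

```java
import java.util.ArrayList;
import java.util.List;

class AckSketch {
    // Stand-in for org.apache.storm.tuple.Tuple (hypothetical, for illustration).
    interface Tuple { String value(); }

    // Stand-in for OutputCollector: records emits and acks so the flow is visible.
    static class Collector {
        final List<String> emitted = new ArrayList<>();
        final List<String> acked = new ArrayList<>();
        void emit(String v) { emitted.add(v); }
        void ack(Tuple t) { acked.add(t.value()); }
    }

    // The important part: ack the tuple at the end of execute(). A tuple that
    // is never acked times out and is replayed by the spout, which is one way
    // duplicate results appear downstream.
    static void execute(Tuple input, Collector collector) {
        collector.emit(input.value().toUpperCase()); // placeholder "processing"
        collector.ack(input);                        // explicit ack after processing
    }
}
```

With a `BasicBolt` (via `IBasicBolt`/`BaseBasicBolt`), Storm performs this ack automatically after `execute()` returns, which is why the answer offers it as the alternative.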

Alternatively, if you need your bolt to process only unique messages, consider adding that behavior to its execute() method: first check a local Guava cache for the tuple value's uniqueness, then process accordingly.
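The dedup check might look like the sketch below. To keep it self-contained it uses only the JDK: a `LinkedHashMap` with `removeEldestEntry` acts as a bounded stand-in for the Guava cache the answer mentions (with Guava you would instead build one via `CacheBuilder.newBuilder().maximumSize(...).build()`, which also supports time-based expiry). The class name `SeenCache` is made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SeenCache {
    private final Map<String, Boolean> seen;

    SeenCache(final int maxSize) {
        // Insertion-ordered map that evicts the oldest key once the bound is
        // exceeded, so memory stays constant no matter how many tuples pass.
        this.seen = new LinkedHashMap<String, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > maxSize;
            }
        };
    }

    // Returns true only the first time a key is offered. A bolt's execute()
    // would emit only when this returns true, but ack the tuple either way.
    boolean firstTime(String key) {
        return seen.putIfAbsent(key, Boolean.TRUE) == null;
    }
}
```

Note the trade-off: a bounded cache can only suppress duplicates that arrive while the original key is still resident, and with `fieldsGrouping` each bolt task sees only its own slice of keys, so the cache must live in each task.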

