目前,我正在一个项目中,该项目跨四个Unix主机设置了一个Storm集群。

拓扑本身如下:


JMS Spout侦听MQ以获取新消息
JMS Spout解析,然后将结果发送到Esper Bolt
然后,Esper Bolt处理事件并将结果发送到JMS Bolt
然后,JMS Bolt将消息发布到另一个主题上的MQ上


我意识到Storm是一个“至少一次”的框架。但是,如果我收到5个事件并将它们传递给Esper Bolt进行计数,则由于某种原因,我将在JMS Bolt中收到5个计数结果(所有值相同)。

理想情况下,我想接收一个结果输出,是否可以通过某种方式告诉Storm忽略重复的元组?

我认为这与我设置的并行性有关,因为当我只有一个线程时,它可以按预期工作:

 TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout(JMS_DATA_SPOUT, new JMSDataSpout(),2).setNumTasks(2);
    builder.setBolt("esperBolt", new EsperBolt.Builder().build(),6).setNumTasks(6)
            .fieldsGrouping(JMS_DATA_SPOUT,new Fields("eventGrouping"));
    builder.setBolt("jmsBolt", new JMSBolt(),2).setNumTasks(2).fieldsGrouping("esperBolt", new Fields("eventName"));


我也看到Trident具有“完全一次”的语义。我不完全相信这会解决这个问题。

最佳答案

如果您的Esper Bolt没有在其execute()方法的末尾显式地确认()每个元组或使用iBasicBolt实现,则超时后,它收到的每个元组最终将由源JMS Spout重播。

另外,如果您要求螺栓“仅处理唯一消息”,请考虑将此处理行为添加到execute()方法中。它可以首先检查本地Guava缓存中元组值的唯一性,然后进行相应处理。

关于java - Storm 集群重复元组,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/23764293/

10-10 04:08