我们有一个 Storm 拓扑,其中我们配置了一个喷嘴和两个 bolt 。 Spout连续查询数据库中的数据,并将其元组发送到第一个 bolt 以进行某些处理。第一个 bolt 进行一些处理并将其元组发送到第二个 bolt ,第二个 bolt 调用第三方Web服务并发送数据。因此,一段时间后发生了什么,最后一个 bolt 没有得到任何元组,并且如果我们重新启动拓扑,它就可以正常工作。这里只有最后一个 bolt 有问题。其他喷嘴和第一个 bolt 运行正常,并且我没有使用acking框架。在这种情况下,我只配置了一名 worker 。
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("messageListenrSpout", new MessageListenerSpout(), 1);
builder.setBolt("processorBolt", new ProcessorBolt(), 20).shuffleGrouping("messageListenrSpout");
builder.setBolt("notifierBolt", new NotifierBolt(),40).shuffleGrouping("processorBolt");
Config conf = new Config();
conf.put(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS, 10000);
//conf.setMessageTimeoutSecs(600);
conf.setDebug(true);
StormSubmitter.submitTopology(TOPOLOGY, conf, builder.createTopology());
最佳答案
您很可能遇到因超时导致的元组积压问题。尝试增加第二个 bolt 的并行度提示,因为听起来它的处理时间比第一个 bolt 的处理时间长得多(这就是为什么第二个 bolt 积压的原因)。如果您正在集群上运行此拓扑,请查看Storm UI以查看详细信息。
关于java - Apache Storm Bolt任务在一段时间后未收到消息,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/33467940/