在我的拓扑结构中,我从卡夫卡队列读取触发器消息。在接收到触发信息时,我需要向一个螺栓发出大约4096条信息。在bolt中,经过一些处理后,它将发布到另一个kafka队列(另一个拓扑稍后将使用此队列)。
我试图设置TOPOLOGY_MAX_SPOUT_PENDING
参数来限制要发送的消息数。但我看没有效果。是因为我在一个nextTuple()
方法中发出所有元组吗?如果是的话,该怎么办?
最佳答案
如果你在读卡夫卡的作品,你应该使用《暴风雨》中的KafkaSpout
。不要试图实现你自己的喷口,相信我,我在生产中使用的卡夫卡,它的工作非常顺利。每个kafka消息只生成一个元组。
正如您在this nice page from the manual上看到的,您可以如下设置topology.max.spout.pending
:
Config conf = new Config();
conf.setMaxSpoutPending(5000);
StormSubmitter.submitTopology("mytopology", conf, topology);
每个喷口设置
topology.max.spout.pending
,如果你有四个喷口,那么你的拓扑中有一个最大的非完整元组等于喷口*拓扑。另一个技巧是,您应该使用Storm UI查看
topology.max.spout.pending
是否设置正确。记住
topology.max.spout.pending
只是拓扑中未处理的元组数,拓扑永远不会停止使用来自kafka的消息,至少在生产系统上是这样……如果您想使用4096个批处理,您需要在螺栓上实现缓存逻辑,或者使用Storm以外的其他东西(面向微批处理的东西)。