我正在运行一个 Storm 三叉戟拓扑,在两个不同的流中具有两个两个不同的喷嘴。我的喷口是JMS喷口,并使用HDFS State来保留元组。
如果我只运行一个喷口,那就很好,我正在将所有记录发布到HDFS的JMS队列中。
与连接到两个不同队列的两个喷嘴一起运行拓扑时,与我在QUEUE中发布的记录相比,我得到的记录更少。我在这里做错什么了吗?请让我知道我执行此操作时是否有任何问题。
TridentTopology topology = new TridentTopology();
topology.newStream("QueueSpout", TridentSpout).partitionPersist(tradeStateFactory,hdfsFields, new HdfsUpdater());
Stream TopicStream = topology.newStream("TopicSpout", TridentTopicSpout);
TopicStream.each(hdfsFields, new CashFilter()).partitionPersist(cashStateFactory, hdfsFields, new HdfsUpdater());
TopicStream.each(hdfsFields, new JournalFilter()).partitionPersist(journalStateFactory, hdfsFields, new HdfsUpdater());
TopicStream.each(hdfsFields, new RcvdlvrFilter()).partitionPersist(rcvdlvrStateFactory, hdfsFields, new HdfsUpdater());
最佳答案
下面的配置可以很好地与拓扑配合使用。
发生这种情况是因为我没有使用分区分组。
使用全局和批处理全局后,它工作正常。
shuffle :使用随机循环算法在所有目标分区之间平均分配元组
broadcast :每个元组都复制到所有目标分区。这在DRPC期间很有用-例如,如果您需要对每个数据分区执行stateQuery。
partitionBy :partitionBy接受一组字段,并根据该组字段进行语义分区。这些字段将根据目标分区的数量进行散列和修改,以选择目标分区。 partitionBy保证相同的字段集始终进入相同的目标分区。
全局:所有元组都发送到同一分区。为流中的所有批次选择相同的分区。
batch全局:批处理中的所有元组都发送到同一分区。流中的不同批次可能会转到不同的分区。
分区:此方法采用一个自定义分区功能,该功能实现backtype.storm.grouping.CustomStreamGrouping
下面的拓扑配置可以正常运行。
TridentTopology topology = new TridentTopology();
topology.newStream("QueueSpout", TridentSpout).batchGlobal().partitionPersist(tradeStateFactory,hdfsFields, new HdfsUpdater());
Stream TopicStream = topology.newStream("TopicSpout", TridentTopicSpout).global();
TopicStream.each(hdfsFields, new CashFilter()).partitionPersist(cashStateFactory, hdfsFields, new HdfsUpdater());
TopicStream.each(hdfsFields, new JournalFilter()).partitionPersist(journalStateFactory, hdfsFields, new HdfsUpdater());
TopicStream.each(hdfsFields, new RcvdlvrFilter()).partitionPersist(rcvdlvrStateFactory, hdfsFields, new HdfsUpdater());
关于hadoop - 将三元组放入HDFS时,Storm Trident拓扑缺少元组,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/33697536/