本文介绍了Storm Spout 没有得到确认的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经开始使用storm,所以我使用本教程

I have started using storm, so I create simple topology using this tutorial

当我使用 LocalCluster 运行我的拓扑时,一切看起来都很好,我的问题是我没有在元组上收到 ACK,这意味着我的 spout ack 永远不会被调用.

When I run my topology with LocalCluster and all seem fine,My Problem is that I'm not getting ACK on the tuple, meaning my spout ack is never called.

我的代码如下 - 你知道为什么不调用 ack 吗?

my code is below - do you know why ack is not called ?

所以我的拓扑是这样的

public StormTopology build() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(HelloWorldSpout.class.getSimpleName(), 
             helloWorldSpout, spoutParallelism);

        HelloWorldBolt bolt = new  HelloWorldBolt();

        builder.setBolt(HelloWorldBolt.class.getSimpleName(), 
                   bolt, boltParallelism)
              .shuffleGrouping(HelloWorldSpout.class.getSimpleName());
}

我的 Spout 看起来像这样

My Spout look like this

public class HelloWorldSpout  extends BaseRichSpout implements ISpout {
    private SpoutOutputCollector collector;

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("int"));
    }

    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        this.collector = collector;
    }

    private static Boolean flag = false;
    public void nextTuple() {
        Utils.sleep(5000);

            //emit only 1 tuple - for testing
        if (!flag){
            this.collector.emit(new Values(6));
            flag = true;
        }
    }

    @Override
    public void ack(Object msgId) {
        System.out.println("[HelloWorldSpout] ack on msgId" + msgId);
    }

    public void fail(Object msgId){
        System.out.println("[HelloWorldSpout] fail on msgId" + msgId);
    }
}

我的螺栓看起来像这样

@SuppressWarnings("serial")
public class HelloWorldBolt extends BaseRichBolt{
    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, 
                    OutputCollector collector) {
        this.collector = collector;
        logger.info("preparing HelloWorldBolt");
    }

    public void execute(Tuple tuple) {
        System.out.println("[HelloWorldBolt] got" + tuple.getInteger(0));
        this.collector.ack(tuple);
    }

    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub

    }
}

推荐答案

你在 spout 中的 emit() 方法只有一个参数,所以元组没有被锚定.这就是为什么即使您确认了 bolt 中的元组,您也没有在 spout 中收到对 ack() 方法的回调.

Your emit() method in the spout has only one argument, so that tuple isn't anchored. That's why you're not getting a call back to the ack() method in the spout even though you're ack'ing the tuple in the bolt.

要使其工作,您需要修改您的 spout 以发出第二个参数,即消息 ID.正是这个 id 传递回了 spout 中的 ack() 方法:

To get this to work, you need to modify your spout to emit a second argument which is the message id. It is this id that's passed back to the ack() method in the spout:

public void nextTuple() {
    Utils.sleep(5000);

        //emit only 1 tuple - for testing
    if (!flag){
        Object msgId = "ID 6";  // this can be any object
        this.collector.emit(new Values(6), msgId);
        flag = true;
    }
}


@Override
public void ack(Object msgId) {
    //  msgId should be "ID 6"
    System.out.println("[HelloWorldSpout] ack on msgId" + msgId);
}

这篇关于Storm Spout 没有得到确认的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-17 18:50