本文介绍了风暴喷口没有得到阿克的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经开始使用Storm,所以我使用本教程

I have started using storm, so I create simple topology using this tutorial

当我使用LocalCluster运行拓扑时,一切似乎都很好,我的问题是我没有在元组上收到ACK,这意味着我的嘴ack从未被调用.

When I run my topology with LocalCluster and all seem fine,My Problem is that I'm not getting ACK on the tuple, meaning my spout ack is never called.

我的代码在下面-您知道为什么不调用ack吗?

my code is below - do you know why ack is not called ?

所以我的拓扑看起来像这样

so my topology look like this

public StormTopology build() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(HelloWorldSpout.class.getSimpleName(), 
             helloWorldSpout, spoutParallelism);

        HelloWorldBolt bolt = new  HelloWorldBolt();

        builder.setBolt(HelloWorldBolt.class.getSimpleName(), 
                   bolt, boltParallelism)
              .shuffleGrouping(HelloWorldSpout.class.getSimpleName());
}

我的Spout看起来像这样

My Spout look like this

public class HelloWorldSpout  extends BaseRichSpout implements ISpout {
    private SpoutOutputCollector collector;

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("int"));
    }

    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        this.collector = collector;
    }

    private static Boolean flag = false;
    public void nextTuple() {
        Utils.sleep(5000);

            //emit only 1 tuple - for testing
        if (!flag){
            this.collector.emit(new Values(6));
            flag = true;
        }
    }

    @Override
    public void ack(Object msgId) {
        System.out.println("[HelloWorldSpout] ack on msgId" + msgId);
    }

    public void fail(Object msgId){
        System.out.println("[HelloWorldSpout] fail on msgId" + msgId);
    }
}

我的螺栓看起来像这样

@SuppressWarnings("serial")
public class HelloWorldBolt extends BaseRichBolt{
    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, 
                    OutputCollector collector) {
        this.collector = collector;
        logger.info("preparing HelloWorldBolt");
    }

    public void execute(Tuple tuple) {
        System.out.println("[HelloWorldBolt] got" + tuple.getInteger(0));
        this.collector.ack(tuple);
    }

    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub

    }
}

推荐答案

在spout中,您的generate()方法只有一个参数,因此不会锚定元组.这就是为什么即使您要确认螺栓中的元组,也无法在喷嘴中调用ack()方法的原因.

Your emit() method in the spout has only one argument, so that tuple isn't anchored. That's why you're not getting a call back to the ack() method in the spout even though you're ack'ing the tuple in the bolt.

要使其正常工作,您需要修改喷嘴以发出第二个参数,即消息ID.正是这个id传递回了spout中的ack()方法:

To get this to work, you need to modify your spout to emit a second argument which is the message id. It is this id that's passed back to the ack() method in the spout:

public void nextTuple() {
    Utils.sleep(5000);

        //emit only 1 tuple - for testing
    if (!flag){
        Object msgId = "ID 6";  // this can be any object
        this.collector.emit(new Values(6), msgId);
        flag = true;
    }
}


@Override
public void ack(Object msgId) {
    //  msgId should be "ID 6"
    System.out.println("[HelloWorldSpout] ack on msgId" + msgId);
}

这篇关于风暴喷口没有得到阿克的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-16 03:15