问题描述
我已经开始使用Storm,所以我使用本教程
I have started using storm, so I create simple topology using this tutorial
当我使用LocalCluster
运行拓扑时,一切似乎都很好,我的问题是我没有在元组上收到ACK,这意味着我的嘴ack
从未被调用.
When I run my topology with LocalCluster
and all seem fine,My Problem is that I'm not getting ACK on the tuple, meaning my spout ack
is never called.
我的代码在下面-您知道为什么不调用ack
吗?
my code is below - do you know why ack
is not called ?
所以我的拓扑看起来像这样
so my topology look like this
public StormTopology build() {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(HelloWorldSpout.class.getSimpleName(),
helloWorldSpout, spoutParallelism);
HelloWorldBolt bolt = new HelloWorldBolt();
builder.setBolt(HelloWorldBolt.class.getSimpleName(),
bolt, boltParallelism)
.shuffleGrouping(HelloWorldSpout.class.getSimpleName());
}
我的Spout看起来像这样
My Spout look like this
public class HelloWorldSpout extends BaseRichSpout implements ISpout {
private SpoutOutputCollector collector;
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("int"));
}
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
}
private static Boolean flag = false;
public void nextTuple() {
Utils.sleep(5000);
//emit only 1 tuple - for testing
if (!flag){
this.collector.emit(new Values(6));
flag = true;
}
}
@Override
public void ack(Object msgId) {
System.out.println("[HelloWorldSpout] ack on msgId" + msgId);
}
public void fail(Object msgId){
System.out.println("[HelloWorldSpout] fail on msgId" + msgId);
}
}
我的螺栓看起来像这样
@SuppressWarnings("serial")
public class HelloWorldBolt extends BaseRichBolt{
private OutputCollector collector;
public void prepare(Map conf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
logger.info("preparing HelloWorldBolt");
}
public void execute(Tuple tuple) {
System.out.println("[HelloWorldBolt] got" + tuple.getInteger(0));
this.collector.ack(tuple);
}
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
}
}
推荐答案
在spout中,您的generate()方法只有一个参数,因此不会锚定元组.这就是为什么即使您要确认螺栓中的元组,也无法在喷嘴中调用ack()方法的原因.
Your emit() method in the spout has only one argument, so that tuple isn't anchored. That's why you're not getting a call back to the ack() method in the spout even though you're ack'ing the tuple in the bolt.
要使其正常工作,您需要修改喷嘴以发出第二个参数,即消息ID.正是这个id传递回了spout中的ack()方法:
To get this to work, you need to modify your spout to emit a second argument which is the message id. It is this id that's passed back to the ack() method in the spout:
public void nextTuple() {
Utils.sleep(5000);
//emit only 1 tuple - for testing
if (!flag){
Object msgId = "ID 6"; // this can be any object
this.collector.emit(new Values(6), msgId);
flag = true;
}
}
@Override
public void ack(Object msgId) {
// msgId should be "ID 6"
System.out.println("[HelloWorldSpout] ack on msgId" + msgId);
}
这篇关于风暴喷口没有得到阿克的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!