我有一个交换消息的发件人和使用者:
public class Sender extends AbstractVerticle {
@Override
public void start() {
EventBus eventBus = vertx.eventBus();
eventBus.send(Constants.ADDRESS, "Hello from sender", res -> {
if (res.succeeded()) {
System.out.println("Successfully sent reply");
} else {
System.out.println("Failed to send reply." + res.cause());
}
});
eventBus.consumer(Constants.ADDRESS, msg -> System.out.println("received msg from consumer:" + msg.body()));
}
public class Consumer extends AbstractVerticle{
protected EventBus eventBus = null;
@Override
public void start() {
eventBus = vertx.eventBus();
eventBus.consumer(Constants.ADDRESS, msg -> msg.reply("Hi from consumer.", res -> {
if (res.succeeded()) {
System.out.println("Successfully sent reply");
} else {
System.out.println("Failed to send reply." + res.cause());
}
}));
}
}
我希望当消费者回复邮件时,发件人将收到该邮件。但是,我超时了:
Successfully sent reply
Failed to send reply.(TIMEOUT,-1) Timed out after waiting 30000(ms) for a reply. address: 2, repliedAddress: 1
部署:
public class ServiceLauncher {
private static Vertx vertx = Vertx.vertx();
public static void main(String[] args) {
vertx.deployVerticle(new Consumer(), res -> {
if (res.succeeded()) {
System.out.println("Verticle " + Consumer.NAME + " deployed.");
vertx.deployVerticle(new Sender());
System.out.println("Verticle " + Sender.NAME + " deployed.");
} else {
System.out.println("Verticle " + Consumer.NAME + " not deployed.");
}
});
}
我究竟做错了什么?提前感谢
更新:问题出在msg.reply()中-使用者没有回复消息,但我仍然不知道为什么。
最佳答案
超时不是在请求的发送者中发生的,而是在其接收者中发生的。
在msg.reply()
中定义的处理程序等待发送方的下一个答复。它不是处理程序,仅确认发送状态。
当发件人收到回复时,Sender
的eventBus.send()
中的处理程序也会触发。
只需删除msg.reply()
中的处理程序,并以相同的方式在eventBus.send()
中修改处理程序Sender
:
public class Sender extends AbstractVerticle {
public static final String NAME = "SENDER";
@Override
public void start() {
EventBus eventBus = vertx.eventBus();
eventBus.send(Constants.ADDRESS, "Hello from sender", res -> {
if (res.succeeded()) {
System.out.println("Successfully received reply: " + res.result().body());
} else {
System.out.println("Failed to send reply." + res.cause());
}
});
}
}
和
public class Consumer extends AbstractVerticle {
public static final String NAME = "CONSUMER";
@Override
public void start() {
final EventBus eventBus = vertx.eventBus();
eventBus.consumer(Constants.ADDRESS, msg -> {
System.out.println("Message received");
msg.reply("Hi from consumer.");
});
}
}
执行后,您将看到:
Verticle CONSUMER deployed.
Verticle SENDER deployed.
Message received
Successfully received reply: Hi from consumer.
关于vert.x - Vertx:邮件回复超时,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/49449257/