我有一个交换消息的发件人和使用者:

public class Sender extends AbstractVerticle {
@Override
public void start() {
    EventBus eventBus = vertx.eventBus();
    eventBus.send(Constants.ADDRESS, "Hello from sender", res -> {
        if (res.succeeded()) {
            System.out.println("Successfully sent reply");
        } else {
            System.out.println("Failed to send reply." + res.cause());
        }
    });
    eventBus.consumer(Constants.ADDRESS, msg -> System.out.println("received msg from consumer:" + msg.body()));
}

public class Consumer extends AbstractVerticle{
protected EventBus eventBus = null;

@Override
public void start() {
    eventBus = vertx.eventBus();

    eventBus.consumer(Constants.ADDRESS, msg -> msg.reply("Hi from consumer.", res -> {
        if (res.succeeded()) {
            System.out.println("Successfully sent reply");
        } else {
            System.out.println("Failed to send reply." + res.cause());
        }
    }));
}


}

我希望当消费者回复邮件时,发件人将收到该邮件。但是,我超时了:

Successfully sent reply

Failed to send reply.(TIMEOUT,-1) Timed out after waiting 30000(ms) for a reply. address: 2, repliedAddress: 1


部署:

public class ServiceLauncher {

private static Vertx vertx = Vertx.vertx();

public static void main(String[] args) {
    vertx.deployVerticle(new Consumer(), res -> {
        if (res.succeeded()) {
            System.out.println("Verticle " + Consumer.NAME + " deployed.");
            vertx.deployVerticle(new Sender());
            System.out.println("Verticle " + Sender.NAME + " deployed.");
        } else {
            System.out.println("Verticle " + Consumer.NAME + " not deployed.");
        }
    });
}


我究竟做错了什么?提前感谢

更新:问题出在msg.reply()中-使用者没有回复消息,但我仍然不知道为什么。

最佳答案

超时不是在请求的发送者中发生的,而是在其接收者中发生的。
msg.reply()中定义的处理程序等待发送方的下一个答复。它不是处理程序,仅确认发送状态。
当发件人收到回复时,SendereventBus.send()中的处理程序也会触发。

只需删除msg.reply()中的处理程序,并以相同的方式在eventBus.send()中修改处理程序Sender

public class Sender extends AbstractVerticle {
    public static final String NAME = "SENDER";

    @Override
    public void start() {
        EventBus eventBus = vertx.eventBus();
        eventBus.send(Constants.ADDRESS, "Hello from sender", res -> {
            if (res.succeeded()) {
                System.out.println("Successfully received reply: " + res.result().body());
            } else {
                System.out.println("Failed to send reply." + res.cause());
            }
        });

    }
}




public class Consumer extends AbstractVerticle {
    public static final String NAME = "CONSUMER";

    @Override
    public void start() {
        final EventBus eventBus = vertx.eventBus();
        eventBus.consumer(Constants.ADDRESS, msg -> {
            System.out.println("Message received");
            msg.reply("Hi from consumer.");
        });

    }
}


执行后,您将看到:

Verticle CONSUMER deployed.
Verticle SENDER deployed.
Message received
Successfully received reply: Hi from consumer.

关于vert.x - Vertx:邮件回复超时,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/49449257/

10-15 09:39