问题描述
我们有一个案例如下:
担心的是,协调器从一个方法上下文发出一条消息并从另一个方法上下文得到响应:
The concern is that, the Coordinator sends out a message from a method context and gets the reponse from another:
private void forwardToVWClient(Message msg) {
vertx.eventBus().send(RESTClient.ADDRESS, msg.body(), deliveryOptions, res -> {
if (res.succeeded()) {
log.info("forwardToVWClient. VW got result : success.");
// do not reply ok until we get an OK from the Listener verticle
} else {
log.error("forwardToVWClient VW got result : failure.");
msg.fail(500, res.cause().getMessage());
}
});
}
然后我有另一个事件总线消耗方法,我在那里收到响应:
then I have another event bus consuming method where I receive the response:
vertx.eventBus().consumer(ADDRESS_RESPONSE, this::handleResponseMessage);
private void handleResponseMessage(Message msg) {
// how to reply the message received in the context of forwardToVWClient ??
}
那么,当我在 handleResponseMessage
中收到响应时,我如何在 forwardToVWClient
的上下文中回复消息?
So, how could I reply the message in the context of forwardToVWClient
, when I receive the response in the handleResponseMessage
?
到目前为止的几个想法:
- 将消息放在顶点上下文中?
- 消息对象有一个字段:
.replyAddress()
返回一个 int,我将它保存在静态 ConcurrentHashMap 中并使用它来回复特定消息.我会发布更多详细信息作为答案.
- Put the message in the vertx context ?
- The message object has a field :
.replyAddress()
that returns an int, I save that in a static ConcurrentHashMap and use it to reply a particular message. I ll post more details as an answer.
有没有更好的方法?
推荐答案
实现它的一种方法是保存消息的 replyAddress
字段并使用它来将消息发回给发起者.
one way to achieve it is to save the replyAddress
field of the message and use it to send a message back to the originator.
下面是一些简化的代码,展示了如何:
Below is some simplified code that shows how:
public class VehicleStateCoordinatorVerticle extends AbstractVerticle {
final static String ADDRESS_REQUEST = "CoordinatorRequest";
final static String ADDRESS_RESPONSE = "CoordinatorResponse";
static ConcurrentHashMap<String, VWApiRequest> pendingCommands = new ConcurrentHashMap<>();
public void start() {
vertx.eventBus().consumer(ADDRESS_REQUEST, this::handleRequestMessage);
vertx.eventBus().consumer(ADDRESS_RESPONSE, this::handleResponseMessage);
log.info("===== VehicleStateCoordinatorVerticle - bus consumer ready =====");
}
private void handleRequestMessage(Message msg) {
// .... omitted for brevity
// save the replyAddress and the command for later/callback
cmd.setReplyAddress(msg.replyAddress());
pendingCommands.put(cmd.getVwReference(), cmd);
forwardToVWClient(msg);
}
private void forwardToVWClient(Message msg) {
vertx.eventBus().send(AbstractOEMClientVerticle.ADDRESS, msg.body(), deliveryOptions, res -> {
if (res.succeeded()) {
log.info("forwardToVWClient. VW got result : success.");
// do not reply ok until we get an OK from the VWAPIServer verticle
} else {
log.error("forwardToVWClient VW got result : failure.");
msg.fail(500, res.cause().getMessage());
}
});
}
private void handleResponseMessage(Message msg) {
//..
VWApiRequest vwApiRequest = pendingCommands.get(vwReference);
if(vwApiRequest == null){
log.error("No pending vwApiRequest could be found!");
return;
}
/**
* Instead of targeting the RESTApi address,
* we use the replyAddress to target the specific message that is pending response.
*/
vertx.eventBus().send(vwApiRequest.getReplyAddress(), body, deliveryOptions, res -> {
if (res.succeeded()) {
// cheers!
}
else{
log.error("Error in handleResponseMessage {}", res.cause().getMessage());
}
});
}
这篇关于Vertx EventBus 回复“特定"消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!