前言:Vert.x 实现了2种完成不同的eventBus:
EventBusImpl(A local event bus implementation)和 它的子类 ClusteredEventBus(An event bus implementation that clusters with other Vert.x nodes)。这里介绍下EventBusImpl
EventBusImpl 原理:调用consumer方法时,以address-handler作为k-v存在一个map的容器中。接着调用send方法时,把message,DeploymentOptions等内容封装成对象(MessageIml,命令模式),从以address为k从map里取出handler.把MessageIml作为参数传递给handler运行。
一.初始化:
初始化过程就是new EventBusImpl,并修改状态变量started。
首先,在VertxImpl的构造方法
VertxImpl(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler)
中进行初始化。以 options.isClustered()为判断条件,调用createAndStartEventBus(options, resultHandler);
其次createAndStartEventBus中做了2件事
1.以options.isClustered()判断条件,new出了ClusteredEventBus/ EventBusImpl. new时并没有业务逻辑。(额外提一句eventBus = new EventBusImpl(this);使eventBus和VertImpl相互拥有对方的引用,是很常见的写法。)
2.调用EventBusImpl的初始化方法start(),并返回结果给最外层resultHandler的。start()更没做什么事,只是EventBusImpl里面有个状态变量started。把它置为true.
二. consumer订阅
EventBusImpl维护了
protected final ConcurrentMap<String, Handlers> handlerMap = new ConcurrentHashMap<>();
成员变量。
Handlers 是一个handler的List的封装类,上面可以理解为 ConcurrentMap<String, List<Handler>>这种数据结构。consumer方法以address为k,以handler作v的list的一员,存放在handlerMap中。
所以重点关注对handlerMap的操作。
调用vertx.eventBus().consumer("Address1", ar -> {});发生了什么?
查看代码发现,先new HandlerRegistration(这里也有相互引用)。再调用HandlerRegistration .handler,那里面又会调用eventBusImpl.addRegistration()。在HandlerRegistration这个类兜了一圈,又回到eventBusImpl里。
(相关代码截断如下: EventBusImpl.consumer(address);--> new HandlerRegistration --> consumer.handler-->eventBus.addRegistration(address, this, repliedAddress != null, localOnly);)
核心逻辑在addRegistration() 和 addLocalRegistration()中。我的理解是,前个方法明显有问题。最后一句addRegistration(newAddress, address, replyHandler, localOnly, registration::setResult);前面的参数都没有使用,应该可以省略,修改为addRegistration(registration::setResult);就可以。很少在Vert.x框架中看到这样不合规范的代码。如果读者有好的见解,欢迎留言。
// 调用 addLocalRegistration
// 注册完成
protected <T> void addRegistration(String address, HandlerRegistration<T> registration, boolean replyHandler, boolean localOnly) { Objects.requireNonNull(registration.getHandler(), "handler"); boolean newAddress = addLocalRegistration(address, registration, replyHandler, localOnly); addRegistration(newAddress, address, replyHandler, localOnly, registration::setResult); }
/** *
* 初始化 或 获取原 Contex
初始化 或 获取原 Handlers
* 新建 HandlerHolder
* Handlers 里添加 HandlerHolder
**/
protected <T> boolean addLocalRegistration(String address, HandlerRegistration<T> registration, boolean replyHandler, boolean localOnly) { Objects.requireNonNull(address, "address"); Context context = Vertx.currentContext(); boolean hasContext = context != null; if (!hasContext) { // Embedded context = vertx.getOrCreateContext(); } registration.setHandlerContext(context); boolean newAddress = false; HandlerHolder holder = new HandlerHolder<>(metrics, registration, replyHandler, localOnly, context); Handlers handlers = handlerMap.get(address); if (handlers == null) { handlers = new Handlers(); Handlers prevHandlers = handlerMap.putIfAbsent(address, handlers); if (prevHandlers != null) { handlers = prevHandlers; } newAddress = true; } handlers.list.add(holder); if (hasContext) { HandlerEntry entry = new HandlerEntry<>(address, registration); context.addCloseHook(entry); } return newAddress; }
新出现的几个类的作用:
Context 线程调度--Vert.x框架的优点是线程安全,就是通过Context实现。
HandlerHolder--对HandlerRegistration的封装,外加Context。
Handlers--上面HandlerHolder 的集合封装,外加平衡轮询逻辑。
handlers.list.add(holder);这句作为压轴(戏曲名词,指一场折子戏演出的倒数第二个剧目)出场完成整个功能的核心注册操作。
至于后面的那段代码,我觉得有点问题。
if (hasContext) { HandlerEntry entry = new HandlerEntry<>(address, registration); context.addCloseHook(entry); }
作用是在context上注册关闭事件,由DeploymentManager在unploy的时候调用,对应的核心逻辑在 CloseHooks.run()方法中。但这个这个判断条件案例只有第2次添加consumer的时候才有效果。或者是上面的代码boolean hasContext = context != null;给人的误导? 以上consumer的流程还被reply方法使用。
三. Send/Publish发送
多个send重载方法最后定位到EventBusImpl.send(String address, Object message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler)。但这个核心方法的最终却调用了一个
名为sendOrPubInternal的方法,不由得在让人想起写程序最难的事之一是命名。正如开头说的这个使用了设计模式中的命令模式,把参数封装成MessageImpl对象发送到后面的方法。
sendOrPubInternal做了3个事情,
1.createReplyHandlerRegistration -- 有replyHandler.reply()这步才有意义
2.new SendContextImpl -- 从Context类判断,SendContextImpl可以绑定线程
3.sendContext.next(); -- 在执行方法前,执行拦截器。拦截器极大地丰富开发人员的自定义使用。
本来应该1,2,3顺序介绍代码,但是消息流程一般是:
Sender----( message )--->customer;
Sender<---(reply message)---customer;
根据这个流程,得先介绍2.new SendContextImpl 和3.sendContext.next();
再回头介绍 1.createReplyHandlerRegistration
先说 2.new SendContextImpl
这个类是整个Send相关类的大封装。
3.sendContext.next();
根据代码流程
sendOrPub--》deliverMessageLocally--》deliverMessageLocally
进入到deliverMessageLocally(),这个方法做了2个大事情。
- 获取address所对应的所有handlers
- 根据isSend()区分 send (平衡轮询发一个handler)/publish(遍历handlers发给所有)
方法的第一句话msg.setBus(this);和reply逻辑有关系。在这个local eventbus下,是重复赋值,没有作用的。
然后Handlers handlers = handlerMap.get(msg.address());
这句根据以address为k,取出Handlers。sender的messageImpl 终于和consumer的HandlerHold见面
Handler.choose()方法实现了轮询发送message, 个人认为这个方法叫做 balanceChoose()更好。
代码如下:
public HandlerHolder choose() { while (true) { int size = list.size(); if (size == 0) { return null; } int p = pos.getAndIncrement(); if (p >= size - 1) { pos.set(0); } try { return list.get(p); } catch (IndexOutOfBoundsException e) { // Can happen pos.set(0); } } }
当时我使用Vert.x的时候,就很好奇eventBus的轮询功能怎么实现。现在看到其实非常简单。维护一个 AtomicInteger 的变量,每次调用累加一次。如果超过List的长度,则重置为0,方法永远返回 list.get(p)。巧妙!
最后在deliverToHandler()方法里,在Context的线程控制下,完成message和handler的最终交互。
那么,回到最开始的问题,
Sender----( message )--->Customer;
Sender<---(reply message)---Customer;
在上面的流程中,Sender根据address找到Customer从而发送message,那么Customer的reply是怎么找到Sender的呢?
答案是一个临时的replyAddress。通过以 replyAddress为key,把Sender作为handler注册到eventBusImpl上,处理后直接注销。replyAddress的规律是从1开始的步长为1的自增数列,所以开发者不应该使用纯数字作为自身业务的Address,避免冲突。
最后说说1.createReplyHandlerRegistration
如果sender在发送消息时使用了
send(String address, Object message, Handler<AsyncResult<Message<T>>> replyHandler);方法。
vertx.eventBus().send("address1", "测试消息", ar -> {
if (ar.succeeded()) {
System.out.println("我是producer1:" + ar.result().body());
}
});
并且consumer在接受消息到后,调用了 reply();
vertx.eventBus().consumer("address1", ar -> {
System.out.println("consumer:" + ar.body());
ar.reply("consumer reply message ");
});
则会进入createReplyHandlerRegistration的处理逻辑。
使用
protected String generateReplyAddress() {
return Long.toString(replySequence.incrementAndGet());
}
这里产生从1开始的步长为1的自增数列address。
Handler<Message<T>> simpleReplyHandler = convertHandler(replyHandler);
HandlerRegistration<T> registration =
new HandlerRegistration<>(vertx, metrics, this, replyAddress, message.address, true, replyHandler, timeout);
registration.handler(simpleReplyHandler);
里面的this是eventBusImpl,并在handler()方法里把 boolean replyHander的值置为true.
这样,eventBusImpl的handlerMap变量里,就多了<replyAddress, replyHander>。
在cuomser处调用reply()后,会在eventBusImpl的内部类ReplySendContextImpl<T> extends SendContextImpl 的参与下,走类似send()的流程。区别是最后在deliverToHandler()方法里,会判断boolean replyHander的值,如果是true调用完毕就注销.
错误代码测验:
vertx.eventBus().consumer("1", ar -> {
System.out.println("我不应该在这里" + ar.body());
ar.reply("对不起,其实我是阿杜。");
});
vertx.eventBus().consumer("address1", ar -> {
System.out.println("consumer:" + ar.body());
ar.reply("我是高帅富");
});
vertx.eventBus().send("address1", "测试消息", ar -> {
if (ar.succeeded()) {
System.out.println("sender:接收收到的回应是:"+ar.result().body());
}else{
System.out.println("发送失败");
}
});
存在consumer("1", ar -> {})的Console:
consumer:测试消息 我不应该在这里我是高帅富 20:08:56.404 [vert.x-eventloop-thread-0] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024 20:08:56.405 [vert.x-eventloop-thread-0] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096 发送失败
可以看到上面的输出完全不是设想的结果。
如果不存在consumer("1", ar -> {})address为1的Console:
consumer:测试消息
sender:接收收到的回应是:我是高帅富
最后,再次提醒:使用eventBus时,不要使用纯数字作为自身业务的address。