EventBus 深入学习三之Guava小结
- 巧妙的利用缓存, 解决重复耗时的操作
- 异步化的操作
- 队列存储消息, 以及如何避免消息的重复消费
- 消费的先后顺序
- 截断
- 异常处理
1. 缓存
看代码时,可以看到很多地方都用到了缓存,如再注册时, 根据class获取所有带注解的方法; 推送消息时,根据事件类型,获取所有的超类集合
如注册时,一条完整的调用链
com.google.common.eventbus.SubscriberRegistry#register ->
com.google.common.eventbus.SubscriberRegistry#findAllSubscribers -> com.google.common.eventbus.SubscriberRegistry#getAnnotatedMethods ->
subscriberMethodsCache.getUnchecked(clazz) ->
com.google.common.eventbus.SubscriberRegistry#getAnnotatedMethodsNotCached
2. 根据类查询所有超类
TypeToken.of(concreteClass).getTypes().rawTypes());
// 我们自己的实现, 一直到返回null为止
clz.getSuperClass().getSuperClass();
// 获取接口
clz.getInterfaces()
3. 异步
异步推送处理Event和同步处理主要的区别点是使用的 Dispatcher不同, 同步是使用 PerThreadQueuedDispatcher
, 异步是 LegacyAsyncDispatcher
异步的消息分发
/**
* Global event queue.
*/
private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
Queues.newConcurrentLinkedQueue();
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
while (subscribers.hasNext()) {
queue.add(new EventWithSubscriber(event, subscribers.next()));
}
EventWithSubscriber e;
while ((e = queue.poll()) != null) {
e.subscriber.dispatchEvent(e.event);
}
}
private static final class EventWithSubscriber {
private final Object event;
private final Subscriber subscriber;
private EventWithSubscriber(Object event, Subscriber subscriber) {
this.event = event;
this.subscriber = subscriber;
}
}
}
同步的消息推送
/**
* Per-thread queue of events to dispatch.
*/
private final ThreadLocal<Queue<Event>> queue =
new ThreadLocal<Queue<Event>>() {
@Override
protected Queue<Event> initialValue() {
return Queues.newArrayDeque();
}
};
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
checkNotNull(subscribers);
Queue<Event> queueForThread = queue.get();
queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) {
dispatching.set(true);
try {
Event nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
while (nextEvent.subscribers.hasNext()) {
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
}
private static final class Event {
private final Object event;
private final Iterator<Subscriber> subscribers;
private Event(Object event, Iterator<Subscriber> subscribers) {
this.event = event;
this.subscribers = subscribers;
}
}
}
执行时, 在 AsyncEventBus
是在线程池中执行; 而 EventBus
则是直接执行, 实质上的执行器
public static Executor directExecutor() {
return DirectExecutor.INSTANCE;
}
/** See {@link #directExecutor} for behavioral notes. */
private enum DirectExecutor implements Executor {
INSTANCE;
@Override public void execute(Runnable command) {
command.run();
}
}
4. 线程安全
5. 异常处理
-
没有订阅者时, 抛一个
DeadEvent
-
订阅者接收消息后的,执行异常时 (订阅者之间的隔离)
- 看下具体的执行,比较清晰, 将异常抛给 EventBus的 ExceptionHandler统一处理
final void dispatchEvent(final Object event) { executor.execute(new Runnable() { @Override public void run() { try { invokeSubscriberMethod(event); } catch (InvocationTargetException e) { bus.handleSubscriberException(e.getCause(), context(event)); } } }); }
6. 消费顺序 & 截断
Guava的EventBus不支持定义订阅者的顺序,更谈不上截断