在之前的文章中我们详细介绍过Netty中的NioEventLoop,NioEventLoop从本质上讲是一个事件循环执行器,每个NioEventLoop都会绑定一个对应的线程通过一个for(;;)
循环来处理事件消息。今天我们就借鉴NioEventLoop,并加入消息分发策略构建一个基础的Eventloop线程模型。
整个线程模型可以划分为三部分
事件监听及分发(Event): 监听消息事件并通过分发器分发;
分发器(Dispatch):将消息按照分发策略分发到对应的事件循环器中;
事件循环器 (EventLoop) :绑定一个对应的线程与消息队列,循环处理消息;
模型图如下所示
事件监听器把监听到的事件交给分发器进行分发,分发器根据一定的分发策略把消息事件分发至对应的Eventloop的消息事件队列中,每个EventLoop内部会启动一个线程轮询队列中的消息事件并进行处理;
下面结合代码看下具体实现
一、事件监听及分发
首先是事件监听机制,定义了事件、事件源、事件监听器三部分
/** * 事件类,用于封装事件源及一些与事件相关的参数. */ public class LoopEvent extends EventObject { private static final long serialVersionUID = 1L; public LoopEvent(Object source) { super(source); // TODO Auto-generated constructor stub } } /** * 事件源 * */ public class LoopEventSource { // 监听器容器 private static Set<AbstractEventListener> listeners; public LoopEventSource() { listeners = new HashSet<AbstractEventListener>(); } // 给事件源注册监听器 public void addEventListener(AbstractEventListener listener) { listeners.add(listener); } // 当事件发生时,通知注册在该事件源上的所有监听器做出相应的反应(调用回调方法) public void notifies(LoopEvent event) { AbstractEventListener listener = null; try { Iterator<AbstractEventListener> iterator = listeners.iterator(); while (iterator.hasNext()) { listener = iterator.next(); listener.fireEvent(event); // 如果事件消息类型不同,可以进行封装 } } catch (Exception e) { e.printStackTrace(); } } } /** * 事件监听器 * */ public class LoopEventListener extends AbstractEventListener { EventLoopDispatch<String,String> dispatcher = new EventLoopDispatch<String,String>(4);//定义及初始化一个消息分发器 @Override public void fireEvent(LoopEvent e) { EventRecord<String,String> eventRecord = new EventRecord<String, String>(null, e.getSource().toString());//转为内部统一的消息类型 dispatcher.dispatch(eventRecord);//分发消息事件 } }
以上代码就是实现一个简单的事件监听机制,需要注意两点:
1、事件监听器中对消息分发器的定义及初始化,因为在分发器中会同时构造及初始化EventLoopGroup与DefaultPartitioner,前者顾名思义是指一组事件循环器,后者则是分发策略的具体实现;
2、为了配置分发策略需要把事件数据转为线程模型内部统一的消息类型;
二、分发器(Dispatch)
接收到事件触发后,我们会把事件转为统一的内部消息类型,然后交给dispatch进行分发,来看下分发的具体代码。
//消息分发与执行 public void dispatch(EventRecord<K, V> record) { int partation = partitioner.partition(core,record.partition(), record.key());//根据传入的消息数据,确定partation group.next(partation, record);//根据partation,确定执行的EventLoop }
分发方法实现的功能也很简单,第一步根据传入的消息体数据拿到具体的partation,第二步把拿到的partation与消息数据传入group的next方法,分配指定的EventLoop;
在AbstractDispatch构造函数中对EventLoopGroup与DefaultPartitioner进行初始化操作;
protected AbstractDispatch(int core) {
this.core = core;
this.partitioner = new DefaultPartitioner();
this.group = new ThreadEventLoopGroup(core);
}
DefaultPartitioner类借鉴了Kafka Consumer的实现,主要实现partitioner的分配策略,可以通过指定消息实体EventRecord的Key或Partition,保证消息数据按照一定的规则落入对应的EventLoop中。
根据EventRecord消息构造的不同,三种策略如下:
1、构造EventRecord时,确定了partition,判断后直接返回;
2、构造EventRecord时,Key与partition均为空,自增取余;
3、构造EventRecord时,partition为空,Key不为空,hash取余;
public class DefaultPartitioner implements Partitioner { private final ConcurrentMap<Integer, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>(); public int partition(int core, Integer partition, Object key) { if (partition != null ) { if(partition>core) {//可以自己指定partition,但不能大于core,也就是group个数 throw new IllegalArgumentException(String .format("Invalid partition: %d. partition should always be greater than core.", partition)); } return partition; } else if (partition == null && key == null) {//如果没有指定key或partition,则采用自增取余模式 int nextValue = nextValue(core); return ConstantUtil.toPositive(nextValue) % core; } else {//如果partition为空,key不为空,则根据hash取余 return Math.abs(key.hashCode() % core); } } private int nextValue(int core) { AtomicInteger counter = topicCounterMap.get(core); if (null == counter) { counter = new AtomicInteger(ThreadLocalRandom.current().nextInt()); AtomicInteger currentCounter = topicCounterMap.putIfAbsent(core, counter); if (currentCounter != null) { counter = currentCounter; } } return counter.getAndIncrement(); } }
而ThreadEventLoopGroup 顾名思义就是维护一组EventLoop,会根据你传入的参数大小创建并初始化一个EventLoop数组;next方法会根据传入的partition,把消息数据交给EventLoop数组中的一个来执行
public class ThreadEventLoopGroup { private ThreadEventLoop[] children; public ThreadEventLoopGroup(int threads) { this(threads, null); } public ThreadEventLoopGroup(int threads, Executor executor) { if (executor == null) {//自定义线程工厂 executor = new ThreadPerTaskExecutor(EventThreadFactory()); } //根据传入的线程数创建一个EventLoop数组 children = new ThreadEventLoop[threads]; //初始化EventLoop for (int i = 0; i < threads; i++) { children[i] = new ThreadEventLoop(executor); } } protected ThreadFactory EventThreadFactory() { return new EventThreadFactory(); } //分配EventLoop public void next(int partaion, EventRecord<?, ?> context) { children[partaion].execute(context); } }
以上部分主要实现了消息数据的接收与分发,具体分发策略由DefaultPartitioner与 EventRecord相互配合实现 ,使用者既可以通过构造EventRecord自定义分发策略,也可以使用默认分发策略;
三、事件循环器 (EventLoop)
消息经过分发器分发后,会进入对应EventLoop,我们首先看下EventLoop及其抽象类的实现
ThreadEventLoop类
public class ThreadEventLoop extends AbstractEventLoop { public ThreadEventLoop(Executor executor) { super(executor); } public void run() { //通过for(;;)轮询的方式从阻塞队列中获取数据进行处理 for (;;) { try { EventRecord<?, ?> record = arrayBlockingQueue.take();//从阻塞队列中获取数据 iHandler.execute(record); } catch (InterruptedException e) { // TODO Auto-generated catch block System.err.println(e.getMessage()); } } } public void execute(EventRecord<?, ?> record) { try { if (record == null) {//判断非空 throw new NullPointerException("EventRecord"); } boolean inEventLoop = inEventLoop(Thread.currentThread()); arrayBlockingQueue.put(record);//把消息放入阻塞队列中 if (!inEventLoop) { startThread();//启动EventLoop中改的线程 } } catch (InterruptedException e) { // TODO Auto-generated catch block System.err.println(e.getMessage()); } } }
execute作为执行方法入口主要起到两点作用:一是要把收到的消息数据放入阻塞队列中,二是启动EventLoop中的线程,从而执行run方法轮询队列进行处理;
在抽象类AbstractEventLoop 中,startThread方法通过CAS的方式,保证了每个EventLoop 中自定义的线程工厂只创建和启动一次线程;
public abstract class AbstractEventLoop { //阻塞队列,消息会先进入阻塞队列中,再由线程循环处理 protected final ArrayBlockingQueue<EventRecord<?,?>> arrayBlockingQueue = new ArrayBlockingQueue<EventRecord<?,?>>(1024); //绑定线程 protected volatile Thread thread; protected final Executor executor; protected IHandler iHandler = new EventHandler(); //声明AtomicIntegerFieldUpdater类,用于控制线程启动状态 private static final AtomicIntegerFieldUpdater<AbstractEventLoop> STATE_UPDATER = AtomicIntegerFieldUpdater .newUpdater(AbstractEventLoop.class, "state"); private volatile int state = ST_NOT_STARTED; private static final int ST_NOT_STARTED = 1; private static final int ST_STARTED = 2; protected AbstractEventLoop(Executor executor) { this.executor = executor; } public boolean inEventLoop(Thread thread) { return thread == this.thread; } //启动线程 protected void startThread() { if (state == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {//通过CAS的方式保证线程不会重复启动 try { doStartThread();// } catch (Throwable cause) { STATE_UPDATER.set(this, ST_NOT_STARTED); } } } } protected void doStartThread() { assert thread == null; executor.execute(new Runnable() {//通过自定义线程工厂开始执行线程 @Override public void run() { thread = Thread.currentThread(); AbstractEventLoop.this.run();//执行实现类的run方法 } }); } protected abstract void run(); }
可以看到EventLoop要实现的目标很明确,每个EventLoop都会绑定与维护一个对应的线程,该线程轮询队列中的消息进行处理;
四、总结
以上就是一个由事件消息驱动的EventLoop线程模型的构建,其中涉及到了事件监听、消息分发、线程轮询等内容,相比与一般的生产者消费者模型,具备以下特点:
1、结合事件监听机制,统一内部消息模型,为分发策略的制定与无锁化提供基础;
2、线程内部结合阻塞队列循环执行,保证同一EventLoo中数据处理的有序性;
3、针对需要线程同步数据,可根据一定的分发策略保证由同一线程执行,实现无锁化;
当然相比Netty等框架中的EventLoop线程模型,本文只是基于自己理解实现的简单实例,其中有很多复杂的细节都未考虑,但仍然希望对大家能有所帮助,其中如有不足与不正确的地方还望指出与海涵。
github地址:https://github.com/dafanjoy/fan-eventLoop
关注微信公众号,查看更多技术文章。
转载说明:未经授权不得转载,授权后务必注明来源(注明:来源于公众号:架构空间, 作者:大凡)