前言:
线程安全是Vert.x的重要特性,但这一特性是由它依赖的netty实现的,Vert.x只是直接拿过来使用。
这里涉及到很多个类。
ContextImpl、EventLoopContext、NioEventLoop、和NioEventLoop的父类SingleThreadEventLoop、和NioEventLoop的爷爷类SingleThreadEventExecutor。
原理:
Netty定义了EventExecutor事件执行器,用做对任务处理的封装。执行器内部维护了Queue<Runable>
,实现了任务的顺序执行。还定义了MultithreadEventExecutorGroup类,维护数组变量EventExecutor[] children,实现了多核CPU的利用; (数组队列结构,非常像Hashmap的数组链表结构)。一个Verticle和一个ContextImpl对应,再有一个ContextImpl和一个EventExecutor对应,使所有对Verticle的操作都在一个Queue<Runable>中依次执行,实现了线程安全。
代码:
代码1.构造器
对于占了大部分的普通Verticle来说一般来说,会依次由VertxImpl.getOrCreateContext()、createEventLoopContext()、EventLoopContext构造方法、ContextImpl构造方法调用后,进入ContextImpl类
在创建ContextImpl 时 ,这下面的三个方法(或构造方法),
// 利用next(),从group中取一个。next()也实现了对group的平衡获取
private static EventLoop getEventLoop(VertxInternal vertx) { EventLoopGroup group = vertx.getEventLoopGroup(); if (group != null) { return group.next(); } else { return null; } }
// 需要注意this的第2个参数是getEventLoop(vertx)方法的调用。才
protected ContextImpl(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, String deploymentID, JsonObject config, ClassLoader tccl) { this(vertx, getEventLoop(vertx), internalBlockingPool, workerPool, deploymentID, config, tccl); }
// 简单的赋值
protected ContextImpl(VertxInternal vertx, EventLoop eventLoop, WorkerPool internalBlockingPool, WorkerPool workerPool, String deploymentID, JsonObject config,ClassLoader tccl) { if (DISABLE_TCCL && !tccl.getClass().getName().equals("sun.misc.Launcher$AppClassLoader")) { log.warn("You have disabled TCCL checks but you have a custom TCCL to set."); } this.deploymentID = deploymentID; this.config = config; this.eventLoop = eventLoop; this.tccl = tccl; this.owner = vertx; this.workerPool = workerPool; this.internalBlockingPool = internalBlockingPool; this.orderedTasks = new TaskQueue(); this.internalOrderedTasks = new TaskQueue(); this.closeHooks = new CloseHooks(log); }
完成对属性private final EventLoop eventLoop;的赋值,即对ContextImpl和EventLoop的1对1绑定。
VertxImpl的构造方法中,会对它的成员变量 eventLoopGroup 赋值
eventLoopGroup = transport.eventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO);
在eventLoopGroup()方法为:
public EventLoopGroup eventLoopGroup(int nThreads, ThreadFactory threadFactory, int ioRatio) { NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(nThreads, threadFactory); eventLoopGroup.setIoRatio(ioRatio); return eventLoopGroup; }
可以看到实例化了一个 NioEventLoopGroup 作为返回值。NioEventLoopGroup 就是若干个NioEventLoop的封装,主要还是看NioEventLoop。
用ctrl+alt+U查看下类图,发现NioEventLoop的继承结构有点复杂。可以看到 Executor、SingleThreadEventExecutor。
Executor 定义了 void execute(Runnable command); -- 处理任务的方法
SingleThreadEventExecutor 实现了void execute(Runnable command);
并定义了重要的任务队列 private final Queue<Runnable> taskQueue;
也看看 NioEventLoopGroup的类图:
在他的父类MultithreadEventExecutorGroup,定义了private final EventExecutor[] children;
那么,对前面的eventLoopGroup()方法里的
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(nThreads, threadFactory);
这句话归纳,就是拥有 EventExecutor[] children; 的对象,而EventExecutor实现了对Queue<Runnable> taskQueue;的操作。就是“原理”里说的数组队列结构。
代码2. runOnContext
对Verticle的操作,最后都会统一到 ContextImpl.runOnContext()方法处理,比如EventBusImpl.deliverToHandler()
runOnContext作为入口方法很简单:
// Run the task asynchronously on this same context @Override public void runOnContext(Handler<Void> task) { try { executeAsync(task); } catch (RejectedExecutionException ignore) { // Pool is already shut down } }
executeAsync 有 abstract 关键字修饰,需要查看 ContextImpl 的子类EventLoopContext ,看看它是怎么实现的
public void executeAsync(Handler<Void> task) { // No metrics, we are on the event loop. nettyEventLoop().execute(wrapTask(null, task, true, null)); }
这个wrapTask(代码略)方法把属于Vertx的Handler封装成JDK的Runable,传给netty框架处理。再使用execute()执行。下面的逻辑就是netty如何处理Runnable.
代码3 SingleThreadEventExecutor.execute()
execute() 最上层的接口Executor定义的。NioEventLoop的父类SingleThreadEventExecutor 重写了此方法.SingleThreadEventExecutor去执行execute() ,自己仍然还是一个代理,会把真正执行运行线程的逻辑(类似方法名doExecute做的事情)的逻辑交给 private final Executor executor;执行
@Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); // 对Queue添加 addTask(Runnable task)--offerTask(Runnable task) --taskQueue.offer(task); 这一系列操作, // 完成了对 Queue<Runnable>的添加操作。 addTask(task); if (!inEventLoop) { // 执行 //SingleThreadEventExecutor.execute--> SingleThreadEventExecutor.startThread--> // SingleThreadEventExecutor.doStartThread. -->成员 Executor executor的execute(),实现是ThreadPerTaskExecutor的execute() startThread(); // 对Queue减少 if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
这个Executor executor 是在SingleThreadEventExecutor的构造方法中实例化的ThreadPerTaskExecutor,是属于Netty框架的。但是,ThreadPerTaskExecutor包含一个接口属性ThreadFactory threadFactory。针对Vertx框架的场景,new ThreadPerTaskExecutor(threadFactory) 中的 threadFactory是属于Vertx框架的VertxThreadFactory。
protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory,boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { // 粗体代码 this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler); }
public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory; // 这个实际是Vertx框架下的VertxThreadFactory public ThreadPerTaskExecutor(ThreadFactory threadFactory) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } this.threadFactory = threadFactory; } @Override public void execute(Runnable command) { threadFactory.newThread(command).start(); // 最最最底层的Thread.start()方法。 } }
这个变量的源头,很早很早前,由VertxImpl在调用时传入的
eventLoopGroup = transport.eventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO);
所以,弄到现在,各种Factory包裹,N层逻辑。才最终还是使用抽象工厂模式,调用了Vertx实现的工厂。
需要注意的是 , NioEventLoop重写了newTaskQueue()方法,
@Override protected Queue<Runnable> newTaskQueue(int maxPendingTasks) { // This event loop never calls takeTask() return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue(): PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks); }
所以Queue<Runnable> taskQueue 拥有的不是在SingleThreadEventExecutor.newTaskQueue()里的 LinkedBlockingQueue , 而是 MpscUnboundedArrayQueue。
public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory; public ThreadPerTaskExecutor(ThreadFactory threadFactory) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } this.threadFactory = threadFactory; } @Override public void execute(Runnable command) { threadFactory.newThread(command).start(); } }