前言:

线程安全是Vert.x的重要特性,但这一特性是由它依赖的netty实现的,Vert.x只是直接拿过来使用。

这里涉及到很多个类。

ContextImpl、EventLoopContext、NioEventLoop、和NioEventLoop的父类SingleThreadEventLoop、和NioEventLoop的爷爷类SingleThreadEventExecutor。

原理:

Netty定义了EventExecutor事件执行器,用做对任务处理的封装。执行器内部维护了Queue<Runable>

,实现了任务的顺序执行。还定义了MultithreadEventExecutorGroup类,维护数组变量EventExecutor[] children,实现了多核CPU的利用; (数组队列结构,非常像Hashmap的数组链表结构)。一个Verticle和一个ContextImpl对应,再有一个ContextImpl和一个EventExecutor对应,使所有对Verticle的操作都在一个Queue<Runable>中依次执行,实现了线程安全。

代码:

代码1.构造器

对于占了大部分的普通Verticle来说一般来说,会依次由VertxImpl.getOrCreateContext()、createEventLoopContext()、EventLoopContext构造方法、ContextImpl构造方法调用后,进入ContextImpl类

在创建ContextImpl 时 ,这下面的三个方法(或构造方法),

// 利用next(),从group中取一个。next()也实现了对group的平衡获取

private static EventLoop getEventLoop(VertxInternal vertx) {
    EventLoopGroup group = vertx.getEventLoopGroup();
    if (group != null) {
        return group.next();
    } else {
        return null;
    }
}

// 需要注意this的第2个参数是getEventLoop(vertx)方法的调用。才

protected ContextImpl(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, String deploymentID, JsonObject config,
ClassLoader tccl) {
    this(vertx, getEventLoop(vertx), internalBlockingPool, workerPool, deploymentID, config, tccl);
}

// 简单的赋值

protected ContextImpl(VertxInternal vertx, EventLoop eventLoop, WorkerPool internalBlockingPool, WorkerPool workerPool, String deploymentID, JsonObject config,ClassLoader tccl) {
    if (DISABLE_TCCL && !tccl.getClass().getName().equals("sun.misc.Launcher$AppClassLoader")) {
        log.warn("You have disabled TCCL checks but you have a custom TCCL to set.");
    }
    this.deploymentID = deploymentID;
    this.config = config;
    this.eventLoop = eventLoop;
    this.tccl = tccl;
    this.owner = vertx;
    this.workerPool = workerPool;
    this.internalBlockingPool = internalBlockingPool;
    this.orderedTasks = new TaskQueue();
    this.internalOrderedTasks = new TaskQueue();
    this.closeHooks = new CloseHooks(log);
}

完成对属性private final EventLoop eventLoop;的赋值,即对ContextImpl和EventLoop的1对1绑定。

VertxImpl的构造方法中,会对它的成员变量 eventLoopGroup 赋值

eventLoopGroup = transport.eventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO);

在eventLoopGroup()方法为:

public EventLoopGroup eventLoopGroup(int nThreads, ThreadFactory threadFactory, int ioRatio) {
    NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(nThreads, threadFactory);
    eventLoopGroup.setIoRatio(ioRatio);
    return eventLoopGroup;
}

可以看到实例化了一个 NioEventLoopGroup 作为返回值。NioEventLoopGroup 就是若干个NioEventLoop的封装,主要还是看NioEventLoop。

Vert.x系列(五)--ContextImpl源码分析-LMLPHP

用ctrl+alt+U查看下类图,发现NioEventLoop的继承结构有点复杂。可以看到 Executor、SingleThreadEventExecutor。

Executor 定义了 void execute(Runnable command); -- 处理任务的方法

SingleThreadEventExecutor 实现了void execute(Runnable command);

并定义了重要的任务队列 private final Queue<Runnable> taskQueue;

也看看 NioEventLoopGroup的类图:

Vert.x系列(五)--ContextImpl源码分析-LMLPHP

在他的父类MultithreadEventExecutorGroup,定义了private final EventExecutor[] children;

那么,对前面的eventLoopGroup()方法里的

NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(nThreads, threadFactory);

这句话归纳,就是拥有 EventExecutor[] children; 的对象,而EventExecutor实现了对Queue<Runnable> taskQueue;的操作。就是“原理”里说的数组队列结构。

代码2. runOnContext

对Verticle的操作,最后都会统一到 ContextImpl.runOnContext()方法处理,比如EventBusImpl.deliverToHandler()

Vert.x系列(五)--ContextImpl源码分析-LMLPHP

runOnContext作为入口方法很简单:

// Run the task asynchronously on this same context

@Override
public void runOnContext(Handler<Void> task) {
    try {
        executeAsync(task);
    } catch (RejectedExecutionException ignore) {
    // Pool is already shut down
    }
}

executeAsync 有 abstract 关键字修饰,需要查看 ContextImpl 的子类EventLoopContext ,看看它是怎么实现的

public void executeAsync(Handler<Void> task) {
// No metrics, we are on the event loop.
nettyEventLoop().execute(wrapTask(null, task, true, null));
}

这个wrapTask(代码略)方法把属于Vertx的Handler封装成JDK的Runable,传给netty框架处理。再使用execute()执行。下面的逻辑就是netty如何处理Runnable.

代码3 SingleThreadEventExecutor.execute()

execute() 最上层的接口Executor定义的。NioEventLoop的父类SingleThreadEventExecutor 重写了此方法.SingleThreadEventExecutor去执行execute() ,自己仍然还是一个代理,会把真正执行运行线程的逻辑(类似方法名doExecute做的事情)的逻辑交给 private final Executor executor;执行

@Override
public void execute(Runnable task) {
    if (task == null) {
    throw new NullPointerException("task");
    }
    boolean inEventLoop = inEventLoop();
    // 对Queue添加 addTask(Runnable task)--offerTask(Runnable task) --taskQueue.offer(task); 这一系列操作, // 完成了对 Queue<Runnable>的添加操作。
    addTask(task);
    if (!inEventLoop) {
    // 执行
    //SingleThreadEventExecutor.execute--> SingleThreadEventExecutor.startThread-->
    // SingleThreadEventExecutor.doStartThread. -->成员 Executor executor的execute(),实现是ThreadPerTaskExecutor的execute()
    startThread();
    // 对Queue减少
    if (isShutdown() && removeTask(task)) {
    reject();
    }
    }
    if (!addTaskWakesUp && wakesUpForTask(task)) {
    wakeup(inEventLoop);
    }
}

这个Executor executor 是在SingleThreadEventExecutor的构造方法中实例化的ThreadPerTaskExecutor,是属于Netty框架的。但是,ThreadPerTaskExecutor包含一个接口属性ThreadFactory threadFactory。针对Vertx框架的场景,new ThreadPerTaskExecutor(threadFactory) 中的 threadFactory是属于Vertx框架的VertxThreadFactory。

protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory,boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
// 粗体代码
this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
}
public final class ThreadPerTaskExecutor implements Executor {

    private final ThreadFactory threadFactory; // 这个实际是Vertx框架下的VertxThreadFactory
    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
           }
        this.threadFactory = threadFactory;
    }
    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start(); // 最最最底层的Thread.start()方法。
    }
}

这个变量的源头,很早很早前,由VertxImpl在调用时传入的

eventLoopGroup = transport.eventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO);

所以,弄到现在,各种Factory包裹,N层逻辑。才最终还是使用抽象工厂模式,调用了Vertx实现的工厂。

需要注意的是 , NioEventLoop重写了newTaskQueue()方法,

@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue(): PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}

所以Queue<Runnable> taskQueue 拥有的不是在SingleThreadEventExecutor.newTaskQueue()里的 LinkedBlockingQueue , 而是 MpscUnboundedArrayQueue。

public final class ThreadPerTaskExecutor implements Executor {

private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
03-31 17:09