本文主要分享Netty中事件循环机制的实现。
源码分析基于Netty 4.1

EventLoop

前面分享服务端和客户端启动过程的文章中说过,Netty通过事件循环机制(EventLoop)处理IO事件和异步任务,简单来说,就是通过一个死循环,不断处理当前已发生的IO事件和待处理的异步任务。示例如下

while(true) {
	process(selector.select());

	process(getTask());
}

这种事件循环机制也是一种常用的IO事件处理机制,包括Redis,Mysql都使用了类似的机制。

关于异步任务,前面文章说过,EventLoop实现了(jvm)Executor的接口,execute方法可以提供异步任务。
register,bind,connect等操作,都会提交一个任务给EventLoop处理。如

if (eventLoop.inEventLoop()) {
	register0(promise);
} else {
	eventLoop.execute(new Runnable() {
		public void run() {
			register0(promise);
		}
	});
}

下面看一下Netty中事件循环机制相关的类。

EventExecutor,事件执行器,负责处理事件。
EventExecutorGroup维护了一个EventExecutor链表,它继承了ScheduledExecutorService,execute方法通过next方法选择一个EventExecutor,并调用EventLoop#execute处理事件。
(EventExecutor继承了EventExecutorGroup,可以看做一个特殊的EventExecutorGroup,其execute方法可以提交一个任务任务)

EventLoop,事件循环器,继承了EventExecutor,通过循环不断处理注册于其上的Channel的IO事件。
EventLoopGroup接口则继承了EventExecutorGroup,负责调度EventLoop。

SingleThreadEventExecutor实现了EventExecutor,它会创建一个新线程,并在该线程上处理事件,可以理解为单线程处理器。
MultithreadEventExecutorGroup实现EventExecutorGroup,可以理解为多线程处理器(实际上是维护了多个EventExecutor,一个EventExecutor可以理解为一个线程),newChild方法构造具体的EventExecutor。
MultithreadEventExecutorGroup可以配置EventExecutor数量,即线程数量。
EventExecutorChooserFactory.EventExecutorChooser负责选择一个EventExecutor执行实际操作。

NioEventLoop继承了SingleThreadEventExecutor,负责处理NIO事件。所以,一个NioEventLoop对象可以看做是一个线程。
NioEventLoop也实现了EventLoop接口,它实现了事件循环机制,是Netty核心类。

MultithreadEventLoopGroup继承了MultithreadEventExecutorGroup,并实现了EventLoopGroup,其newChild方法构造具体的EventLoop。
NioEventLoopGroup#newChild会构建NioEventLoop。

EventLoop各实现类关系如下
Netty源码解析 -- 事件循环机制实现原理-LMLPHP

启动

SingleThreadEventExecutor关键字段

private final Queue<Runnable> taskQueue;	// 待处理异步任务
private volatile Thread thread;				// EventLoop执行线程,即SingleThreadEventExecutor创建的新线程
private final Executor executor;			// java.util.concurrent.Executor,负责创建线程

当我们通过execute方法提交任务时,如果还没有创建执行新线程,会通过SingleThreadEventExecutor#executor一个新线程,并在新线程中调用run方法(run方法由子类实现,负责实现事件循环机制,新线程是EventLoop真正执行线程)。

SingleThreadEventExecutor#execute

public void execute(Runnable task) {
	...

	boolean inEventLoop = inEventLoop();
	// #1
	addTask(task);
	// #2
	if (!inEventLoop) {
		startThread();
		// #3
		if (isShutdown()) {
			...
		}
	}
	// #4
	if (!addTaskWakesUp && wakesUpForTask(task)) {
		wakeup(inEventLoop);
	}
}

#1 添加任务到待处理列表
#2
inEventLoop方法,判断当前线程是否为EventLoop执行线程
若当前线程非EventLoop执行线程,调用startThread方法启动一个新的线程,执行run方法。
这里可以理解为启动EventLoop。
#3 如果当前EventLoop已关闭,拒绝任务
#4 若当前EventLoop线程阻塞正等待IO事件(Selector#select方法),调用wakeup方法唤醒线程执行该新增任务

循环机制

NioEventLoop#run方法负责实现NIO事件处理机制。

protected void run() {
	int selectCnt = 0;
	// #1
	for (;;) {

			int strategy;

				// #2
				strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
				switch (strategy) {
				case SelectStrategy.CONTINUE:
					continue;

				case SelectStrategy.BUSY_WAIT:
					// fall-through to SELECT since the busy-wait is not supported with NIO

				case SelectStrategy.SELECT:
					// #3
					long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
					if (curDeadlineNanos == -1L) {
						curDeadlineNanos = NONE; // nothing on the calendar
					}
					nextWakeupNanos.set(curDeadlineNanos);
					try {
						// #4
						if (!hasTasks()) {
							strategy = select(curDeadlineNanos);
						}
					} finally {
						// #5
						nextWakeupNanos.lazySet(AWAKE);
					}
					// fall through
				default:
				}
				...

			// #6
			selectCnt++;
			cancelledKeys = 0;
			needsToSelectAgain = false;
			final int ioRatio = this.ioRatio;
			boolean ranTasks;
			// #7
			if (ioRatio == 100) {
				try {
					if (strategy > 0) {
						processSelectedKeys();
					}
				} finally {
					// Ensure we always run tasks.
					ranTasks = runAllTasks();
				}
			} else if (strategy > 0) {
				final long ioStartTime = System.nanoTime();
				try {
					processSelectedKeys();
				} finally {
					// Ensure we always run tasks.
					final long ioTime = System.nanoTime() - ioStartTime;
					ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
				}
			} else {
				ranTasks = runAllTasks(0); // This will run the minimum number of tasks
			}
			// #8
			if (ranTasks || strategy > 0) {
				if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
					logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
							selectCnt - 1, selector);
				}
				selectCnt = 0;
			} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
				selectCnt = 0;
			}


			// #9
			if (isShuttingDown()) {
				closeAll();
				if (confirmShutdown()) {
					return;
				}
			}

	}
}

为了版面整洁,这里删除了异常处理代码。
#1 可以看到,这里通过一个死循环不断处理IO事件和异步任务。
#2 如果当前存在待处理的任务,调用selector.selectNow(),这时会跳出switch语句,往下处理事件和任务,否则返回SelectStrategy.SELECT。
#3 curDeadlineNanos,计算延迟任务队列中第一个任务的到期执行时间(即最晚还能延迟多长时间执行),没有任务则返回-1。
更新nextWakeupNanos为阻塞时间。
由于频繁调用(jvm)Selector.wakeup会造成性能消耗,NioEventLoop维护了一个唤醒标识nextWakeupNanos。nextWakeupNanos有三种值
NONE -- 执行线程被阻塞;
AWAKE -- 执行线程未阻塞;
其他值 -- 执行线程被超时阻塞,在指定的时间后唤醒;
NioEventLoop#wakeup方法中,只有nextWakeupNanos.getAndSet(AWAKE) != AWAKE成功才调用selector.wakeup()方法。
#4
这时如果还没有任务加入,则执行select,阻塞线程。select方法返回结果作为新的strategy。
#5
lazySet方法,设置值之后其他线程在短期内还是可能读到旧值
这里将nextWakeupNanos设置为AWAKE,主要是减少wakeup方法中不必要的wakeup操作。
所以使用lazySet方法也没有问题。
#6 selectCnt增加
旧版本的Java NIO在Linux Epoll实现上存在bug,(jvm)Selector.select方法可能在没有任何就绪事件的情况下返回,导致CPU空转,利用率飙升到100%。
于是,Netty计算select方法重复调用次数selectCnt,并在selectCnt大于SELECTOR_AUTO_REBUILD_THRESHOLD配置(默认512)时,重建selector,从而规避该问题。
幸好在JDK6_6u4,JDK7_b12已修复该Bug。
#7 processSelectedKeys方法处理IO事件,runAllTasks方法处理任务。
ioRatio表示执行IO事件所占CPU时间百分比,默认50,
ioTime * (100 - ioRatio) / ioRatio,通过ioTime,ioRatio计算处理任务的CPU时间。
#8 如果执行了任务或者select方法返回有效值,直接重置selectCnt。
unexpectedSelectorWakeup方法中会在selectCnt大于SELECTOR_AUTO_REBUILD_THRESHOLD时重建selector。
#9 如果是正在关闭状态,则要关闭所有的Channel

IO事件

下面看一下Eventloop中如何处理IO事件。
NioEventLoop关键字段

Selector unwrappedSelector;				// JVM中的Selector
Selector selector;						// 优化后的SelectedSelectionKeySetSelector
SelectedSelectionKeySet selectedKeys;	// 对(jvm)Selector#selectedKeys进行优化

SelectedSelectionKeySetSelector每次调用select前都清除SelectedSelectionKeySet
SelectedSelectionKeySet使用数组代替原Selector的中的HashSet,提高性能。数组默认大小为1024,不够用时扩展为原大小的2倍。

NioEventLoop#构造方法 -> NioEventLoop#openSelector

private SelectorTuple openSelector() {
    final Selector unwrappedSelector;
    try {
		// #1
        unwrappedSelector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }

    if (DISABLE_KEY_SET_OPTIMIZATION) {
        return new SelectorTuple(unwrappedSelector);
    }

    ...

    final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

    Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {

        public Object run() {
            try {
				// #2
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                ...

                selectedKeysField.set(unwrappedSelector, selectedKeySet);
                publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                return null;
            } ...
        }
    });

    ...
    selectedKeys = selectedKeySet;
    logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
	// #3
    return new SelectorTuple(unwrappedSelector,
                             new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}

#1 通过(jvm)SelectorProvider打开一个Selector
#2 构造了selectedKeySet,并通过反射将该对象设置到Selector的selectedKeys,publicSelectedKeys属性中,这样Selector监听到的事件就会存储到selectedKeySet。
#3 构造了SelectedSelectionKeySetSelector对象

NioEventLoop#select负责阻塞线程,等待IO事件

private int select(long deadlineNanos) throws IOException {
	// #1
	if (deadlineNanos == NONE) {
		return selector.select();
	}

	// #2
	long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
	return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}

#1 一直阻塞,知道发生IO事件或加入了新任务
#2 计算阻塞时间,在原阻塞时间加上995微秒后转化为毫秒。
如果原阻塞时间在5微秒内,就不阻塞了。

IO事件的处理流程为
NioEventLoop#processSelectedKeys -> (没有禁用SelectedSelectionKeySet)processSelectedKeysOptimized -> processSelectedKey

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
	final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
	...

	try {
		int readyOps = k.readyOps();
		// #1
		if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
			int ops = k.interestOps();
			ops &= ~SelectionKey.OP_CONNECT;
			k.interestOps(ops);

			unsafe.finishConnect();
		}

		// #2
		if ((readyOps & SelectionKey.OP_WRITE) != 0) {
			// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
			ch.unsafe().forceFlush();
		}

		// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
		// to a spin loop
		// #3
		if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
			unsafe.read();
		}
	} catch (CancelledKeyException ignored) {
		unsafe.close(unsafe.voidPromise());
	}
}

#1 处理OP_CONNECT
移除关注事件OP_CONNECT,否则Selector.select(..)将不断返回
前面分享客户端启动过程的文章说过了,这里会调用AbstractNioUnsafe#finishConnect,完成客户端Connect操作,可回顾《客户端启动过程解析》。
#2 先处理OP_WRITE事件,能够尽早写入数据释放内存,这里涉及flush操作,后面有文章解析。
#3 处理OP_READ或OP_ACCEPT事件。
对于ServerChannel,这里会调用NioMessageUnsafe#read,处理OP_ACCEPT事件,可回顾《客户端启动过程解析》。
对于SocketChannel,这里会调用NioByteUnsafe#read,进行读写操作,后面有文章解析。

异步任务

下面看一下Eventloop中如何处理异步任务。
run方法#4步骤 -> SingleThreadEventExecutor#runAllTasks

protected boolean runAllTasks(long timeoutNanos) {
    // #1
    fetchFromScheduledTaskQueue();
    Runnable task = pollTask();
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }

    final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
    	// #2
        safeExecute(task);

        runTasks ++;

        // #3
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }
        // #4
        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    // #5
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

#1 AbstractScheduledEventExecutor#scheduledTaskQueue中存放的是定时任务,
SingleThreadEventExecutor#taskQueue中存放的是待处理的任务。
fetchFromScheduledTaskQueue方法会获取已到期的定时任务,移动到SingleThreadEventExecutor#taskQueue。
#2 执行获取的任务
#3 每个64个任务检查一次是否超时,因为nanoTime()方法也是一个相对昂贵的操作。
#4 取下一个任务,继续处理
#5 预留的扩展方法。

EventLoop在4.1.44版本被优化,代码做了较大改动,删除了原来的wakeup标志,改用nextWakeupNanos,代码更清晰。
请参考 -- Clean up NioEventLoop

Netty是由事件驱动的,服务端register,bind,客户端connect等操作都是提交异步任务给EventLoop处理的
,而Accept,Read/Writ,Connect等IO事件都都需要EventLoop的处理。
大家可以结合前面分析服务端和客户端启动过程的文章,理解EventLoop是如何驱动Netty工作的。

如果您觉得本文不错,欢迎关注我的微信公众号。您的关注是我坚持的动力!
Netty源码解析 -- 事件循环机制实现原理-LMLPHP

11-04 01:53