目录

浅析Node.js的Event Loop

引出问题

首先看两段代码,下面两段代码的执行结果是什么?为什么?

// event-loop-1.js
setTimeout(() => {
console.log('setTimeout');
}, 0);
setImmediate(() => {
console.log('setImmediate');
}); // event-loop-2.js
const fs = require('fs');
fs.readFile(__filename, () => {
setTimeout(() => {
console.log('setTimeout');
}, 0);
setImmediate(() => {
console.log('setImmediate');
});
});

也许你心里已经有了答案,但是就是不太确定,其实这里面涉及到的知识点就是今天要说的Event Loop

Node.js的基本架构

在讲Event Loop之前,首先介绍一下Node.js的基本架构。提到Node.js的时候,我们耳熟能详的是: Node.js是一个基于ChromeV8引擎的JavaScript运行时。Node.js 使用高效、轻量级的事件驱动、非阻塞 I/O 模型。那么这句话真正想要表达的是什么呢?请看下图:

浅析Node.js的Event Loop-LMLPHP

  • Node standard library: Node的标准库,也就是我们平时所用的fs, path, http, net, stream等模块。
  • Node bindlings: 是C++与JavaScript沟通的桥梁, 封装了V8和Libuv的细节,向上层提供API。
  • 最后一层是支撑Node的关键。

使用tree -L 1可以看到Node.js源码的目录如下:

➜  node git:(master) tree -L 1
.
├── AUTHORS
├── BSDmakefile
├── BUILDING.md
├── CHANGELOG.md
├── CODE_OF_CONDUCT.md
├── COLLABORATOR_GUIDE.md
├── CONTRIBUTING.md
├── CPP_STYLE_GUIDE.md
├── GOVERNANCE.md
├── LICENSE
├── Makefile
├── README.md
├── android-configure
├── benchmark
├── common.gypi
├── configure
├── deps
├── doc
├── lib
├── node.gyp
├── node.gypi
├── src
├── test
├── tools
└── vcbuild.bat

而比较关键的几个目录是:

  • deps: 一些Node.js的依赖库,比如Libuv, V8等。
  • src: 包含C++的源码,即Node bindings。
  • lib: 包含JavaScript的源码,存放的是Node.js的核心模块,即fs, path, http, net, stream等模块。

Libuv

我们知道Node.js是一个Runtime, 它拥有异步,非阻塞的模型,那么它是如何实现非阻塞的呢?答案是:Libuv。

浅析Node.js的Event Loop-LMLPHP

什么是Libuv?Libuv是一个高性能的,事件驱动的I/O库,并且提供了 跨平台(如windows, *nix)的API。简单的来说,Node.js的异步、非阻塞I/O,底层实际上是Libuv实现的。

具体更多关于Libuv的知识这里不再阐述,感兴趣的同学下来可以去多了解一下。

Event Loop

可以参考Node.js官方文档上的这样一篇文档: The Node.js Event Loop, Timers, and process.nextTick(), 我们可以知道:

即在Node.js启动的时候,它会初始化Event Loop, 处理提供的输入脚本, 这可能会使异步API调用,调用timers,或者调用process.nextTick, 然后开始处理事件循环。

下图简单展示了事件循环的操作顺序:

┌───────────────────────┐
┌─>│ timers │
│ └──────────┬────────────┘
│ ┌──────────┴────────────┐
│ │ I/O callbacks │
│ └──────────┬────────────┘
│ ┌──────────┴────────────┐
│ │ idle, prepare │
│ └──────────┬────────────┘ ┌───────────────┐
│ ┌──────────┴────────────┐ │ incoming: │
│ │ poll │<─────┤ connections, │
│ └──────────┬────────────┘ │ data, etc. │
│ ┌──────────┴────────────┐ └───────────────┘
│ │ check │
│ └──────────┬────────────┘
│ ┌──────────┴────────────┐
└──┤ close callbacks │
└───────────────────────┘

注意:每个盒子被当作Event Loop的一个阶段。

每个阶段都有一个执行回调的FIFO队列(官网这么描述的,实际上不是的,比如timers的数据结构实际上是堆), 简单概述,当Event Loop进入到某个阶段的时候,就会将该阶段的队列里的回调拿出来执行,直到队列为空(实际上要复杂一点儿)。

Event Loop Phases Overview

简单的介绍一下这几个阶段所做的事情:

  • timers: 这个阶段执行由setTimeout()setInterval()调度的回调。
  • I/O callbacks: 执行几乎所有的回调,除了close callbacks以及timers调度的回调和setImmediate()调度的回调。
  • idle, prepare: 只在内部使用。
  • poll: 检索新的I/O事件,node将在适当的时候阻塞。(retrieve new I/O events; node will block here when appropriate)
  • check: setImmediate()的回调将会在这个阶段执行。
  • close callbacks: 比如socket.on('close', ...)

上面的阶段还是笔记容易理解的,就是poll阶段的解释有点儿让人迷惑,这是什么意思呢?官方文档给出了poll阶段的作用。

Poll Phase

poll阶段有两个功能:

  • 当timers到达指定的时间后,执行指定的timer的回调(Executing scripts for timers whose threshold has elapsed, then)。
  • 处理poll队列的事件(Processing events in the poll queue)。

当进入到poll阶段,并且没有timers被调用的时候,会发生下面的情况:

  • 如果poll队列不为空,Event Loop 将同步的执行poll queue里的callback,直到queue为空或者执行的callback到达上线。
  • 如果poll队列为空,则会发生下面的情况:
    • 如果脚本调用了setImmediate(), Event Loop将会结束poll阶段并且进入到check阶段执行setImmediate()的回调。
    • 如果脚本没有被setImmediate()调用,Event Loop将会等待回调被添加到队列中,然后立即执行它们。

      当进入到poll阶段,并且调用了timers的话,会发生下面的情况:
  • 一旦poll queue是空的话,Event Loop会检查是否timers, 如果有1个或多个timers时间已经到达,Event Loop将会回到timer阶段并执行那些timer的callback(即进入到下一次tick)。

看了上面的介绍,比较I/O callbacks阶段与poll阶段,可能会感到迷惑?为什么在I/O callbacks是执行几乎所有的回调,而在poll阶段也是执行回调?我找到了Libuv的官方文档:

结合Libuv官方文档给出的流程图

浅析Node.js的Event Loop-LMLPHP

来看, 可以翻译为:Pending callbacks(即I/O callbacks)被调用。大多数情况下,所有的I/O callbacks都是在poll for I/O(即poll phase)后理解调用的。然而,有些情况,会在下一次tick调用,以前被推迟的I/O callback会在下一次tick的I/O阶段调用。

那么一般什么样的callback会在I/O callbacks阶段被调用呢?Node.js官方有提到:

即:这个阶段对某些系统操作(比如TCP类型错误)执行回调。举个例子,如果尝试连接时,一个TCP套接字收到了ECONNREFUSED,则某些*nix系统会等待报错。这将排队在I/O callbacks阶段执行。

对于文档上的说法去一探究竟,在Node.js源码里全局搜索: ECONNREFUSED, 在node/deps/uv/src/unix/tcp.c目录下,第206行,uv__tcp_connect函数,代码如下:

int uv__tcp_connect(uv_connect_t* req,
uv_tcp_t* handle,
const struct sockaddr* addr,
unsigned int addrlen,
uv_connect_cb cb) {
int err;
int r; assert(handle->type == UV_TCP); if (handle->connect_req != NULL)
return -EALREADY; /* FIXME(bnoordhuis) -EINVAL or maybe -EBUSY. */ err = maybe_new_socket(handle,
addr->sa_family,
UV_STREAM_READABLE | UV_STREAM_WRITABLE);
if (err)
return err; handle->delayed_error = 0; do {
errno = 0;
r = connect(uv__stream_fd(handle), addr, addrlen);
} while (r == -1 && errno == EINTR); /* We not only check the return value, but also check the errno != 0.
* Because in rare cases connect() will return -1 but the errno
* is 0 (for example, on Android 4.3, OnePlus phone A0001_12_150227)
* and actually the tcp three-way handshake is completed.
*/
if (r == -1 && errno != 0) {
if (errno == EINPROGRESS)
; /* not an error */
else if (errno == ECONNREFUSED)
/* If we get a ECONNREFUSED wait until the next tick to report the
* error. Solaris wants to report immediately--other unixes want to
* wait.
*/
handle->delayed_error = -errno;
else
return -errno;
} uv__req_init(handle->loop, req, UV_CONNECT);
req->cb = cb;
req->handle = (uv_stream_t*) handle;
QUEUE_INIT(&req->queue);
handle->connect_req = req; uv__io_start(handle->loop, &handle->io_watcher, POLLOUT); if (handle->delayed_error)
uv__io_feed(handle->loop, &handle->io_watcher); return 0;
}

从上面的代码我们可以知道,当errno === ECONNREFUSED时,会去调用uv__io_feed(handle->loop, &handle->io_watcher)方法,看一下uv__io_feed的的实现:

void uv__io_feed(uv_loop_t* loop, uv__io_t* w) {
if (QUEUE_EMPTY(&w->pending_queue))
QUEUE_INSERT_TAIL(&loop->pending_queue, &w->pending_queue);
}

从函数名字可以看出来,这里是在向pendingQueue插入发生错误时的回调。也就是说,I/O callbacks一般是对一些系统操作执行回调。

那么我们可以得出结论:

  • 大部分的回调在poll阶段执行的。
  • I/O callbacks阶段一般执行的是系统操作的回调。

The Heart Of Event Loop

有了上面的知识后,我们依然不能解决文章开头的问题。来看一下,Event Loop核心的代码

int uv_run(uv_loop_t* loop, uv_run_mode mode) {
int timeout;
int r;
int ran_pending; r = uv__loop_alive(loop);
if (!r)
uv__update_time(loop); while (r != 0 && loop->stop_flag == 0) {
uv__update_time(loop);
uv__run_timers(loop);
ran_pending = uv__run_pending(loop);
uv__run_idle(loop);
uv__run_prepare(loop); timeout = 0;
if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
timeout = uv_backend_timeout(loop); uv__io_poll(loop, timeout);
uv__run_check(loop);
uv__run_closing_handles(loop); if (mode == UV_RUN_ONCE) {
/* UV_RUN_ONCE implies forward progress: at least one callback must have
* been invoked when it returns. uv__io_poll() can return without doing
* I/O (meaning: no callbacks) when its timeout expires - which means we
* have pending timers that satisfy the forward progress constraint.
*
* UV_RUN_NOWAIT makes no guarantees about progress so it's omitted from
* the check.
*/
uv__update_time(loop);
uv__run_timers(loop);
} r = uv__loop_alive(loop);
if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
break;
} /* The if statement lets gcc compile it to a conditional store. Avoids
* dirtying a cache line.
*/
if (loop->stop_flag != 0)
loop->stop_flag = 0; return r;
}

上面代码可以简化为下面的伪代码:

while(true) {
uv__update_time(loop); // 使用Linux下的高精度Timer hrtime更新loop->time,即event loop的时间戳
uv__run_timers(loop);
uv__run_pending(loop);
uv__run_idle(loop);
uv__run_prepare(loop);
uv__io__poll(loop, timeout);
uv__run_check(loop);
uv__run_closing_handlers(loop); // Node默认的mode是`UV_RUN_ONCE`
if (mode == UV_RUN_ONCE) {
uv__run_timers();
uv__update_time(loop); // 更新loop->time
}
}

实际上,在一次tick的时候,首先会去调用一次uv__run_timers去处理timers, 然后在最后if语句里,还会去调用uv__run_timers

我在timers的实现里, 找到下面的代码:

function Timeout(callback, after, args, isRepeat) {
after *= 1; // coalesce to number or NaN
if (!(after >= 1 && after <= TIMEOUT_MAX)) {
if (after > TIMEOUT_MAX) {
process.emitWarning(`${after} does not fit into` +
' a 32-bit signed integer.' +
'\nTimeout duration was set to 1.',
'TimeoutOverflowWarning');
}
after = 1; // schedule on next tick, follows browser behavior
} this._called = false;
this._idleTimeout = after;
this._idlePrev = this;
this._idleNext = this;
this._idleStart = null;
// this must be set to null first to avoid function tracking
// on the hidden class, revisit in V8 versions after 6.2
this._onTimeout = null;
this._onTimeout = callback;
this._timerArgs = args;
this._repeat = isRepeat ? after : null;
this._destroyed = false; this[async_id_symbol] = ++async_id_fields[kAsyncIdCounter];
this[trigger_async_id_symbol] = getDefaultTriggerAsyncId();
if (async_hook_fields[kInit] > 0) {
emitInit(this[async_id_symbol],
'Timeout',
this[trigger_async_id_symbol],
this);
}
}

也就是说,实际上setTimeout(fn, 0);最后会变为setTimeout(fn, 1);在一次tick的时候,大概的流程是这样的:

  • 首先更新loop->time(uv__update_time)
    UV_UNUSED(static void uv__update_time(uv_loop_t* loop)) {
    /* Use a fast time source if available. We only need millisecond precision.
    */
    loop->time = uv__hrtime(UV_CLOCK_FAST) / 1000000;
    }
  • 上面的uv__hrtime(UV_CLOCK_FAST)的值是精确到纳秒的,因此loop->time最后的结果可能是大于1的,也有可能是小于1的。
  • 然后uv__run_timers(loop)被调用:
    void uv__run_timers(uv_loop_t* loop) {
    struct heap_node* heap_node;
    uv_timer_t* handle; for (;;) {
    heap_node = heap_min((struct heap*) &loop->timer_heap);
    if (heap_node == NULL)
    break; handle = container_of(heap_node, uv_timer_t, heap_node);
    if (handle->timeout > loop->time)
    break; uv_timer_stop(handle);
    uv_timer_again(handle);
    handle->timer_cb(handle);
    }
    }

有了上面的理解后,就可以得到文章最开始的答案了,对于event-loop-1.js:

/*
* 如果第一次loop准备前的耗时超过1ms, 即loop->time > 1, 则会先执行setTimeout, 再执行setImmediate
* 如果第一次loop准备前的耗时小于1ms,即loop->time < 1, 则会先执行setImediate,然后在执行setTimeout
*/
setTimeout(function() {
console.log('setTimeout');
}, 0); setImmediate(function() {
console.log('setImmediate');
});

而对于event-loop-2.js:

/*
* 由于是在回调里面调用的setTimeout, setImmediate两个函数
* 首先在poll阶段,执行回调函数
* 然后进入到check阶段,会执行setImmediate()的回调函数
* 最后进入在执行setTimeout()的回调函数
*
*/
const fs = require('fs');
fs.readFile(__filename, () => {
setTimeout(function() {
console.log('setTimeout');
}, 0);
setImmediate(function() {
console.log('setImmediate');
});
});

MacroTask VS MicroTask

Node.js官网文档的描述中,提到了process.nextTick(), 它不属于Libuv的部分,实际上,它是属于Node.js的一部分。

实际上,除了Libuv里面要处理的回调,在Node.js里还有另外两个queue,分别是Next Tick Queue以及MicroTask Queue

  • Next Tick Queue: 使用process.nextTick()添加的回调。
  • MicroTask Queue: 包含一些microtasks比如resolved promise callbacks

那MacroTask是什么呢?Macrotask实际上就是上面我们遇到的那些异步任务,也被称为Task, 也就是说,有的人会将MacroTask Queue称为Task Queue

它是如何工作的?

我们结合一张图来看看它在Event Loop是如何工作的:

浅析Node.js的Event Loop-LMLPHP

在Event Loop完成一个阶段,然后到另一个阶段之前,Event Loop将会执行这Next Tick Queue以及MicroTask Queue里面的回调, 直到这两个队列为空。一旦它们空了后,Event Loop会进入到下一个阶段。

很多人会将这两个队列都当作是MicroTask Queue, 因为它们是处于同一阶段执行的, 实际上,这两个队列执行依然是有一个先后顺序的: Next Tick Queue的优先级高于MicroTask Queue, 注意:我们这里将两个队列称为Immediate Queue

上面的那段话引用来自Event Loop and the Big Picture — NodeJS Event Loop Part 1, 即Event Loop在处理拥有5个handlersNext Tick Queue时,有2个handlers被添加到Next Tick Queue, 一旦5个handlers被处理完后,Event Loop会接着处理Next Tick Queue里面新增的两个handlers, 然后再处理MicroTask Queue里的回调,当Immediate Queue里面的回调都处理完成后,Event Loop将会进入到下一个阶段。举个例子:

Promise.resolve().then(() => {
console.log('resolve1');
}); process.nextTick(function() {
console.log('tick1');
process.nextTick(function() {
console.log('tick2');
});
process.nextTick(function() {
console.log('tick3');
});
}); Promise.resolve().then(() => {
console.log('resolve2');
}); process.nextTick(function() {
console.log('tick4');
}); Promise.resolve().then(() => {
console.log('resolve3');
}); process.nextTick(function() {
console.log('tick5');
});

那么上面的执行顺序是:tick1, tick4, tick5, tick2, tick3, resolve1, resolve2, resolve3。不要递归调用process.nextTick, 因为这会导致I/O starvation

推荐阅读

参考

04-16 06:33
查看更多