淺析Node.js的Event Loop

目錄

淺析Node.js的Event Loop

引出問題

首先看兩段代碼,下面兩段代碼的執行結果是什麼?爲何?javascript

// event-loop-1.js
setTimeout(() => {
    console.log('setTimeout');
}, 0);
setImmediate(() => {
    console.log('setImmediate');
});


// event-loop-2.js
const fs = require('fs');
fs.readFile(__filename, () => {
    setTimeout(() => {
        console.log('setTimeout');
    }, 0);
    setImmediate(() => {
        console.log('setImmediate');
    });
});

也許你內心已經有了答案,可是就是不太肯定,其實這裏面涉及到的知識點就是今天要說的Event Loophtml

Node.js的基本架構

在講Event Loop以前,首先介紹一下Node.js的基本架構。提到Node.js的時候,咱們耳熟能詳的是: Node.js是一個基於ChromeV8引擎的JavaScript運行時。Node.js 使用高效、輕量級的事件驅動、非阻塞 I/O 模型。那麼這句話真正想要表達的是什麼呢?請看下圖:java

  • Node standard library: Node的標準庫,也就是咱們平時所用的fs, path, http, net, stream等模塊。
  • Node bindlings: 是C++與JavaScript溝通的橋樑, 封裝了V8和Libuv的細節,向上層提供API。
  • 最後一層是支撐Node的關鍵。

使用tree -L 1能夠看到Node.js源碼的目錄以下:node

➜  node git:(master) tree -L 1
.
├── AUTHORS
├── BSDmakefile
├── BUILDING.md
├── CHANGELOG.md
├── CODE_OF_CONDUCT.md
├── COLLABORATOR_GUIDE.md
├── CONTRIBUTING.md
├── CPP_STYLE_GUIDE.md
├── GOVERNANCE.md
├── LICENSE
├── Makefile
├── README.md
├── android-configure
├── benchmark
├── common.gypi
├── configure
├── deps
├── doc
├── lib
├── node.gyp
├── node.gypi
├── src
├── test
├── tools
└── vcbuild.bat

而比較關鍵的幾個目錄是:android

  • deps: 一些Node.js的依賴庫,好比Libuv, V8等。
  • src: 包含C++的源碼,即Node bindings。
  • lib: 包含JavaScript的源碼,存放的是Node.js的核心模塊,即fs, path, http, net, stream等模塊。

Libuv

咱們知道Node.js是一個Runtime, 它擁有異步,非阻塞的模型,那麼它是如何實現非阻塞的呢?答案是:Libuv。git

什麼是Libuv?Libuv是一個高性能的,事件驅動的I/O庫,而且提供了 跨平臺(如windows, *nix)的API。簡單的來講,Node.js的異步、非阻塞I/O,底層其實是Libuv實現的。github

具體更多關於Libuv的知識這裏再也不闡述,感興趣的同窗下來能夠去多瞭解一下。windows

Event Loop

能夠參考Node.js官方文檔上的這樣一篇文檔: The Node.js Event Loop, Timers, and process.nextTick(), 咱們能夠知道:promise

When Node.js starts, it initializes the event loop, processes the provided input script (or drops into the REPL, which is not covered in this document) which may make async API calls, schedule timers, or call process.nextTick(), then begins processing the event loop.瀏覽器

即在Node.js啓動的時候,它會初始化Event Loop, 處理提供的輸入腳本, 這可能會使異步API調用,調用timers,或者調用process.nextTick, 而後開始處理事件循環。
下圖簡單展現了事件循環的操做順序:

┌───────────────────────┐
┌─>│        timers         │
│  └──────────┬────────────┘
│  ┌──────────┴────────────┐
│  │     I/O callbacks     │
│  └──────────┬────────────┘
│  ┌──────────┴────────────┐
│  │     idle, prepare     │
│  └──────────┬────────────┘      ┌───────────────┐
│  ┌──────────┴────────────┐      │   incoming:   │
│  │         poll          │<─────┤  connections, │
│  └──────────┬────────────┘      │   data, etc.  │
│  ┌──────────┴────────────┐      └───────────────┘
│  │        check          │
│  └──────────┬────────────┘
│  ┌──────────┴────────────┐
└──┤    close callbacks    │
└───────────────────────┘

注意:每一個盒子被看成Event Loop的一個階段。

每一個階段都有一個執行回調的FIFO隊列(官網這麼描述的,實際上不是的,好比timers的數據結構其實是堆), 簡單概述,當Event Loop進入到某個階段的時候,就會將該階段的隊列裏的回調拿出來執行,直到隊列爲空(實際上要複雜一點兒)。

Event Loop Phases Overview

簡單的介紹一下這幾個階段所作的事情:

  • timers: 這個階段執行由setTimeout()setInterval()調度的回調。
  • I/O callbacks: 執行幾乎全部的回調,除了close callbacks以及timers調度的回調和setImmediate()調度的回調。
  • idle, prepare: 只在內部使用。
  • poll: 檢索新的I/O事件,node將在適當的時候阻塞。(retrieve new I/O events; node will block here when appropriate)
  • check: setImmediate()的回調將會在這個階段執行。
  • close callbacks: 好比socket.on('close', ...)

上面的階段仍是筆記容易理解的,就是poll階段的解釋有點兒讓人迷惑,這是什麼意思呢?官方文檔給出了poll階段的做用。

Poll Phase

poll階段有兩個功能:

  • 當timers到達指定的時間後,執行指定的timer的回調(Executing scripts for timers whose threshold has elapsed, then)。
  • 處理poll隊列的事件(Processing events in the poll queue)。

當進入到poll階段,而且沒有timers被調用的時候,會發生下面的狀況:

  • 若是poll隊列不爲空,Event Loop 將同步的執行poll queue裏的callback,直到queue爲空或者執行的callback到達上線。
  • 若是poll隊列爲空,則會發生下面的狀況:
    • 若是腳本調用了setImmediate(), Event Loop將會結束poll階段而且進入到check階段執行setImmediate()的回調。
    • 若是腳本沒有被setImmediate()調用,Event Loop將會等待回調被添加到隊列中,而後當即執行它們。
      當進入到poll階段,而且調用了timers的話,會發生下面的狀況:
  • 一旦poll queue是空的話,Event Loop會檢查是否timers, 若是有1個或多個timers時間已經到達,Event Loop將會回到timer階段並執行那些timer的callback(即進入到下一次tick)。

看了上面的介紹,比較I/O callbacks階段與poll階段,可能會感到迷惑?爲何在I/O callbacks是執行幾乎全部的回調,而在poll階段也是執行回調?我找到了Libuv的官方文檔:

Pending callbacks are called. All I/O callbacks are called right after polling for I/O, for the most part. There are cases, however, in which calling such a callback is deferred for the next loop iteration. If the previous iteration deferred any I/O callback it will be run at this point.

結合Libuv官方文檔給出的流程圖

來看, 能夠翻譯爲:Pending callbacks(即I/O callbacks)被調用。大多數狀況下,全部的I/O callbacks都是在poll for I/O(即poll phase)後理解調用的。然而,有些狀況,會在下一次tick調用,之前被推遲的I/O callback會在下一次tick的I/O階段調用。

那麼通常什麼樣的callback會在I/O callbacks階段被調用呢?Node.js官方有提到:

This phase executes callbacks for some system operations such as types of TCP errors. For example if a TCP socket receives ECONNREFUSED when attempting to connect, some *nix systems want to wait to report the error. This will be queued to execute in the I/O callbacks phase.

即:這個階段對某些系統操做(好比TCP類型錯誤)執行回調。舉個例子,若是嘗試鏈接時,一個TCP套接字收到了ECONNREFUSED,則某些*nix系統會等待報錯。這將排隊在I/O callbacks階段執行。

對於文檔上的說法去一探究竟,在Node.js源碼裏全局搜索: ECONNREFUSED, 在node/deps/uv/src/unix/tcp.c目錄下,第206行,uv__tcp_connect函數,代碼以下:

int uv__tcp_connect(uv_connect_t* req,
                    uv_tcp_t* handle,
                    const struct sockaddr* addr,
                    unsigned int addrlen,
                    uv_connect_cb cb) {
  int err;
  int r;

  assert(handle->type == UV_TCP);

  if (handle->connect_req != NULL)
    return -EALREADY;  /* FIXME(bnoordhuis) -EINVAL or maybe -EBUSY. */

  err = maybe_new_socket(handle,
                         addr->sa_family,
                         UV_STREAM_READABLE | UV_STREAM_WRITABLE);
  if (err)
    return err;

  handle->delayed_error = 0;

  do {
    errno = 0;
    r = connect(uv__stream_fd(handle), addr, addrlen);
  } while (r == -1 && errno == EINTR);

  /* We not only check the return value, but also check the errno != 0.
   * Because in rare cases connect() will return -1 but the errno
   * is 0 (for example, on Android 4.3, OnePlus phone A0001_12_150227)
   * and actually the tcp three-way handshake is completed.
   */
  if (r == -1 && errno != 0) {
    if (errno == EINPROGRESS)
      ; /* not an error */
    else if (errno == ECONNREFUSED)
    /* If we get a ECONNREFUSED wait until the next tick to report the
     * error. Solaris wants to report immediately--other unixes want to
     * wait.
     */
      handle->delayed_error = -errno;
    else
      return -errno;
  }

  uv__req_init(handle->loop, req, UV_CONNECT);
  req->cb = cb;
  req->handle = (uv_stream_t*) handle;
  QUEUE_INIT(&req->queue);
  handle->connect_req = req;

  uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);

  if (handle->delayed_error)
    uv__io_feed(handle->loop, &handle->io_watcher);

  return 0;
}

從上面的代碼咱們能夠知道,當errno === ECONNREFUSED時,會去調用uv__io_feed(handle->loop, &handle->io_watcher)方法,看一下uv__io_feed的的實現:

void uv__io_feed(uv_loop_t* loop, uv__io_t* w) {
  if (QUEUE_EMPTY(&w->pending_queue))
    QUEUE_INSERT_TAIL(&loop->pending_queue, &w->pending_queue);
}

從函數名字能夠看出來,這裏是在向pendingQueue插入發生錯誤時的回調。也就是說,I/O callbacks通常是對一些系統操做執行回調。

那麼咱們能夠得出結論:

  • 大部分的回調在poll階段執行的。
  • I/O callbacks階段通常執行的是系統操做的回調。

The Heart Of Event Loop

有了上面的知識後,咱們依然不能解決文章開頭的問題。來看一下,Event Loop核心的代碼

int uv_run(uv_loop_t* loop, uv_run_mode mode) {
  int timeout;
  int r;
  int ran_pending;

  r = uv__loop_alive(loop);
  if (!r)
    uv__update_time(loop);

  while (r != 0 && loop->stop_flag == 0) {
    uv__update_time(loop);
    uv__run_timers(loop);
    ran_pending = uv__run_pending(loop);
    uv__run_idle(loop);
    uv__run_prepare(loop);

    timeout = 0;
    if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
      timeout = uv_backend_timeout(loop);

    uv__io_poll(loop, timeout);
    uv__run_check(loop);
    uv__run_closing_handles(loop);

    if (mode == UV_RUN_ONCE) {
      /* UV_RUN_ONCE implies forward progress: at least one callback must have
       * been invoked when it returns. uv__io_poll() can return without doing
       * I/O (meaning: no callbacks) when its timeout expires - which means we
       * have pending timers that satisfy the forward progress constraint.
       *
       * UV_RUN_NOWAIT makes no guarantees about progress so it's omitted from
       * the check.
       */
      uv__update_time(loop);
      uv__run_timers(loop);
    }

    r = uv__loop_alive(loop);
    if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
      break;
  }

  /* The if statement lets gcc compile it to a conditional store. Avoids
   * dirtying a cache line.
   */
  if (loop->stop_flag != 0)
    loop->stop_flag = 0;

  return r;
}

上面代碼能夠簡化爲下面的僞代碼:

while(true) {
    uv__update_time(loop); // 使用Linux下的高精度Timer hrtime更新loop->time,即event loop的時間戳
    uv__run_timers(loop);
    uv__run_pending(loop);
    uv__run_idle(loop);
    uv__run_prepare(loop);
    uv__io__poll(loop, timeout);
    uv__run_check(loop);
    uv__run_closing_handlers(loop);

    // Node默認的mode是`UV_RUN_ONCE`
    if (mode == UV_RUN_ONCE) {
        uv__run_timers();
        uv__update_time(loop); // 更新loop->time
    }
}

實際上,在一次tick的時候,首先會去調用一次uv__run_timers去處理timers, 而後在最後if語句裏,還會去調用uv__run_timers

我在timers的實現裏, 找到下面的代碼:

function Timeout(callback, after, args, isRepeat) {
  after *= 1; // coalesce to number or NaN
  if (!(after >= 1 && after <= TIMEOUT_MAX)) {
    if (after > TIMEOUT_MAX) {
      process.emitWarning(`${after} does not fit into` +
                          ' a 32-bit signed integer.' +
                          '\nTimeout duration was set to 1.',
                          'TimeoutOverflowWarning');
    }
    after = 1; // schedule on next tick, follows browser behavior
  }

  this._called = false;
  this._idleTimeout = after;
  this._idlePrev = this;
  this._idleNext = this;
  this._idleStart = null;
  // this must be set to null first to avoid function tracking
  // on the hidden class, revisit in V8 versions after 6.2
  this._onTimeout = null;
  this._onTimeout = callback;
  this._timerArgs = args;
  this._repeat = isRepeat ? after : null;
  this._destroyed = false;

  this[async_id_symbol] = ++async_id_fields[kAsyncIdCounter];
  this[trigger_async_id_symbol] = getDefaultTriggerAsyncId();
  if (async_hook_fields[kInit] > 0) {
    emitInit(this[async_id_symbol],
             'Timeout',
             this[trigger_async_id_symbol],
             this);
  }
}

也就是說,實際上setTimeout(fn, 0);最後會變爲setTimeout(fn, 1);在一次tick的時候,大概的流程是這樣的:

  • 首先更新loop->time(uv__update_time)
    c UV_UNUSED(static void uv__update_time(uv_loop_t* loop)) { /* Use a fast time source if available. We only need millisecond precision. */ loop->time = uv__hrtime(UV_CLOCK_FAST) / 1000000; }
  • 上面的uv__hrtime(UV_CLOCK_FAST)的值是精確到納秒的,所以loop->time最後的結果多是大於1的,也有多是小於1的。
  • 而後uv__run_timers(loop)被調用:
    ```c
    void uv__run_timers(uv_loop_t* loop) {
    struct heap_node* heap_node;
    uv_timer_t* handle;

    for (;;) {
      heap_node = heap_min((struct heap*) &loop->timer_heap);
      if (heap_node == NULL)
        break;
    
      handle = container_of(heap_node, uv_timer_t, heap_node);
      if (handle->timeout > loop->time)
        break;
    
      uv_timer_stop(handle);
      uv_timer_again(handle);
      handle->timer_cb(handle);
    }

    }
    ```

有了上面的理解後,就能夠獲得文章最開始的答案了,對於event-loop-1.js:

/*
 * 若是第一次loop準備前的耗時超過1ms, 即loop->time > 1, 則會先執行setTimeout, 再執行setImmediate
 * 若是第一次loop準備前的耗時小於1ms,即loop->time < 1, 則會先執行setImediate,而後在執行setTimeout
 */
setTimeout(function() {
    console.log('setTimeout');
}, 0);

setImmediate(function() {
    console.log('setImmediate');
});

而對於event-loop-2.js:

/*
 * 因爲是在回調裏面調用的setTimeout, setImmediate兩個函數
 * 首先在poll階段,執行回調函數
 * 而後進入到check階段,會執行setImmediate()的回調函數
 * 最後進入在執行setTimeout()的回調函數
 *
 */
const fs = require('fs');
fs.readFile(__filename, () => {
    setTimeout(function() {
        console.log('setTimeout');
    }, 0);
    setImmediate(function() {
        console.log('setImmediate');
    });
});

MacroTask VS MicroTask

Node.js官網文檔的描述中,提到了process.nextTick(), 它不屬於Libuv的部分,實際上,它是屬於Node.js的一部分。

實際上,除了Libuv裏面要處理的回調,在Node.js裏還有另外兩個queue,分別是Next Tick Queue以及MicroTask Queue

  • Next Tick Queue: 使用process.nextTick()添加的回調。
  • MicroTask Queue: 包含一些microtasks好比resolved promise callbacks

那MacroTask是什麼呢?Macrotask實際上就是上面咱們遇到的那些異步任務,也被稱爲Task, 也就是說,有的人會將MacroTask Queue稱爲Task Queue

它是如何工做的?

咱們結合一張圖來看看它在Event Loop是如何工做的:

在Event Loop完成一個階段,而後到另外一個階段以前,Event Loop將會執行這Next Tick Queue以及MicroTask Queue裏面的回調, 直到這兩個隊列爲空。一旦它們空了後,Event Loop會進入到下一個階段。

不少人會將這兩個隊列都看成是MicroTask Queue, 由於它們是處於同一階段執行的, 實際上,這兩個隊列執行依然是有一個前後順序的: Next Tick Queue的優先級高於MicroTask Queue, 注意:咱們這裏將兩個隊列稱爲Immediate Queue

E.g, The event loop is currently processing the immediates queue which has 5 handlers to be processed. Meanwhile, two handlers are added to the next tick queue. Once the event loop completes 5 handlers in the immediates queue, event loop will detect that there are two items to be processed in the next tick queue before moving to the close handlers queue. It will then execute all the handlers in the next tick queue and then will move to process the close handlers queue.

上面的那段話引用來自Event Loop and the Big Picture — NodeJS Event Loop Part 1, 即Event Loop在處理擁有5個handlersNext Tick Queue時,有2個handlers被添加到Next Tick Queue, 一旦5個handlers被處理完後,Event Loop會接着處理Next Tick Queue裏面新增的兩個handlers, 而後再處理MicroTask Queue裏的回調,當Immediate Queue裏面的回調都處理完成後,Event Loop將會進入到下一個階段。舉個例子:

Promise.resolve().then(() => {
  console.log('resolve1');
});

process.nextTick(function() {
  console.log('tick1');
  process.nextTick(function() {
    console.log('tick2');
  });
  process.nextTick(function() {
    console.log('tick3');
  });
});

Promise.resolve().then(() => {
  console.log('resolve2');
});

process.nextTick(function() {
  console.log('tick4');
});


Promise.resolve().then(() => {
  console.log('resolve3');
});

process.nextTick(function() {
  console.log('tick5');
});

那麼上面的執行順序是:tick1, tick4, tick5, tick2, tick3, resolve1, resolve2, resolve3。不要遞歸調用process.nextTick, 由於這會致使I/O starvation

推薦閱讀

參考

相關文章
相關標籤/搜索