Node中異步和同步的實現

使用過node的朋友都知道,它最重要的也是最值得稱道的就是使用了異步事件驅動的框架libuv,這個框架使得被稱爲玩具語言的JavaScript也在後端語言中佔了一席之地(固然V8的高性能也是功不可沒,並且libuv的代碼很是優雅,很值得你們的學習。不過libuv整個框架很大,咱們不可能只經過一篇文章就能瞭解到它全部的東西,因此我挑選了node中最簡單fs模塊同步讀和異步讀文件的過程講解來對libuv的一個大概過程有所瞭解。node

fs.readSync

fs.readSync這個方法我相信沒有人會陌生,在node中同步讀取文件,不過再不少文章中都不推薦使用這個方法,由於會形成node單線程的阻塞,對於一些比較繁忙的node實例來講是很是不友好的,不過今天咱們不討論這些,只討論其中的實現。實現咱們在node工程的lib目錄中找到fs.js能夠看到它的代碼:後端

function(fd, buffer, offset, length, position) {
	if (length === 0) {
		return 0;
	}

	return binding.read(fd, buffer, offset, length, position);
};
複製代碼

其中直接調用了binding.read其中的binding的申明是這樣的binding = process.binding('fs'),這個天然就是node的builtin_module,因此咱們直接找到src/node_flie.cc文件中。app

node::InitFs方法中咱們能夠看到全部的方法申明,其中返回對象中的read方法對應的是static void Read(const FunctionCallbackInfo<Value>& args)咱們來看一下他的核心代碼:框架

static void Read(const FunctionCallbackInfo<Value>& args) {
	//獲取傳入參數,並對參數進行處理
	....
	//將傳入的buffer的內存的地址取出並用來存儲read出的內容
	char * buf = nullptr;
	Local<Object> buffer_obj = args[1]->ToObject(env->isolate());
	char *buffer_data = Buffer::Data(buffer_obj); 		
	size_t buffer_length = Buffer::Length(buffer_obj);
	...
	buf = buffer_data + off;

	uv_buf_t uvbuf = uv_buf_init(const_cast<char*>(buf), len);
	//執行read操做
	req = args[5];

	if (req->IsObject()) {
		ASYNC_CALL(read, req, UTF8, fd, &uvbuf, 1, pos);
	} else {
		SYNC_CALL(read, 0, fd, &uvbuf, 1, pos)
		args.GetReturnValue().Set(SYNC_RESULT);
	}
}
複製代碼

從上面的代碼,咱們能夠看出第六個參數是個很關鍵的參數,若是傳入了一個對象則使用異步操做,而咱們的fs.readSync方法沒有傳入第六個參數隨意使用的是同步操做,而且在操做完成後當即返回結果。異步

SYNC_CALL這個宏中具體作了什麼呢,他主要是調用另一個宏:async

#define SYNC_CALL(func, path, ...)  \
SYNC_DEST_CALL(func, path, nullptr, __VA_ARGS__) \
複製代碼

其中__VA_ARGS__表示的是除了func和path外其餘傳入宏的參數,接下來咱們來看一下SYNC_DEST_CALL宏:ide

#define SYNC_DEST_CALL(func, path, dest, ...)                                 \
	fs_req_wrap req_wrap;                                                     \
	env->PrintSyncTrace();                                                    \
	int err = uv_fs_ ## func(env->event_loop(),                               \
	                     &req_wrap.req,                                       \
	                     __VA_ARGS__,                                         \
	                     nullptr);                                            \
	if (err < 0) {                                                            \
		return env->ThrowUVException(err, #func, nullptr, path, dest);        			\
	}                                                                         \
複製代碼

其中在宏命令中的 ##標記是鏈接符的意思,因此這裏其實就是調用uv_fs_read方法,而env->PrintSyncTrace()是爲了在node打開--trace-sync-io時用來追蹤代碼中何處使用了同步io時使用,能夠經過這個方法打出代碼中調用同步io的位置,因此當你的代碼常常發生阻塞的時候你能夠經過這個來調優你的代碼(固然阻塞的緣由未必是同步io形成的)。uv_fs_read方法是libuv是來讀取文件的調用,咱們找到這個方法的位置,就在deps/uv/src/unix/fs.c中:函數

int uv_fs_read(uv_loop_t* loop, uv_fs_t* req,
           uv_file file,
           const uv_buf_t bufs[],
           unsigned int nbufs,
           int64_t off,
           uv_fs_cb cb) {
	INIT(READ);

	if (bufs == NULL || nbufs == 0)
	return -EINVAL;

	req->file = file;

	req->nbufs = nbufs;
	req->bufs = req->bufsml;
	if (nbufs > ARRAY_SIZE(req->bufsml))
		req->bufs = uv__malloc(nbufs * sizeof(*bufs));

	if (req->bufs == NULL) {
	if (cb != NULL)
		uv__req_unregister(loop, req);
		return -ENOMEM;
	}

	memcpy(req->bufs, bufs, nbufs * sizeof(*bufs));

	req->off = off;
	POST;
}
複製代碼

首先咱們來看宏調用INIT(READ):oop

#define INIT(subtype)                            	 \
	do {                                             \
		if (req == NULL)                             \
			return -EINVAL;                          \
		req->type = UV_FS;                           \
		if (cb != NULL)                              \
			uv__req_init(loop, req, UV_FS);          \
		req->fs_type = UV_FS_ ## subtype;            \
		req->result = 0;                             \
		req->ptr = NULL;                             \
		req->loop = loop;                            \
		req->path = NULL;                            \
		req->new_path = NULL;                        \
		req->cb = cb;                                \
	}                                                \
	while (0)
複製代碼

這是一個很明顯的初始化操做,這裏主要說兩個最重要地方,首先是將req的loop指向node的event_loop,其次是指定了fs_type喂UV_FS_READ這個是一個重要的標誌,爲後面的工做作識別。作了這些操做之後回到uv_fs_read方法來,咱們能夠看到在POST宏調用以前都是一些對參數的處理工做,這個沒什麼可講的,咱們主要來看看POST宏:post

#define POST                                                                  \
	do {                                                                      \
		if (cb != NULL) {                                                     \
			uv__work_submit(loop, &req->work_req, uv__fs_work, uv__fs_done);  \
			return 0;                                                         \
		}                                                                     \
		else {                                                                \
			uv__fs_work(&req->work_req);                                      \
			return req->result;                                               \
		}                                                                     \
	}                                                                         \
	while (0)
複製代碼

從上面的代碼咱們能夠看到在有cb的時候調用的是uv__work_submit,這就是異步的狀況下調用,等會兒咱們再講。如今咱們先說uv__fs_work 的方法:

static void uv__fs_work(struct uv__work* w) {
	int retry_on_eintr;
	uv_fs_t* req;
	ssize_t r;

	req = container_of(w, uv_fs_t, work_req);
	retry_on_eintr = !(req->fs_type == UV_FS_CLOSE);

	do {
		errno = 0;

		#define X(type, action)                               \
			case UV_FS_ ## type:                              \
			r = action;                                       \
			break;

			switch (req->fs_type) {
				...
				X(WRITE, uv__fs_buf_iter(req, uv__fs_write));
				X(OPEN, uv__fs_open(req));
				X(READ, uv__fs_buf_iter(req, uv__fs_read));
				...
			}
		#undef X
	} while (r == -1 && errno == EINTR && retry_on_eintr);

	if (r == -1)
		req->result = -errno;
	else
		req->result = r;

	if (r == 0 && (req->fs_type == UV_FS_STAT ||
	             req->fs_type == UV_FS_FSTAT ||
	             req->fs_type == UV_FS_LSTAT)) {
		req->ptr = &req->statbuf;
	}
}
複製代碼

這個方法由於是fs文件共用的方法,因此在其中會根據不一樣的類型的來執行不一樣的方法,剛剛咱們看到了在初始化req的時候給了它UV_FS_READ的type,因此會執行方法uv__fs_buf_iter(req, uv__fs_read),uv__fs_buf_iter方法中主要是調用了傳入的第二個參數uv__fs_read函數,這裏的代碼就不貼了很簡單,就是普通的read(還有readv和pread)操做,不過其中有個點就是這段代碼:

#if defined(_AIX)
	struct stat buf;
	if(fstat(req->file, &buf))
		return -1;
	if(S_ISDIR(buf.st_mode)) {
		errno = EISDIR;
		return -1;
	}
#endif
複製代碼

這段代碼很好地解釋了node文檔中關於fs.readFileSync的這一段

Note: Similar to fs.readFile(), when the path is a directory, the behavior of fs.readFileSync() is platform-specific.

// macOS, Linux, and Windows
fs.readFileSync('<directory>');
// => [Error: EISDIR: illegal operation on a directory, read <directory>]

//  FreeBSD
fs.readFileSync('<directory>'); // => null, <data>
複製代碼

uv__fs_read成功讀取文件後,req->bufs中就已經有了所需的內容了,從node_file.cc的static void Read(const FunctionCallbackInfo<Value>& args)方法中咱們能夠知道req->bufs的內存所指向的則是binding.read(fd, buffer, offset, length, position)傳入的buffer內存段。這個時候就已經獲得了想要讀取的內容了。而咱們平時常用的fs.readFileSync則是先打開文件獲得其fd,並生成一段buffer而後調用fs.readSync,是生成的buffer中取得文件內容再返回,簡化了不少操做,因此更受到你們的青睞。到這裏咱們的同步讀取就已經結束了,算是很簡單,由於read這些操做都是阻塞性的操做,因此對於單線程的node進程來講確實容易遇到性能瓶頸,下面咱們來講一下node的異步讀取fs.read函數。

fs.read

異步的操做遠比同步要複雜不少,咱們來一步步的瞭解。首先咱們先來看 ocess.nextTick(function() { callback && callback(null, 0, buffer); }); }

function wrapper(err, bytesRead) {
		// Retain a reference to buffer so that it can't be GC'ed too soon.
		callback && callback(err, bytesRead || 0, buffer);
	}

	var req = new FSReqWrap();
	req.oncomplete = wrapper;

	binding.read(fd, buffer, offset, length, position, req);
};
複製代碼

從剛剛同步的分析中,咱們知道當bingd.read傳入第六個參數的時候則會異步執行read操做,這裏就傳入了第六個參數req, req = new FSReqWrap();req是FSReqWrap = binding.FSReqWrap的實例,因此咱們從node::InitFs中能夠看到以下代碼:

Local<FunctionTemplate> fst = FunctionTemplate::New(env->isolate(), NewFSReqWrap);
fst->InstanceTemplate()->SetInternalFieldCount(1);
AsyncWrap::AddWrapMethods(env, fst);
Local<String> wrapString =
FIXED_ONE_BYTE_STRING(env->isolate(), "FSReqWrap");
fst->SetClassName(wrapString);
target->Set(wrapString, fst->GetFunction());
複製代碼

上面的代碼使用v8提供的API生成FSReqWrap的構造函數而void NewFSReqWrap(const FunctionCallbackInfo<Value>& args)就會提及構造函數的內容。這個函數主要的主要工做只有一個object->SetAlignedPointerInInternalField(0, nullptr);,不過這個只跟C++對象的嵌入有關。從以前咱們討論過的static void Read(const FunctionCallbackInfo<Value>& args)方法中聊到過,當傳入req對象的時候回調用宏命令ASYNC_CALL,這個宏命令跟以前的SYNC_CALL同樣的調用,經過ASYNC_DEST_CALL(func, req, nullptr, encoding, __VA_ARGS__)去調用真正的邏輯,因此咱們直接來看ASYNC_DEST_CALL的代碼:

#define ASYNC_DEST_CALL(func, request, dest, encoding, ...)                   \
	Environment* env = Environment::GetCurrent(args);                         \
	CHECK(request->IsObject());                                               \
	FSReqWrap* req_wrap = FSReqWrap::New(env, request.As<Object>(),           \
	                                   #func, dest, encoding);                \
	int err = uv_fs_ ## func(env->event_loop(),                               \
	                       req_wrap->req(),                                   \
	                       __VA_ARGS__,                                       \
	                       After);                                            \
	req_wrap->Dispatched();                                                   \
	if (err < 0) {                                                            \
		uv_fs_t* uv_req = req_wrap->req();                                    \
		uv_req->result = err;                                                 \
		uv_req->path = nullptr;                                               \
		After(uv_req);                                                        \
		req_wrap = nullptr;                                                   \
	} else {                                                                  \
		args.GetReturnValue().Set(req_wrap->persistent());                    \
	}
複製代碼

上面的代碼咱們能夠看到經過FSReqWrap::New來生成了req_wrap,這個方法的執行是node生成對象的一個基本邏輯,因此咱們着重說一下,首先咱們來看一下FSReqWrap::New的代碼:

const bool copy = (data != nullptr && ownership == COPY);
const size_t size = copy ? 1 + strlen(data) : 0;
FSReqWrap* that;
char* const storage = new char[sizeof(*that) + size];
that = new(storage) FSReqWrap(env, req, syscall, data, encoding);
if (copy)
	that->data_ = static_cast<char*>(memcpy(that->inline_data(), data, size));
return that;
複製代碼

這段代碼咱們主要了解一下new(storage) FSReqWrap(env, req, syscall, data, encoding);,首先咱們經過一張圖來了解一下FSReqWrap的繼承關係:

image1

上圖中咱們給出了一些關鍵對象的關鍵屬性和方法,因此咱們能夠看出FSReqWrap各個繼承對象的主要做用:

1.繼承ReqWrap對象的關鍵屬性uv_fs_t,和關鍵方法ReqWrap<T>::Dispatched,使用該方法中的req_.data = this;在libuv的方法中傳遞自身。

2.繼承AsyncWrap中的MakeCallback,這個函數會執行咱們傳入的異步讀取完成後的回調,在這個例子中就是使用js中經過req.oncomplete = wrapper;傳入的wrapper函數。

3.繼承BaseObject對象中的關鍵屬性Persistent<Object> persistent_handle_Environment* env_,前者是v8中的持久化js對象,和Local的關係能夠參見v8官方的解釋:

Local handles are held on a stack and are deleted when the appropriate destructor is called. These handles' lifetime is determined by a handle scope, which is often created at the beginning of a function call. When the handle scope is deleted, the garbage collector is free to deallocate those objects previously referenced by handles in the handle scope, provided they are no longer accessible from JavaScript or other handles.

Persistent handles provide a reference to a heap-allocated JavaScript Object, just like a local handle. There are two flavors, which differ in the lifetime management of the reference they handle. Use a persistent handle when you need to keep a reference to an object for more than one function call, or when handle lifetimes do not correspond to C++ scopes. 
複製代碼

大概的意思就是,Local會隨着在棧上分配的scope析構而被GC清理掉,可是Persistent不會。有點相似棧上分配的內存和堆上分配內存的關係,想要在超過一個function中使用就要使用Persistent的v8對象,然後者是node的執行環境,幾乎囊括了node執行中所須要的一切方法和屬性(這一塊很是大,涉及的也不少,實在很難一兩句講清楚,跟本文討論內容無直接聯繫只能略過)。

最後,在FSReqWrap的構造函數中經過Wrap(object(), this)將咱們上面提到的Persistent<Object> persistent_handle_持久化js對象和FSReqWrap的C++對象關聯起來,這是node中最經常使用的方式(也是ebmed開發中最經常使用的技巧)。咱們回到宏ASYNC_DEST_CALL中來,如今知道經過方法FSReqWrap::New方法使得FSReqWrap對象實例和剛剛在js中new的req對象鏈接了起來,也使libuv的uv_fs_t和其實例聯繫了起來。這個時候就跟前面同樣開始調用uv_fs_read,此次在最後一個參數cb中傳入了函數void After(uv_fs_t *req)做爲回調函數,從以前同步討論中咱們就說過傳入回調函數後狀況的不一樣,首先是INIT宏中在會多一步操做,經過uv__req_init中的QUEUE_INSERT_TAIL(&(loop)->active_reqs, &(req)->active_queue);宏方法將req放入loop的acitve_reqs的循環鏈表中(libuv的循環鏈表實現很是的有意思,有興趣的朋友能夠參考文章:libuv queue的實現)。而在POST中有回調的函數的狀況是直接經過uv__work_submit(loop, &req->work_req, uv__fs_work, uv__fs_done)調用來完成任務,咱們來看一下uv__work_submit函數的代碼,這個方法在deps/uv/src/threadpool中:

uv_once(&once, init_once);
w->loop = loop;
w->work = work;
w->done = done;
post(&w->wq);
複製代碼

該方法首先經過uv_once在第一次調用該方法是啓動幾個工做線程,這些線程主要執行static void worker(void* arg)方法:

for (;;) {
	uv_mutex_lock(&mutex);

	while (QUEUE_EMPTY(&wq)) {
		idle_threads += 1;
		uv_cond_wait(&cond, &mutex);
		idle_threads -= 1;
	}

	q = QUEUE_HEAD(&wq);

	if (q == &exit_message)
		uv_cond_signal(&cond);
	else {
		QUEUE_REMOVE(q);
		QUEUE_INIT(q);  
	}

	uv_mutex_unlock(&mutex);

	if (q == &exit_message)
		break;

	w = QUEUE_DATA(q, struct uv__work, wq);
	w->work(w);

	uv_mutex_lock(&w->loop->wq_mutex);
	w->work = NULL;  
	QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
	uv_async_send(&w->loop->wq_async);
	uv_mutex_unlock(&w->loop->wq_mutex);
}
複製代碼

其中wq是一個循環鏈表的隊列,記錄了全部註冊的任務,當沒有任務時會經過uv_cond_wait使該線程阻塞,而在有任務的時候會在隊列中取出該任務再經過w->work(w)執行其任務,在執行完成後會將任務註冊在loop->wq的隊列中再經過uv_async_send(&w->loop->wq_async)通知主線程從loop->wq的隊列取出該任務並執行其回調。

再回到uv__work_submit經過work方法咱們就知道它接下來的工做是作什麼了,註冊work函數也就是傳入uv__fs_work函數,這個函數咱們以前就介紹過了,這裏就很少作解釋了,只是在異步中是經過worker線程來完成的,不會阻塞主線程。而第二個函數則是註冊完成後主線執行的回調,也就是uv__fs_done:

req = container_of(w, uv_fs_t, work_req);
uv__req_unregister(req->loop, req);

if (status == -ECANCELED) {
	assert(req->result == 0);
	req->result = -ECANCELED;
}

req->cb(req);
複製代碼

從中咱們能夠看到,這個函數會將該任務的req從loop的acitve_reqs去去掉,而後執行傳入uv_fs_read中的回調函數。而最後的post中主要是將當前任務註冊到wq的列表中,並使用條件變量的uv_cond_signal函數觸發uv_cond_wait中阻塞的函數運做起來,接着worker進程就能執行咱們剛剛說的過程了。

上面咱們講解了大概的過程,從這個過程當中就能明白異步的讀操做是如何執行的,經過使用wokrer線程來作實際的讀操做,而主線程則是在worker線程完成操做後,執行回調。不過如今回過頭來咱們看看,在worker線程之後是如何通知主線程呢?剛剛咱們說到了是經過uv_async_send(&w->loop->wq_async)的調用通知的,這裏咱們來看看他具體是如何作的。首先咱們要回到loop的初始化處,函數uv_loop_init中,在這個函數中有這樣一個調用: uv_async_init(loop, &loop->wq_async, uv__work_done);。這個調用會生成一個管道,並經過如下語句:

uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]);
uv__io_start(loop, &loop->async_io_watcher, POLLIN);
loop->async_wfd = pipefd[1];
複製代碼

實現當有數據往pipefd[1]中寫時,主線會在讀取數據後執行uv__async_io的調用,在uv__async_io中最重要的工做就是執行其async_cb,而在loop初始化的時候註冊的async_cb是函數uv__work_done:

//取數據的操做
...

while (!QUEUE_EMPTY(&wq)) {
	q = QUEUE_HEAD(&wq);
	QUEUE_REMOVE(q);

	w = container_of(q, struct uv__work, wq);
	err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
	w->done(w, err);
}
複製代碼

這裏咱們能夠看到,會從loop->wq隊列中取出放入其中全部任務,並經過w->done(w, err)執行其回調,而剛剛在worker線程中的調用uv_async_send(&w->loop->wq_async)便是經過往loop->async_wfd,即上面提到的pipefd[1]寫一個字節來觸發整個過程。到這裏最開始uv_fs_read中註冊的函數uv__fs_done就能夠執行了,而這個函數的主要任務便是調用傳入uv_fs_read的cb參數,即void After(uv_fs_t *req)函數,這個函數處理的狀況比較多,就不貼代碼了惟一要講的就是他的第一句

FSReqWrap* req_wrap = static_cast<FSReqWrap*>(req->data);
複製代碼

這裏就回到了咱們前面所說的經過req->data將FSReqWrap的對象實例串聯起來,到這裏就能順利的經過這個實例獲得以前初始化的js對象,並執行它的oncomplete函數了。回到js的代碼中咱們能夠看到這個函數執行的操做就是調用咱們傳入的callback的函數:

callback && callback(err, bytesRead || 0, buffer);
複製代碼

至此,fs.read整個異步操做就已經完成了,至於fs.readFile這個操做放在異步中就複雜了許多,先異步打開文件,再經過回調中註冊異步任務取得文件的stat,最後經過回調去讀取文件,並且若是文件太大不能一次讀完(一次最多讀8*1024的字節),會不斷的回調繼續讀取文件,直到讀完才異步關閉文件,而且經過異步關閉文件的回調執行傳入的回調函數。可見爲了咱們平時開發中的方便,node的開發者仍是付出了不少的努力的。

總結

在瞭解了node對於文件讀取的同步和異步實現後,咱們就能看出libuv的精妙之處了。特別是異步時經過子線程處理任務,再用管道通知主線執行回調的方式,真的是爲node這樣的單線程語言量身定作,固然可能也有同窗有疑問,主線是如何讀取管道值的呢?這又是一個很大的問題,咱們只能之後的文章再來解釋了。這篇文章就先到此爲止了,但願經過該文能幫助你們對node背後的邏輯會多一點了解。

相關文章
相關標籤/搜索