本文轉載自微信公衆號「編程雜技 」,做者theanarkh。轉載本文請聯繫編程雜技 公衆號。 node
咱們從一個使用例子開始看看udp模塊的實現。c++
const dgram = require('dgram'); // 建立一個socket對象 const server = dgram.createSocket('udp4'); // 監聽udp數據的到來 server.on('message', (msg, rinfo) => { // 處理數據});// 綁定端口 server.bind(41234);
咱們看到建立一個udp服務器很簡單,首先申請一個socket對象,在nodejs中和操做系統中同樣,socket是對網絡通訊的一個抽象,咱們能夠把他理解成對傳輸層的抽象,他能夠表明tcp也能夠表明udp。咱們看一下createSocket作了什麼。編程
function createSocket(type, listener) { return new Socket(type, listener); } function Socket(type, listener) { EventEmitter.call(this); let lookup; let recvBufferSize; let sendBufferSize; let options; if (type !== null && typeof type === 'object') { options = type; type = options.type; lookup = options.lookup; recvBufferSize = options.recvBufferSize; sendBufferSize = options.sendBufferSize; } const handle = newHandle(type, lookup); this.type = type; if (typeof listener === 'function') this.on('message', listener); this[kStateSymbol] = { handle, receiving: false, bindState: BIND_STATE_UNBOUND, connectState: CONNECT_STATE_DISCONNECTED, queue: undefined, reuseAddr: options && options.reuseAddr, // Use UV_UDP_REUSEADDR if true. ipv6Only: options && options.ipv6Only, recvBufferSize, sendBufferSize };}
咱們看到一個socket對象是對handle的一個封裝。咱們看看handle是什麼。服務器
function newHandle(type, lookup) { // 用於dns解析的函數,好比咱們調send的時候,傳的是一個域名 if (lookup === undefined) { if (dns === undefined) { dns = require('dns'); } lookup = dns.lookup; } if (type === 'udp4') { const handle = new UDP(); handle.lookup = lookup4.bind(handle, lookup); return handle; } // 忽略ipv6的處理}
handle又是對UDP模塊的封裝,UDP是c++模塊,咱們看看該c++模塊的定義。微信
// 定義一個v8函數模塊 Local<FunctionTemplate> t = env->NewFunctionTemplate(New); // t新建的對象須要額外拓展的內存 t->InstanceTemplate()->SetInternalFieldCount(1); // 導出給js層使用的名字 Local<String> udpString = FIXED_ONE_BYTE_STRING(env->isolate(), "UDP"); t->SetClassName(udpString); // 屬性的存取屬性 enum PropertyAttribute attributes = static_cast<PropertyAttribute>(ReadOnly | DontDelete); Local<Signature> signature = Signature::New(env->isolate(), t); // 新建一個函數模塊 Local<FunctionTemplate> get_fd_templ = FunctionTemplate::New(env->isolate(), UDPWrap::GetFD, env->as_callback_data(), signature); // 設置一個訪問器,訪問fd屬性的時候,執行get_fd_templ,從而執行UDPWrap::GetFD t->PrototypeTemplate()->SetAccessorProperty(env->fd_string(), get_fd_templ, Local<FunctionTemplate>(), attributes); // 導出的函數 env->SetProtoMethod(t, "open", Open); // 忽略一系列函數 // 導出給js層使用 target->Set(env->context(), udpString, t->GetFunction(env->context()).ToLocalChecked()).Check();
在c++層通用邏輯中咱們講過相關的知識,這裏就不詳細講述了,當咱們在js層new UDP的時候,會新建一個c++對象。網絡
UDPWrap::UDPWrap(Environment* env, Local<Object> object) : HandleWrap(env, object, reinterpret_cast<uv_handle_t*>(&handle_), AsyncWrap::PROVIDER_UDPWRAP) { int r = uv_udp_init(env->event_loop(), &handle_);}
執行了uv_udp_init初始化udp對應的handle。咱們看一下libuv的定義。dom
int uv_udp_init_ex(uv_loop_t* loop, uv_udp_t* handle, unsigned int flags) { int domain; int err; int fd; /* Use the lower 8 bits for the domain */ domain = flags & 0xFF; // 申請一個socket,返回一個fd fd = uv__socket(domain, SOCK_DGRAM, 0); uv__handle_init(loop, (uv_handle_t*)handle, UV_UDP); handle->alloc_cb = NULL; handle->recv_cb = NULL; handle->send_queue_size = 0; handle->send_queue_count = 0; // 初始化io觀察者(尚未註冊到事件循環的poll io階段),監聽的文件描述符是fd,回調是uv__udp_io uv__io_init(&handle->io_watcher, uv__udp_io, fd); // 初始化寫隊列 QUEUE_INIT(&handle->write_queue); QUEUE_INIT(&handle->write_completed_queue); return 0;}
到這裏,就是咱們在js層執行dgram.createSocket('udp4')的時候,在nodejs中主要的執行過程。回到最開始的例子,咱們看一下執行bind的時候的邏輯。socket
Socket.prototype.bind = function(port_, address_ /* , callback */) { let port = port_; // socket的狀態 const state = this[kStateSymbol]; // 已經綁定過了則報錯 if (state.bindState !== BIND_STATE_UNBOUND) throw new ERR_SOCKET_ALREADY_BOUND(); // 不然標記已經綁定了 state.bindState = BIND_STATE_BINDING; // 沒傳地址則默認綁定全部地址 if (!address) { if (this.type === 'udp4') address = '0.0.0.0'; else address = '::'; } // dns解析後在綁定,若是須要的話 state.handle.lookup(address, (err, ip) => { if (err) { state.bindState = BIND_STATE_UNBOUND; this.emit('error', err); return; } const err = state.handle.bind(ip, port || 0, flags); if (err) { const ex = exceptionWithHostPort(err, 'bind', ip, port); state.bindState = BIND_STATE_UNBOUND; this.emit('error', ex); // Todo: close? return; } startListening(this); return this;}
bind函數主要的邏輯是handle.bind和startListening。咱們一個個看。咱們看一下c++層的bind。tcp
void UDPWrap::DoBind(const FunctionCallbackInfo<Value>& args, int family) { UDPWrap* wrap; ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder(), args.GetReturnValue().Set(UV_EBADF)); // bind(ip, port, flags) CHECK_EQ(args.Length(), 3); node::Utf8Value address(args.GetIsolate(), args[0]); Local<Context> ctx = args.GetIsolate()->GetCurrentContext(); uint32_t port, flags; if (!args[1]->Uint32Value(ctx).To(&port) || !args[2]->Uint32Value(ctx).To(&flags)) return; struct sockaddr_storage addr_storage; int err = sockaddr_for_family(family, address.out(), port, &addr_storage); if (err == 0) { err = uv_udp_bind(&wrap->handle_, reinterpret_cast<const sockaddr*>(&addr_storage), flags); } args.GetReturnValue().Set(err);}
也沒有太多邏輯,處理參數而後執行uv_udp_bind,uv_udp_bind就不具體展開了,和tcp相似,設置一些標記和屬性,而後執行操做系統bind的函數把本端的ip和端口保存到socket中。咱們繼續看startListening。ide
function startListening(socket) { const state = socket[kStateSymbol]; // 有數據時的回調,觸發message事件 state.handle.onmessage = onMessage; // 重點,開始監聽數據 state.handle.recvStart(); state.receiving = true; state.bindState = BIND_STATE_BOUND; if (state.recvBufferSize) bufferSize(socket, state.recvBufferSize, RECV_BUFFER); if (state.sendBufferSize) bufferSize(socket, state.sendBufferSize, SEND_BUFFER); socket.emit('listening');}
重點是recvStart函數,咱們到c++的實現。
void UDPWrap::RecvStart(const FunctionCallbackInfo<Value>& args) { UDPWrap* wrap; ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder(), args.GetReturnValue().Set(UV_EBADF)); int err = uv_udp_recv_start(&wrap->handle_, OnAlloc, OnRecv); // UV_EALREADY means that the socket is already bound but that's okay if (err == UV_EALREADY) err = 0; args.GetReturnValue().Set(err);}
OnAlloc, OnRecv分別是分配內存接收數據的函數和數據到來時執行的回調。繼續看libuv
int uv__udp_recv_start(uv_udp_t* handle, uv_alloc_cb alloc_cb, uv_udp_recv_cb recv_cb) { int err; err = uv__udp_maybe_deferred_bind(handle, AF_INET, 0); if (err) return err; // 保存一些上下文 handle->alloc_cb = alloc_cb; handle->recv_cb = recv_cb; // 註冊io觀察者到loop,若是事件到來,等到poll io階段處理 uv__io_start(handle->loop, &handle->io_watcher, POLLIN); uv__handle_start(handle); return 0;}
uv__udp_recv_start主要是註冊io觀察者到loop,等待事件到來的時候,在poll io階段處理。前面咱們講過,回調函數是uv__udp_io。咱們看一下事件觸發的時候,該函數怎麼處理的。
static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) { uv_udp_t* handle; handle = container_of(w, uv_udp_t, io_watcher); // 可讀事件觸發 if (revents & POLLIN) uv__udp_recvmsg(handle); // 可寫事件觸發 if (revents & POLLOUT) { uv__udp_sendmsg(handle); uv__udp_run_completed(handle); }}
咱們這裏先分析可讀事件的邏輯。咱們看uv__udp_recvmsg。
static void uv__udp_recvmsg(uv_udp_t* handle) { struct sockaddr_storage peer; struct msghdr h; ssize_t nread; uv_buf_t buf; int flags; int count; count = 32; do { // 分配內存接收數據,c++層設置的 buf = uv_buf_init(NULL, 0); handle->alloc_cb((uv_handle_t*) handle, 64 * 1024, &buf); memset(&h, 0, sizeof(h)); memset(&peer, 0, sizeof(peer)); h.msg_name = &peer; h.msg_namelen = sizeof(peer); h.msg_iov = (void*) &buf; h.msg_iovlen = 1; // 調操做系統的函數讀取數據 do { nread = recvmsg(handle->io_watcher.fd, &h, 0); } while (nread == -1 && errno == EINTR); // 調用c++層回調 handle->recv_cb(handle, nread, &buf, (const struct sockaddr*) &peer, flags); }}
libuv會回調c++層,而後c++層回調到js層,最後觸發message事件,這就是對應開始那段代碼的message事件。
【編輯推薦】
【責任編輯:武曉燕 TEL:(010)68476606】