Nodejs源碼解析之UDP服務器

本文轉載自微信公衆號「編程雜技 」,做者theanarkh。轉載本文請聯繫編程雜技 公衆號。   node

咱們從一個使用例子開始看看udp模塊的實現。c++

const dgram = require('dgram'); 
// 建立一個socket對象 
const server = dgram.createSocket('udp4'); 
// 監聽udp數據的到來 
server.on('message', (msg, rinfo) => { 
  // 處理數據});// 綁定端口 
server.bind(41234);

咱們看到建立一個udp服務器很簡單,首先申請一個socket對象,在nodejs中和操做系統中同樣,socket是對網絡通訊的一個抽象,咱們能夠把他理解成對傳輸層的抽象,他能夠表明tcp也能夠表明udp。咱們看一下createSocket作了什麼。編程

function createSocket(type, listener) { 
  return new Socket(type, listener); 
} 
 
function Socket(type, listener) { 
  EventEmitter.call(this); 
  let lookup; 
  let recvBufferSize; 
  let sendBufferSize; 
 
  let options; 
  if (type !== null && typeof type === 'object') { 
    options = type; 
    type = options.type; 
    lookup = options.lookup; 
    recvBufferSize = options.recvBufferSize; 
    sendBufferSize = options.sendBufferSize; 
  } 
  const handle = newHandle(type, lookup);  
  this.type = type; 
  if (typeof listener === 'function') 
    this.on('message', listener); 
 
  this[kStateSymbol] = { 
    handle, 
    receiving: false, 
    bindState: BIND_STATE_UNBOUND, 
    connectState: CONNECT_STATE_DISCONNECTED, 
    queue: undefined, 
    reuseAddr: options && options.reuseAddr, // Use UV_UDP_REUSEADDR if true. 
    ipv6Only: options && options.ipv6Only, 
    recvBufferSize, 
    sendBufferSize 
  };}

咱們看到一個socket對象是對handle的一個封裝。咱們看看handle是什麼。服務器

function newHandle(type, lookup) { 
  // 用於dns解析的函數,好比咱們調send的時候,傳的是一個域名 
  if (lookup === undefined) { 
    if (dns === undefined) { 
      dns = require('dns'); 
    } 
 
    lookup = dns.lookup; 
  }  
 
  if (type === 'udp4') { 
    const handle = new UDP(); 
    handle.lookup = lookup4.bind(handle, lookup); 
    return handle; 
  } 
  // 忽略ipv6的處理}

handle又是對UDP模塊的封裝,UDP是c++模塊,咱們看看該c++模塊的定義。微信

// 定義一個v8函數模塊 
Local<FunctionTemplate> t = env->NewFunctionTemplate(New); 
  // t新建的對象須要額外拓展的內存 
  t->InstanceTemplate()->SetInternalFieldCount(1); 
  // 導出給js層使用的名字 
  Local<String> udpString = FIXED_ONE_BYTE_STRING(env->isolate(), "UDP"); 
  t->SetClassName(udpString); 
  // 屬性的存取屬性 
  enum PropertyAttribute attributes = static_cast<PropertyAttribute>(ReadOnly | DontDelete); 
 
  Local<Signature> signature = Signature::New(env->isolate(), t); 
  // 新建一個函數模塊 
  Local<FunctionTemplate> get_fd_templ = 
      FunctionTemplate::New(env->isolate(), 
                            UDPWrap::GetFD, 
                            env->as_callback_data(), 
                            signature); 
  // 設置一個訪問器,訪問fd屬性的時候,執行get_fd_templ,從而執行UDPWrap::GetFD 
  t->PrototypeTemplate()->SetAccessorProperty(env->fd_string(), 
                                              get_fd_templ, 
                                              Local<FunctionTemplate>(), 
                                              attributes); 
  // 導出的函數 
  env->SetProtoMethod(t, "open", Open); 
  // 忽略一系列函數 
  // 導出給js層使用 
  target->Set(env->context(), 
              udpString, 
              t->GetFunction(env->context()).ToLocalChecked()).Check();

在c++層通用邏輯中咱們講過相關的知識,這裏就不詳細講述了,當咱們在js層new UDP的時候,會新建一個c++對象。網絡

UDPWrap::UDPWrap(Environment* env, Local<Object> object) 
    : HandleWrap(env, 
                 object, 
                 reinterpret_cast<uv_handle_t*>(&handle_), 
                 AsyncWrap::PROVIDER_UDPWRAP) { 
  int r = uv_udp_init(env->event_loop(), &handle_);}

執行了uv_udp_init初始化udp對應的handle。咱們看一下libuv的定義。dom

int uv_udp_init_ex(uv_loop_t* loop, uv_udp_t* handle, unsigned int flags) { 
  int domain; 
  int err; 
  int fd; 
 
  /* Use the lower 8 bits for the domain */ 
  domain = flags & 0xFF; 
  // 申請一個socket,返回一個fd 
  fd = uv__socket(domain, SOCK_DGRAM, 0); 
  uv__handle_init(loop, (uv_handle_t*)handle, UV_UDP); 
  handle->alloc_cb = NULL; 
  handle->recv_cb = NULL; 
  handle->send_queue_size = 0; 
  handle->send_queue_count = 0; 
  // 初始化io觀察者(尚未註冊到事件循環的poll io階段),監聽的文件描述符是fd,回調是uv__udp_io 
  uv__io_init(&handle->io_watcher, uv__udp_io, fd); 
  // 初始化寫隊列 
  QUEUE_INIT(&handle->write_queue); 
  QUEUE_INIT(&handle->write_completed_queue); 
  return 0;}

到這裏,就是咱們在js層執行dgram.createSocket('udp4')的時候,在nodejs中主要的執行過程。回到最開始的例子,咱們看一下執行bind的時候的邏輯。socket

Socket.prototype.bind = function(port_, address_ /* , callback */) { 
  let port = port_; 
  // socket的狀態 
  const state = this[kStateSymbol]; 
  // 已經綁定過了則報錯 
  if (state.bindState !== BIND_STATE_UNBOUND) 
    throw new ERR_SOCKET_ALREADY_BOUND(); 
  // 不然標記已經綁定了 
  state.bindState = BIND_STATE_BINDING; 
  // 沒傳地址則默認綁定全部地址 
  if (!address) { 
    if (this.type === 'udp4') 
      address = '0.0.0.0'; 
    else 
      address = '::'; 
  } 
  // dns解析後在綁定,若是須要的話 
  state.handle.lookup(address, (err, ip) => { 
    if (err) { 
      state.bindState = BIND_STATE_UNBOUND; 
      this.emit('error', err); 
      return; 
    } 
    const err = state.handle.bind(ip, port || 0, flags); 
    if (err) { 
       const ex = exceptionWithHostPort(err, 'bind', ip, port); 
       state.bindState = BIND_STATE_UNBOUND; 
       this.emit('error', ex); 
       // Todo: close? 
       return; 
     } 
 
     startListening(this); 
  return this;}

bind函數主要的邏輯是handle.bind和startListening。咱們一個個看。咱們看一下c++層的bind。tcp

void UDPWrap::DoBind(const FunctionCallbackInfo<Value>& args, int family) { 
  UDPWrap* wrap; 
  ASSIGN_OR_RETURN_UNWRAP(&wrap, 
                          args.Holder(), 
                          args.GetReturnValue().Set(UV_EBADF)); 
 
  // bind(ip, port, flags) 
  CHECK_EQ(args.Length(), 3); 
  node::Utf8Value address(args.GetIsolate(), args[0]); 
  Local<Context> ctx = args.GetIsolate()->GetCurrentContext(); 
  uint32_t port, flags; 
  if (!args[1]->Uint32Value(ctx).To(&port) || 
      !args[2]->Uint32Value(ctx).To(&flags)) 
    return; 
  struct sockaddr_storage addr_storage; 
  int err = sockaddr_for_family(family, address.out(), port, &addr_storage); 
  if (err == 0) { 
    err = uv_udp_bind(&wrap->handle_, 
                      reinterpret_cast<const sockaddr*>(&addr_storage), 
                      flags); 
  } 
 
  args.GetReturnValue().Set(err);}

也沒有太多邏輯,處理參數而後執行uv_udp_bind,uv_udp_bind就不具體展開了,和tcp相似,設置一些標記和屬性,而後執行操做系統bind的函數把本端的ip和端口保存到socket中。咱們繼續看startListening。ide

function startListening(socket) { 
  const state = socket[kStateSymbol]; 
  // 有數據時的回調,觸發message事件 
  state.handle.onmessage = onMessage; 
  // 重點,開始監聽數據 
  state.handle.recvStart(); 
  state.receiving = true; 
  state.bindState = BIND_STATE_BOUND; 
 
  if (state.recvBufferSize) 
    bufferSize(socket, state.recvBufferSize, RECV_BUFFER); 
 
  if (state.sendBufferSize) 
    bufferSize(socket, state.sendBufferSize, SEND_BUFFER); 
 
  socket.emit('listening');}

重點是recvStart函數,咱們到c++的實現。

void UDPWrap::RecvStart(const FunctionCallbackInfo<Value>& args) { 
  UDPWrap* wrap; 
  ASSIGN_OR_RETURN_UNWRAP(&wrap, 
                          args.Holder(), 
                          args.GetReturnValue().Set(UV_EBADF)); 
  int err = uv_udp_recv_start(&wrap->handle_, OnAlloc, OnRecv); 
  // UV_EALREADY means that the socket is already bound but that's okay 
  if (err == UV_EALREADY) 
    err = 0; 
  args.GetReturnValue().Set(err);}

OnAlloc, OnRecv分別是分配內存接收數據的函數和數據到來時執行的回調。繼續看libuv

int uv__udp_recv_start(uv_udp_t* handle, 
                       uv_alloc_cb alloc_cb, 
                       uv_udp_recv_cb recv_cb) { 
  int err; 
 
 
  err = uv__udp_maybe_deferred_bind(handle, AF_INET, 0); 
  if (err) 
    return err; 
  // 保存一些上下文 
  handle->alloc_cb = alloc_cb; 
  handle->recv_cb = recv_cb; 
  // 註冊io觀察者到loop,若是事件到來,等到poll io階段處理 
  uv__io_start(handle->loop, &handle->io_watcher, POLLIN); 
  uv__handle_start(handle); 
 
  return 0;}

uv__udp_recv_start主要是註冊io觀察者到loop,等待事件到來的時候,在poll io階段處理。前面咱們講過,回調函數是uv__udp_io。咱們看一下事件觸發的時候,該函數怎麼處理的。

static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) { 
  uv_udp_t* handle; 
 
  handle = container_of(w, uv_udp_t, io_watcher); 
  // 可讀事件觸發 
  if (revents & POLLIN) 
    uv__udp_recvmsg(handle); 
  // 可寫事件觸發 
  if (revents & POLLOUT) { 
    uv__udp_sendmsg(handle); 
    uv__udp_run_completed(handle); 
  }}

咱們這裏先分析可讀事件的邏輯。咱們看uv__udp_recvmsg。

static void uv__udp_recvmsg(uv_udp_t* handle) { 
  struct sockaddr_storage peer; 
  struct msghdr h; 
  ssize_t nread; 
  uv_buf_t buf; 
  int flags; 
  int count; 
 
  count = 32; 
 
  do { 
    // 分配內存接收數據,c++層設置的 
    buf = uv_buf_init(NULL, 0); 
    handle->alloc_cb((uv_handle_t*) handle, 64 * 1024, &buf); 
    memset(&h, 0, sizeof(h)); 
    memset(&peer, 0, sizeof(peer)); 
    h.msg_name = &peer; 
    h.msg_namelen = sizeof(peer); 
    h.msg_iov = (void*) &buf; 
    h.msg_iovlen = 1; 
    // 調操做系統的函數讀取數據 
    do { 
      nread = recvmsg(handle->io_watcher.fd, &h, 0); 
    } 
    while (nread == -1 && errno == EINTR); 
    // 調用c++層回調 
    handle->recv_cb(handle, nread, &buf, (const struct sockaddr*) &peer, flags); 
  }}

libuv會回調c++層,而後c++層回調到js層,最後觸發message事件,這就是對應開始那段代碼的message事件。

【編輯推薦】

  1. 搜狗開源其輕量級高性能 C++ 服務器引擎,引入任務流概念

  2. 41項測試一次過!寧暢八款服務器獲Windows Server權威認證

  3. 微軟爲何非要把數據中心設在海底?瞭解服務器背後的科學

  4. Linux黑話解釋:什麼是顯示服務器,用來作什麼?

  5. IDC:第二季度全球服務器市場同比增加19.8% 亞太地區表現搶眼

【責任編輯:武曉燕 TEL:(010)68476606】

相關文章
相關標籤/搜索