libuv 簡單使用

libuv 簡單使用

來源:https://zhuanlan.zhihu.com/p/50497450前端

前序:說說爲啥要研究libuv,其實在好久以前(大概2年前吧)玩nodejs的時候就對這個核心庫很是感興趣,不過因爲當年水平確實比較菜,大概看了看以後實在沒能靜下心來看下去。18年初的時候,360直播雲官網作了React同構,那個時候我問本身若是真有百萬併發,天天億級的訪問量有沒有信心保證中間node層一次不掛(或者不出任何事故),其實我到今天仍然是沒有足夠底氣的。緣由有兩個吧:一是對nodejs和它底層的內容還遠遠不夠了解,二是對監控層面作的不夠好。咱們大概也都知道alinode,他們早在3 4年前就在nodejs上作了不少工做,好比v8內存監控等,可是比較遺憾的是alinode至今沒有開源。因而乎有了個人第一篇關於libuv的文章,後面爭取還會更新nodejs、v8等相關的內容。 本文從下面幾個方面來介紹libuv,經過fs、net兩方面介紹libuv的思想。java

如何安裝、使用libuv這個框架

首先咱們能夠在libuv上找到libuv這個框架,在README.md裏,咱們就能夠在Build Instructions找到安裝方法,做者電腦操做系統是macox(因此後面的實例也是以linux、unix爲主,不會講windows)。咱們首先把項目clone到咱們的電腦上,在項目根目錄執行一下的命令,在執行過程當中可能會出現各類底層庫沒有安裝的狀況,按照提示自行安裝就能夠了,做者在執行 xcodebuild 的時候發現不能加上 -target All 的參數,不加的話能夠順利build過去。node

$ ./gyp_uv.py -f xcode
$ xcodebuild -ARCHS="x86_64" -project uv.xcodeproj \
     -configuration Release -target All

build完成後 咱們能夠在項目目錄裏找到 build/Release/libuv.a 文件,這個就是編譯後的文件了,咱們稍後會用到。 準備工做作好以後咱們就能夠建立一個C或者C++的工程了,在Mac上我通常使用xcode來編寫oc、c、c++的項目。 首先建立一個C項目,這個時候咱們須要把咱們以前編譯的libuv.a的文件加入到項目的依賴中,咱們在Build Phases中的 Link Binary with Libraries中添加libuv.a的路徑,同時咱們須要在項目根目錄引入uv.h等文件頭。準備工做作好以後,咱們就開始學習怎麼寫標準的hello world了 哈哈哈哈。linux

#include <stdio.h>
#include <stdlib.h>
#include <uv.h>

int main() {
    uv_loop_t *loop = malloc(sizeof(uv_loop_t));
    uv_loop_init(loop);

    printf("Now quitting.\n");
    uv_run(loop, UV_RUN_DEFAULT);

    uv_loop_close(loop);
    free(loop);
    return 0;
}

上述代碼僅僅初始化了一個loop循環,並無執行任何內容,而後就close且退出了。雖然上述代碼並無利用libuv的async功能,可是給咱們展現了 uv_loop_init uv_run 兩個核心函數。咱們稍後會介紹他們作了什麼。c++

先從一個數據結構開始

在開始介紹整個整個libuv以前,我不得不首先介紹一個數據結構,由於這個數據結構在libuv裏無處不在,這個數據結構就是--循環雙向鏈表。 咱們在項目根目錄下的src目錄能夠找到queue.h的頭文件。不錯,這個數據結構就是用宏實現的,那我讓咱們一塊兒來學習一下什麼是鏈表。git

鏈表的定義:

鏈表是一種物理存儲單元上非連續、非順序的存儲結構github

那什麼是雙向鏈表呢?

雙向鏈表其實就是頭尾相連macos

那什麼是雙向循環鏈表呢?

看圖咱們就明白了,所謂的循環鏈表就是把頭尾相連。json

來看一下 queue.h 是怎麼實現的

#define QUEUE_NEXT(q)       (*(QUEUE **) &((*(q))[0]))
#define QUEUE_PREV(q)       (*(QUEUE **) &((*(q))[1]))
#define QUEUE_PREV_NEXT(q)  (QUEUE_NEXT(QUEUE_PREV(q)))
#define QUEUE_NEXT_PREV(q)  (QUEUE_PREV(QUEUE_NEXT(q)))

/* Public macros. */
#define QUEUE_DATA(ptr, type, field)                                          \
  ((type *) ((char *) (ptr) - offsetof(type, field)))

#define QUEUE_INIT(q)                                                         \
  do {                                                                        \
    QUEUE_NEXT(q) = (q);                                                      \
    QUEUE_PREV(q) = (q);                                                      \
  }                                                                           \
  while (0)

上述代碼我只截取了部分的實現 其實這裏我只想講兩個點 1:QUEUE_NEXT 的實現windows

(*(QUEUE **) &((*(q))[0]))

在這個宏裏,他爲何用這個複雜的方式來實現呢? 其實他有兩個目的:強制類型轉換、成爲左值

*(q))[0]

這個步驟是取到數組的第一個元素

(QUEUE **)

這個步驟進行強制類型轉換

(*(nnn) &(xxx))

這個步驟目的就是爲了使xxx成爲左值

2:QUEUE_DATA 獲取鏈表的值 巧妙的使用了地址的偏移量來完成

來看一個使用queue.h的demo吧

#include "queue.h"
#include <stdio.h>

static QUEUE* q;
static QUEUE queue;

struct user_s {
    int age;
    char* name;
    QUEUE node;
};

int main() {
    struct user_s* user;
    struct user_s john;
    struct user_s henry;

    john.name = "john";
    john.age = 44;
    henry.name = "henry";
    henry.age = 32;

    QUEUE_INIT(&queue);
    QUEUE_INIT(&john.node);
    QUEUE_INIT(&henry.node);
    QUEUE_INIT(&willy.node);
    QUEUE_INIT(&sgy.node);

    ((*(&queue))[0]) = john.node;
    (*(QUEUE **) &((*(&queue))[0])) = &john.node;

    QUEUE_INSERT_TAIL(&queue, &john.node);
    QUEUE_INSERT_TAIL(&queue, &henry.node);

    q = QUEUE_HEAD(&queue);

    user = QUEUE_DATA(q, struct user_s, node);

    printf("Received first inserted user: %s who is %d.\n",
           user->name, user->age);

    QUEUE_REMOVE(q);

    QUEUE_FOREACH(q, &queue) {
        user = QUEUE_DATA(q, struct user_s, node);

        printf("Received rest inserted users: %s who is %d.\n",
               user->name, user->age);
    }

    return 0;
}

從上面代碼能夠總結出5個方法 QUEUE_INIT 隊列初始化 QUEUE_INSERT_TAIL 插入到隊尾 QUEUE_HEAD 頭部第一個元素 QUEUE_DATA 得到元素的內容 QUEUE_REMOVE 從隊列中移除元素

那雙向循環鏈表就先簡單介紹到這。

libuv的核心

libuv爲何能夠這麼高效呢?實際他使用了操做系統提供的高併發異步模型

linux: epoll

freebsd: kqueue

windows: iocp

每一個咱們常見的操做系統都爲咱們封裝了相似的高併發異步模型,那libuv其實就是對各個操做系統進行封裝,最後暴露出統一的api供開發者調用,開發者不須要關係底層是什麼操做系統,什麼API了。 咱們來看一下同步模型和異步模型的區別

阻塞模型

 

咱們在一個線程中調用網絡請求,以後線程就會被阻塞,直到返回結果才能繼續執行線程

異步模型

 

在異步模型中 咱們調用網絡請求後不在去直接調用accept阻塞線程,而是輪詢fd是否發生變化,在返回內容後咱們在調用cb執行咱們的代碼,這個過程是非阻塞的。 說了這麼多咱們經過2個例子瞭解一下其中的原理。

學習如何創建一個socket

咱們首先了解一下 C是如何建立socket的,以後咱們在看一下若是經過高併發異步模型來建立socket,最後咱們在瞭解一下 libuv下怎麼建立socket。

C如何建立一個socket呢?

#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/shm.h>

#define MYPORT  8887
#define QUEUE   20
#define BUFFER_SIZE 1024

int main()
{
    //定義sockfd AF_INET(IPv4) AF_INET6(IPv6) AF_LOCAL(UNIX協議) AF_ROUTE(路由套接字) AF_KEY(祕鑰套接字)
    // SOCK_STREAM(字節流套接字) SOCK_DGRAM
    int server_sockfd = socket(AF_INET, SOCK_STREAM, 0);

    ///定義sockaddr_in
    struct sockaddr_in server_sockaddr;
    server_sockaddr.sin_family = AF_INET;
    server_sockaddr.sin_port = htons(MYPORT);
    server_sockaddr.sin_addr.s_addr = htonl(INADDR_ANY);

    ///bind,成功返回0,出錯返回-1
    if(bind(server_sockfd,(struct sockaddr *)&server_sockaddr,sizeof(server_sockaddr))==-1)
    {
        perror("bind");
        exit(1);
    }

    printf("監聽%d端口\n", MYPORT);
    ///listen,成功返回0,出錯返回-1
    if(listen(server_sockfd, QUEUE) == -1)
    {
        perror("listen");
        exit(1);
    }

    ///客戶端套接字
    char buffer[BUFFER_SIZE];
    struct sockaddr_in client_addr;
    socklen_t length = sizeof(client_addr);

    printf("等待客戶端鏈接\n");
    ///成功返回非負描述字,出錯返回-1
    int conn = accept(server_sockfd, (struct sockaddr*)&client_addr, &length);
    if(conn<0)
    {
        perror("connect");
        exit(1);
    }
    printf("客戶端成功鏈接\n");

    while(1)
    {
        memset(buffer,0,sizeof(buffer));
        long len = recv(conn, buffer, sizeof(buffer), 0);
        //客戶端發送exit或者異常結束時,退出
        ;
        if(strcmp(buffer,"exit\n")==0 || len<=0) {
            printf("出現異常");
            break;
        }

        printf("來自客戶端數據:\n");
        fwrite(buffer, len, 1, stdout);
        send(conn, buffer, len, 0);
        printf("發送給客戶端數據:\n");
        fwrite(buffer, len, 1, stdout);
    }
    close(conn);
    close(server_sockfd);
    return 0;
}

代碼一大坨,其實上咱們簡單拆分一下

第一步:建立socket 文件描述符
第二步:定義socket addr
第三步:綁定文件描述符和地址  bind
第四步:監聽文件描述符 listen
第五步:等待socket返回內容 accept
第六步:接收信息 recv

那咱們如何使用kqueue來建立socket呢?

因爲做者電腦是macos,因此只能使用kqueue,不能使用epoll。

#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/shm.h>

#define MYPORT  8887
#define QUEUE   20
#define BUFFER_SIZE 1024

int main()
{
    // 定義sockfd AF_INET(IPv4) AF_INET6(IPv6) AF_LOCAL(UNIX協議) AF_ROUTE(路由套接字) AF_KEY(祕鑰套接字)
    // SOCK_STREAM(字節流套接字) SOCK_DGRAM
    int server_sockfd = socket(AF_INET, SOCK_STREAM, 0);

    // 定義sockaddr_in
    struct sockaddr_in server_sockaddr;
    server_sockaddr.sin_family = AF_INET;
    server_sockaddr.sin_port = htons(MYPORT);
    server_sockaddr.sin_addr.s_addr = htonl(INADDR_ANY);

    // bind,成功返回0,出錯返回-1
    if(bind(server_sockfd,(struct sockaddr *)&server_sockaddr,sizeof(server_sockaddr))==-1)
    {
        perror("bind");
        exit(1);
    }

    printf("監聽%d端口\n", MYPORT);
    // listen,成功返回0,出錯返回-1
    if(listen(server_sockfd, QUEUE) == -1)
    {
        perror("listen");
        exit(1);
    }

    //建立一個消息隊列並返回kqueue描述符
    int kq =  kqueue();
    struct kevent change_list;  //想要監控的事件
    struct kevent event_list[10000];  //用於kevent返回
    char buffer[1024];
    int nevents;
    // 監聽sock的讀事件
    EV_SET(&change_list, server_sockfd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);
    while(1) {
        printf("new loop...\n");
        // 等待監聽事件的發生
        nevents = kevent(kq, &change_list, 1, event_list, 2, NULL);
        if (nevents < 0) {
            printf("kevent error.\n");  // 監聽出錯
        } else if (nevents > 0) {
            printf("get events number: %d\n", nevents);
            for (int i = 0; i < nevents; ++i) {
                printf("loop index: %d\n", i);
                struct kevent event = event_list[i]; //監聽事件的event數據結構
                int clientfd = (int) event.ident;  // 監聽描述符
                // 表示該監聽描述符出錯
                if (event.flags & EV_ERROR) {
                    close(clientfd);
                    printf("EV_ERROR: %s\n", strerror(event_list[i].data));
                }
                // 表示sock有新的鏈接
                if (clientfd == server_sockfd) {
                    printf("new connection\n");
                    struct sockaddr_in client_addr;
                    socklen_t client_addr_len = sizeof(client_addr);
                    int new_fd = accept(server_sockfd, (struct sockaddr *) &client_addr, &client_addr_len);
                    long len = recv(new_fd, buffer, sizeof(buffer), 0);
                    char remote[INET_ADDRSTRLEN];
                    printf("connected with ip: %s, port: %d\n",
                           inet_ntop(AF_INET, &client_addr.sin_addr, remote, INET_ADDRSTRLEN),
                           ntohs(client_addr.sin_port));
                    send(new_fd, buffer, len, 0);
                }
            }
        }
    }
    return 0;
}

咱們能夠看到,listen以前都是同樣的,不在贅述,簡化一下後面的步驟

第一步:建立 kqueue描述符
第二部:監聽socket讀事件 EV_SET
第三步:綁定kq 和 change_list kevent

一直while循環直到kevent返回能夠的文件描述符數量 那到這裏其實咱們就徹底弄懂了 如何直接用C寫出高併發異步是怎麼運行的。那麼咱們就看看使用libuv的例子吧

使用libuv的scoket

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <uv.h>

#define DEFAULT_PORT 7000
#define DEFAULT_BACKLOG 128

uv_loop_t *loop;
struct sockaddr_in addr;

typedef struct {
    uv_write_t req;
    uv_buf_t buf;
} write_req_t;

void free_write_req(uv_write_t *req) {
    write_req_t *wr = (write_req_t*) req;
    free(wr->buf.base);
    free(wr);
}

void alloc_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
    buf->base = (char*) malloc(suggested_size);
    buf->len = suggested_size;
}

void on_close(uv_handle_t* handle) {
    free(handle);
}

void echo_write(uv_write_t *req, int status) {
    if (status) {
        fprintf(stderr, "Write error %s\n", uv_strerror(status));
    }
    free_write_req(req);
}

void echo_read(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
    if (nread > 0) {
        write_req_t *req = (write_req_t*) malloc(sizeof(write_req_t));
        req->buf = uv_buf_init(buf->base, nread);
        fwrite(buf->base, 30, 1, stdout);
        uv_write((uv_write_t*) req, client, &req->buf, 1, echo_write);
        return;
    }
    if (nread < 0) {
        if (nread != UV_EOF)
            fprintf(stderr, "Read error %s\n", uv_err_name(nread));
        uv_close((uv_handle_t*) client, on_close);
    }

    free(buf->base);
}

void on_new_connection(uv_stream_t *server, int status) {
    if (status < 0) {
        fprintf(stderr, "New connection error %s\n", uv_strerror(status));
        // error!
        return;
    }

    uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
    uv_tcp_init(loop, client);
    if (uv_accept(server, (uv_stream_t*) client) == 0) {
        uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read);
    }
    else {
        uv_close((uv_handle_t*) client, on_close);
    }
}

int main() {
    loop = uv_default_loop();

    uv_tcp_t server;
    uv_tcp_init(loop, &server);

    uv_ip4_addr("0.0.0.0", DEFAULT_PORT, &addr);

    uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0);
    int r = uv_listen((uv_stream_t*) &server, DEFAULT_BACKLOG, on_new_connection);
    if (r) {
        fprintf(stderr, "Listen error %s\n", uv_strerror(r));
        return 1;
    }
    return uv_run(loop, UV_RUN_DEFAULT);
}

實際上總體咱們均可以把libuv和咱們原生的c kqueue進行一一對應,發現相差很少,惟一不一樣是咱們須要定義 uv_loop 這個內部循環,後面咱們在來說套循環機制。

學習如何進行文件讀寫

咱們學習完了網絡,那麼咱們再來看看文件i/o是怎麼處理的。

 

剛剛咱們玩轉了socket來看這張圖是否是很熟悉?可是發現右側有了很大的不一樣。文件操做、DNS、用戶代碼不是基於epoll這種模型嗎? 顯而易見咱們有了答案,這是爲何呢?其實很簡單文件的不少操做就是同步的,可是libuv爲了統一異步,利用開闢線程進行文件等操做模擬了異步的過程!!原來咱們用了這麼久才發現他是個騙子。哈哈!實際上是咱們學藝不精。 那其實講到這裏文件讀寫其實講的差很少了,咱們仍是來看看例子吧!

#include <stdio.h>
#include <uv.h>

uv_fs_t open_req;
uv_fs_t _read;

static char buffer[1024];
static uv_buf_t iov;

void on_read(uv_fs_t *req) {
    printf("%s\n",iov.base);
}
void on_open(uv_fs_t *req) {
    printf("%zd\n",req->result);
    iov = uv_buf_init(buffer, sizeof(buffer));
    uv_fs_read(uv_default_loop(), &_read, (int)req->result,
               &iov, 1, -1, on_read);
}
int main() {
    const char* path = "/Users/sgy/koa/package.json";
    // O_RDONLY 、 O_WRONLY 、 O_RDWR 、 O_CREAT
    uv_fs_open(uv_default_loop(), &open_req, path, O_RDONLY, 0, on_open);
    uv_run(uv_default_loop(), UV_RUN_DEFAULT);
    uv_fs_req_cleanup(&open_req);
    return 0;
}

其實libuv底層對文件open和read的操做是分開的。 看到這裏文件api沒啥講的了,咱們來簡單講講線程池。

線程池

線程池就是對線程的統一管理,預先建立出線程,若是有任務就把任務放到線程池裏去執行。

 

經過上圖咱們能夠看到有任務進來首先會插入到鏈表中進行排隊等待, 直到線程空餘就會去鏈表中去取。 經過閱讀 src/threadpool.c文件咱們能夠了解 MAX_THREADPOOL_SIZE 128 最大線程爲128個 default_threads[4] 默認只會開闢4個線程 若是你對底層不瞭解 那當你在進行大量的文件i/o時 線程池數量就是阻礙你的最大障礙。 爲啥最大隻能建立128個線程呢?由於大多數操做系統建立一個線程大概花費1M的內存空間,外加用戶自己代碼也要佔用大量的內存,因此這裏設置了最大128的限制。

瞭解libuv的循環機制

咱們經過網絡和文件瞭解了libuv,那麼咱們來看看libuv的循環機制

uv_loop_t *loop;
 loop = uv_default_loop()
 uv_run(loop, UV_RUN_DEFAULT);

首先咱們會建立 loop 而後一系列的騷操做以後 最後咱們執行了uv_run 嗯嗯 那uv_run 確定是突破口了 在src/unix/core.c 文件裏 咱們找到了 uv_run的定義

int uv_run(uv_loop_t* loop, uv_run_mode mode) {
  int timeout;
  int r;
  int ran_pending;

  r = uv__loop_alive(loop);
  if (!r)
    uv__update_time(loop);

  while (r != 0 && loop->stop_flag == 0) {
    uv__update_time(loop);
    uv__run_timers(loop);
    ran_pending = uv__run_pending(loop);
    uv__run_idle(loop);
    uv__run_prepare(loop);

    timeout = 0;
    if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
      timeout = uv_backend_timeout(loop);

    uv__io_poll(loop, timeout);
    uv__run_check(loop);
    uv__run_closing_handles(loop);

    if (mode == UV_RUN_ONCE) {
      /* UV_RUN_ONCE implies forward progress: at least one callback must have
       * been invoked when it returns. uv__io_poll() can return without doing
       * I/O (meaning: no callbacks) when its timeout expires - which means we
       * have pending timers that satisfy the forward progress constraint.
       *
       * UV_RUN_NOWAIT makes no guarantees about progress so it's omitted from
       * the check.
       */
      uv__update_time(loop);
      uv__run_timers(loop);
    }

    r = uv__loop_alive(loop);
    if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
      break;
  }

從代碼中 咱們就能夠總結出libuv的運行週期 經過while循環不斷的查詢 loop中是否有中止符 若是有則退出 不然就不停的進行循環。

 

上面的圖已經清楚的描述咱們uv_run的流程了 那其中的核心 就在*uvio_poll* 中 例如在 src/unix/linux-core.c 中的uvio_poll函數 咱們就能夠找到 咱們 epoll 熟悉的身影了。實現邏輯也和咱們以前使用過的差很少。

總結

洋洋灑灑寫了這麼多,最後總結一下也提出本身的思考。 其實libuv底層的 actor模型是很是高效的,不少遊戲服務器內核也使用actor模型,那相對於火的不行的go(協程模型) nodejs一直沒有在服務端發揮它的高效呢? 我以爲其實緣由很簡單,由於nodejs他並不高效,我以爲nodejs可以快速的被開發出來而且js運行如此高效 v8功不可沒。可是成也v8敗也v8,JIT優化的在好 依然和編譯型語言相差甚遠。 可是一點的性能是阻礙大數據等框架使用go而不是用nodejs的緣由嗎?我以爲其實並非,最大的緣由我以爲是生態!很是多的Apache開源框架使用java編寫,不少大數據使用go來承載,nodejs有什麼頂級生態嗎?我以爲並無,他大多數面向的是前端這個羣體致使他的生態的發展。 謝謝你們能看到這裏,上述的心得都是近期整理的,若是有不對的地方歡迎你們多多批評。上述內容若是轉載請附帶原文連接,感謝。

 

================= End

相關文章
相關標籤/搜索