Libevent教程001: 簡介與配置

本文內容大體翻譯自 libevent-book, 但不是照本翻譯. 成文時, libevent最新的穩定版爲 2.1.8 stable. 即本文如無特殊說明, 全部描述均以 2.1.8 stable 版本爲準.git

本文爲系列文章的第一篇, 對應libevent-book的 chapter 0 + chapter 1 + R0 + R1程序員

0. 前提條件

這個文檔是對libevent的介紹與指導, 閱讀文檔須要你具備如下的能力:github

  1. 你精通C語言
  2. 你至少了解Unix網絡編程.
  3. 你會安裝libevent
  4. 你大體知道libevent是幹什麼用的.

1. 基本概念: 阻塞/非阻塞/同步/異步/回調機制的討論

這裏首先要解釋四個名詞: 阻塞, 非阻塞, 同步, 異步. 它們都是修飾"接口"的形容詞, 或者說的土一點, 它們都是修飾"函數"的形容詞.redis

同步, 仍是異步, 是從"消息通訊"的視角去描述這個接口的行爲. 而所謂的消息通訊, 你能夠簡單的把"函數"想象成一個淘寶客服, 把"調用方"想象成你本身. 調用函數的過程其實就是三步:編程

  1. "你"詢問"淘寶客服"一個問題. 好比, "在嗎?". 在這個場景中, 你就是"調用方", "淘寶客服"是函數, 而那句"在嗎?", 則是函數參數, 你把函數參數傳遞給函數.
  2. "淘寶客服"進行後臺處理. 這時淘寶客服接收到了你的詢問消息, 若是他沒有在忙, 那麼他能夠當即回覆你. 若是他如今正在忙, 好比正在吃飯, 好比正在和老婆吵架, 好比淘寶客服須要先看一下你以前的行爲記錄, 而後再決定如何回覆你.(好比他看到你正在瀏覽一雙襪子,以爲你在潛在的買家, 他決定回覆你. 好比他看到你三天前下單買了一雙襪子, 但襪子尚未發貨, 他以爲你有退貨的風險, 從而決定不理你, 僞裝不在.) 這個客服思考決斷的過程, 就是函數內部進行處理運算的過程. 固然這個例子很簡單, 有些牽強.
  3. 最終, 淘寶客服回覆了你, "在的, 親". 這裏, 回覆這個動做, 就是函數返回, 而"在的, 親"這句話, 就是函數的返回值.

你從這個角度去看, 函數調用, 就是消息通訊的過程, 你發送消息給函數, 函數通過一番運算思考, 把結果再回發給你.小程序

所謂的同步, 異步, 指的是:windows

  1. 這個淘寶客服很老實, 對於每一個顧客發來的問題, 他都須要通過一番思考, 再進行答覆. 這個函數很老實, 對於每一個函數調用, 都很老實的根據傳入參數進行計算, 再返回結果. 也是是說, 在淘寶客服思考結束以前, 這個客服不會向你發送答覆, 你也收不到答覆. 也就是說, 在函數運算結束以前, 函數不會返回, 你也得不到返回值. 那麼, 這個客服是同步的, 這個函數調用的過程是同步調用, 這個函數是同步的.
  2. 假如這個淘寶客服很不老實, 他裝了一個自動答覆小程序. 對於每一個詢問的顧客, 都先自動回覆一句"親, 如今很忙喲, 客服MM可能過一會才能給你答覆". 也就是說, 顧客在發出詢問以後, 當即就能獲得一個答覆. 也就是說, 調用方在調用一個函數以後, 這個函數就當即返回了. 而真正的結果, 可能在過五分鐘以後纔會給你. 便是五分鐘以後客服對你說"在的呢, 親". 這樣的函數, 就叫異步函數.

異步客服須要解決一個問題: 當真正的運算結果得出以後, 被調用的客服如何通知做爲調用方的你, 取走答案. 在淘寶客戶端上, 是經過手機的震動消息提醒, 是經過聊天框的紅點.api

因此, 關於同步, 和異步, 這裏作一個稍微正式一點的總結:數組

  1. 同步的過程: 調用方傳參->函數運算->函數返回運算結果.
  2. 異步的過程: 調用方傳參->函數說我知道了, 而後過了五分鐘, 函數說我算出來了, 結果在這裏, 你來取.

這裏咱們着眼於消息的傳遞, 通信方式, 也就是站在函數的角度去看, 結果是如何傳遞給調用方的. 同步接口, 運算結果在"函數調用"這個場景下就返回給了調用方. 異步接口: 運算結果在"函數調用"這個場景以後的某個不定的時刻, 經過某種通知方式, 傳遞給調用方.安全

整個過程當中咱們忽略了一件事: 就是, 在函數執行運算的過程當中, 調用方在幹什麼. 也是是, 在淘寶客服心裏思考如何回覆你的時候, 你在幹什麼.

這就引出了阻塞與非阻塞:

  1. 阻塞: 在函數執行運算的過程當中, 當前線程什麼也作不了. 在等待客服回覆的過程當中, 你什麼也不作, 就在那乾等着, 直到他回覆了你.
  2. 非阻塞: 在函數執行去處的過程當中, 當前線程能夠去作其它事情. 在等待客服回覆的過程當中, 你上了個廁所, 還順便洗了個澡.

換句話說:

  1. 同步與異步, 描述的是 被調用的函數, 如何將結果返回給調用者
  2. 阻塞與非阻塞, 描述的是 調用方, 在獲得結果以前能不能脫身

這是兩個維度上的邏輯概念, 這兩個維度互相有必定的干涉, 並非徹底正交的兩個維度, 這樣, 既然是兩個維度, 那麼就有四種組合.

  1. 同步, 且阻塞: 調用方發起調用直至獲得結果以前, 都不能幹其它事情. 被調函數接收到參數直到運算結束以前, 都不會返回.
  2. 同步, 非阻塞: 調用方發起調用直至獲得結果以前這段時間, 能夠作其它事情. 但被調函數接收到參數直到運算結束以前, 都不會返回. 很顯然這個邏輯概念說得通, 但實際上是反常理的. 由於: 若是調用方在發起調用以後, 獲得結果(函數返回)以前, 要去作其它事情, 那麼就有一個隱含的前提條件: 調用方必須知道本次調用的耗時, 且被調方(函數)嚴格遵照這個時間約定. 一毫秒很少, 一毫秒很多. 這在代碼的世界裏是很難達到的.
  3. 異步, 且阻塞: 調用方發起調用直至獲得結果以前, 都不能幹其它事情. 被調用函數接收到參數以後當即返回, 但在隨後的某個時間點才把運算結果傳遞給調用方. 以後調用方繼續活動. 這個邏輯概念依然說得通, 可是很彆扭. 這就至關於, 在你問淘寶客服問題的時候, 淘寶客服的自動回覆機器人已經給你說了"客服很忙喲, 可能過一會才能答覆你", 但你就是啥也不幹, 非得等到客服答覆你以後, 纔去上廁所. 這種情景在代碼世界裏可能發生, 但彷佛很智障.
  4. 異步, 非阻塞: 調用方發起調用直至獲得結果以前這段時間, 能夠作其它事情. 被調函數接收到參數後當即返回, 但在以後的某一個時間點才把運算結果傳遞給調用方. 這提及來很繞口, 舉個栗子, 仍是客服:

    1. 你拿出手機, 向客服發送消息, "在嗎?". 而後把手機放桌子上, 轉向上廁所去了.
    2. 客服收到你的消息, 機器人回覆你"很差意思, 客服如今很忙, 但咱們會盡快答覆你的, 親!".
    3. 你上廁所回來了, 看手機沒消息, 又去吃飯了.
    4. 客服開始處理你的消息, 終於開始給你真正的回覆"親, 2333號客服爲您服務, 你有什麼要了解的嗎?".
    5. 你吃飯的過程當中, 手機震動, 你點開淘寶, 發現有了回覆. 整個流程結束.

能夠看到

  1. 阻塞方式下, 調用方老是能第一時間拿到調用結果. 由於在阻塞期間, 調用方啥也不幹, 就等着函數返回結果. 非阻塞方式下, 調用方通常都是在函數返回告終果以後纔去查看運算結果.
  2. 異步方式下, 被調用方能夠推遲處理任務. 客服收到你的消息後能夠先把飯吃完, 函數收到你的調用後並不必定當即就開始運算.
  3. 同步且阻塞, 雙方都是槓精, 都是老實人. 理解起來比較天然.
  4. 異步非阻塞, 調用方不在意何時能獲得運算結果. 被調用方不在意調用方着急不着急, 雙方都是佛系青年. 理解起來也比較天然.

還有一個點要給你們介紹到, 就是回調函數. 在上面講過, 異步調用, 須要函數以某種機制, 在運算結果得出以後, 將運算結果傳遞給調用方. 但回調函數又繞了一個彎.

假設沒有回調函數機制, 異步流程就是:

  1. 顧客詢問客服, "大家家有沒有紅色36D的胸罩啊? 我想給我老婆買一件, 我老婆的胸是36D的". 而後去上廁所去了
  2. 自動機器人向顧客回覆"很忙喲, 請耐心等待"
  3. 客服開始處理顧客的詢問. 去庫房查貨.
  4. 庫房有貨, 客服要想辦法將這個信息送到顧客手中. 他經過淘寶客戶端發表了答覆, 淘寶客戶端致使手機震動, 這個震動信號通知了顧客.
  5. 顧客在廁所正拉屎, 看到手機上的消息提醒, 思考了一分鐘, 顧客下單購買了這個胸罩.

這個流程裏顧客作了兩件事:

  1. 詢問客服"有沒有36D的紅色胸罩". 這是調用函數的行爲
  2. 在獲得確定的答覆以後, 下單購買了這個胸罩. 這是獲得函數返回的運算結果, 並根據運算結果進一步執行程序流程.(調用了另一個函數: 購買)

而淘寶客服只作了一件事:

  1. 查詢庫房裏是否有貨

而有了回調機制後, 異步流程就是這樣的:

  1. 顧客詢問客服, "大家家有沒有紅色36D的胸罩?". 而後顧客把手機交給祕書, 叮囑道:"你盯着這個客服, 若是她說有, 你就下單買了, 地址寫我家, 若是沒有, 你就啥也不作". 而後顧客坐上了出差的飛機
  2. 自動機器人向顧客回覆"很忙喲, 請耐心等待"
  3. 客服開始處理顧客的詢問. 去庫房查貨.
  4. 庫房有貨, 客服要想辦法將這個信息送到顧客手中. 他經過淘寶客戶端發表了答覆, 淘寶客戶端致使手機震動, 這個震動信號通知了祕書.
  5. 祕書根據老闆的指示, 下單購買了這個胸罩.

這個流程裏, 顧客作了兩件事:

  1. 詢問客服"有沒有36D的胸罩". 這是調用函數行爲.
  2. 向祕書叮囑. 這是向消息監控方註冊回調函數的行爲. 消息監控方負責接收函數的返回結果. 回調函數則是: "若是有, 就買給老闆夫人, 若是沒有, 就什麼也不作"

淘寶客服只作了一件事:

  1. 查詢庫房裏是否有貨

而消息監控方, 也就是祕書, 作了一件事:

  1. 根據客服的答覆選擇不一樣的行爲. 即在函數調用結果得出以後, 調用回調函數.

這就是回調函數的一個生動的例子, 回調函數機制中有了一個調用結果監控方, 就是祕書, 這個角色承擔着很是重要的職責: 便是在函數返回結果以後, 調用對應的回調函數. 回調機制通常都實如今異步調用框架之中, 對於寫代碼的人來講是透明的, 它簡化了調用方的職責與智力負擔, 必定程度上抽象了代碼邏輯, 簡化了編程模型(注意: 是必定程度上!). 有了回調機制:

  1. 調用方沒必要再去關心函數返回結果以及返回時機. 沒必要經過輪詢或其它方式去檢查異步函數是否返回告終果.
  2. 調用方在調用時就向調用結果監控方註冊合適的回調, 在調用函數那一刻, 將後續業務邏輯寫在回調函數中, 只負責調用就好了. 代碼越寫越像狀態機.

不過正所謂回調一時爽, 調試火葬廠. 寫過JavaScript的同窗對這一點必定是深有體會. 當程序不能正確運行的時候, 調試很蛋疼. 異步框架自己因爲函數返回時機不肯定, 調試就比較蛋疼, 再加上回調機制, 那真是火葬廠了. 特別是回調嵌套回調, 裏面套個七八層的時候, 那真是把圖靈從墳裏挖出來也沒用的絕望場景.

2. 異步IO與多路複用技術

咱們先來看一段經典的同步且阻塞的HTTP客戶端程序:

#include <netinet/in.h>     // for socketaddr_in
#include <sys/socket.h>     // for socket functions
#include <netdb.h>          // for gethostbyname
#include <sys/errno.h>      // for errno

#include <unistd.h>
#include <string.h>
#include <stdio.h>

int main(int argc, char ** argv)
{
    const char query[] = "GET / HTTP/1.0\r\n"
                         "Host: www.baidu.com\r\n"
                         "\r\n";
    const char hostname[] = "www.baidu.com";

    struct sockaddr_in sin;
    struct hostent * h;
    const char * cp;
    int fd;
    ssize_t n_written, remaining;

    char buf[4096];

    /*
     * Look up the IP address for the hostname.
     * Watch out; this isn't threadsafe on most platforms.
     */
    h = gethostbyname(hostname);

    if(!h)
    {
        fprintf(stderr, "E: gethostbyname(%s) failed. ErrMsg: %s\n", hostname, hstrerror(h_errno));
        return -__LINE__;
    }

    if(h->h_addrtype != AF_INET)
    {
        fprintf(stderr, "E: gethostbyname(%s) returned an non AF_INET address.\n", hostname);
        return -__LINE__;
    }

    /*
     * Allocate a new socket
     */
    fd = socket(AF_INET, SOCK_STREAM, 0);
    if(fd < 0)
    {
        fprintf(stderr, "E: socket failed: %s\n", strerror(errno));
        return -__LINE__;
    }

    /*
     * Connect to the remote host
     */
    sin.sin_family = AF_INET;
    sin.sin_port = htons(80);
    sin.sin_addr = *((struct in_addr *)(h->h_addr));
    if(connect(fd, (struct sockaddr *)(&sin), sizeof(sin)) != 0)
    {
        fprintf(stderr, "E: connect to %s failed: %s\n", hostname, strerror(errno));
        close(fd);
        return -__LINE__;
    }

    /*
     * Write the query
     * XXX Can send succeed partially?
     */
    cp = query;
    remaining = strlen(query);
    while(remaining)
    {
        n_written = send(fd, cp, remaining, 0);
        if(n_written < 0)
        {
            fprintf(stderr, "E: send failed: %s\n", strerror(errno));
            close(fd);
            return -__LINE__;
        }

        remaining -= n_written;
        cp += n_written;
    }

    /*
     * Get an answer back
     */
    while(1)
    {
        ssize_t result = recv(fd, buf, sizeof(buf), 0);
        if(result == 0)
        {
            break;
        }
        else if(result < 0)
        {
            fprintf(stderr, "recv failed: %s\n", strerror(errno));
            close(fd);
            return -__LINE__;
        }

        fwrite(buf, 1, result, stdout);
    }

    close(fd);

    return 0;
}

在上面的示例代碼裏, 大部分有關網絡與IO的函數調用, 都是阻塞式的. 好比gethostbyname, 在DNS解析成功域名以前是不返回的(或者解析失敗了會返回失敗), connect函數, 在與對端主機成功創建TCP鏈接以前是不返回的(或者鏈接失敗), 再好比recvsend函數, 在成功操做, 或明確失敗以前, 也是不返回的.

阻塞式IO確實比較土, 上面的程序編譯運行的時候, 若是你網絡情況很差, 可能會卡一兩秒纔會讀到百度的首頁, 這卡就是由於阻塞IO的緣故. 固然, 雖然比較土, 但像這樣的場合, 使用阻塞IO是沒什麼問題的. 但假如你想寫一個程序同時讀取兩個網站的首頁的話, 就比較麻煩了: 由於你不知道哪一個網站會先響應你的請求.. 你能夠寫一些, 好比像下面這樣的, 很土的代碼:

char buf[4096];
int i, n;
while(i_still_want_to_read())
{
    for(i = 0; i < n_sockets; ++i)
    {
        n = recv(fd[i], buf, sizeof(buf), 0);

        if(n == 0)
        {
            handle_close(fd[i]);
        }
        else if(n < 0)
        {
            handle_error(fd[i], errno);
        }
        else
        {
            handle_input(fd[i], buf, n);
        }
    }
}

若是你的fd[]數組裏有兩個網站的鏈接, fd[0]接着百度, fd[1]接着hao123, 假如hao123正常響應了, 能夠從fd[1]裏讀出數據了, 但百度的服務器被李老闆炸了, 響應不了了, 這時, 上面的代碼就會卡在i==0時循環裏的n = recv(fd[0], buf, sizeof(buf), 0)這條語句中, 直到李老闆把服務器修好. 這就很蛋疼.

固然, 你能夠用多線程解決這個問題, 多數狀況下, 你有一個問題, 你嘗試使用多線程解決, 而後你多個了有問題.

上面是一個冷笑話, 多線程或多進程是一個解決方案, 一般狀況下, 最簡單的套路是使用一個線程或進程去創建TCP鏈接, 而後鏈接創建成功後, 爲每一個鏈接建立獨立的線程或進程來進行IO讀寫. 這樣即便一個網站抽風了, 也只阻塞屬於它本身的那個讀寫線程或進程, 不會影響到其它網站的響應.

下面是另一個例子程序, 這是一個服務端程序, 監聽40173端口上的TCP鏈接請求, 而後把客戶端發送的數據按ROT13法再回寫給客戶端, 一次處理一行數據. 這個程序使用Unix上的fork()函數爲每一個客戶端的鏈接建立一個獨立的處理進程.

#include <netinet/in.h>     // for sockaddr_in
#include <sys/socket.h>     // for socket functions
#include <sys/errno.h>      // for errno

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>

#define MAX_LINE 16384

char rot13_char(char c)
{
    if(
        (c >= 'a' && c <= 'm') ||
        (c >= 'A' && c <= 'M')
    )
    {
        return c+13;
    }
    else if(
        (c >= 'n' && c <= 'z') ||
        (c >= 'N' && c <= 'Z')
    )
    {
        return c-13;
    }
    else
    {
        return c;
    }
}

void child(int fd)
{
    char outbuf[MAX_LINE + 1];  // extra byte for '\0'
    size_t outbuf_used = 0;
    ssize_t result;

    while(1)
    {
        char ch;
        result = recv(fd, &ch, 1, 0);

        if(result == 0)
        {
            break;
        }
        else if(result == -1)
        {
            perror("read");
            break;
        }

        if(outbuf_used < sizeof(outbuf))
        {
            outbuf[outbuf_used++] = rot13_char(ch);
        }

        if(ch == '\n')
        {
            send(fd, outbuf, outbuf_used, 0);
            outbuf_used = 0;
            continue;
        }
    }
}

void run(void)
{
    int listener;
    struct sockaddr_in sin;

    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = 0;
    sin.sin_port = htons(40713);

    listener = socket(AF_INET, SOCK_STREAM, 0);

    int one = 1;
    setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    if(bind(listener, (struct sockaddr *)(&sin), sizeof(sin)) < 0)
    {
        perror("bind");
        return;
    }

    if(listen(listener, 16) < 0)
    {
        perror("listen");
        return;
    }

    while(1)
    {
        struct sockaddr_storage ss;
        socklen_t slen = sizeof(ss);
        int fd = accept(listener, (struct sockaddr *)(&ss), &slen);

        if(fd < 0)
        {
            perror("accept");
        }
        else
        {
            if(fork() == 0)
            {
                child(fd);
                exit(0);
            }
        }
    }
}

int main(int argc, char ** argv)
{
    run();
    return 0;
}

你可使用下面的命令行, 經過netcat工具向本機的40713發送數據, 來試驗一下上面的服務端代碼:

printf "abcdefghijklmnopqrstuvwxyz\n" | nc -4 -w1 localhost 40713

多進程或多線程確實是一個還算是比較優雅的, 應對併發鏈接的解決方案. 這種解決方案的缺陷是: 進程或線程的建立是有開銷的, 在某些平臺上, 這個開銷仍是比較大的. 這裏優化的方案是使用線程, 並使用線程池策略. 若是你的機器須要處理上千上萬的併發鏈接, 這就意味着你須要建立成千上萬個線程, 想象一下, 服務器通常也就十幾個核心, 64個不得了了, 若是有五千併發鏈接, 5000個線程排除輪64個核心的大米, 線程調度確定是個大開銷.

這個時候咱們就須要瞭解一下非阻塞了, 經過下面的Unix調用, 能夠將一個文件描述符設置爲"非阻塞"的. 明確一下: "非阻塞"描述的是IO函數的行爲, 將一個文件描述符設置爲"非阻塞"的, 實際上是指, 在這個文件描述符上執行IO操做, 函數的行爲會變成非阻塞的.

fcntl(fd, F_SETFL, O_NONBLOCK);

當這個文件描述符是socket的文件描述符時, 咱們通常也會直接稱, "把一個socket設置爲非阻塞". 將一個socket設置爲非阻塞以後, 在對應的文件描述符上, 不管是執行網絡編程相關的函數, 仍是執行IO相關的函數, 函數行爲都會變成非阻塞的, 即函數在調用以後就當即返回: 要麼當即返回成功, 要把當即告訴調用者: "暫時不可用, 請稍後再試"

有了非阻塞這種手段, 咱們就能夠改寫咱們的訪問網頁程序了: 咱們這時能夠正確的處理同時下載兩個網站的數據的需求了. 代碼片段以下:

int i, n;
char buf[1024];

for(i = 0; i < n_sockets; ++i)
{
    fcntl(fd[i], F_SETFL, O_NONBLOCK);
}

while(i_still_want_to_read)
{
    for(int i = 0; i < n_sockets; ++i)
    {
        n = recv(fd[i], buf, sizeof(buf), 0);
        if(n == 0)
        {
            handle_close(fd[i]);        // peer was closed
        }
        else if(n < 0)
        {
            if(errno == EAGAIN)
            {
                // do nothing, the kernel didn't have any data for us to read
                // retry
            }
            else
            {
                handle_error(fd[i], errno);
            }
        }
        else
        {
            handle_input(fd[i], buf, n);    // read success
        }
    }
}

這樣寫確實解決了問題, 可是, 在對端網站尚未成功響應的那幾百毫秒裏, 這段代碼將會瘋狂的死循環, 會把你的一個核心佔滿. 這是一個很蛋疼的解決方案, 緣由是: 對於真正的數據什麼時候到達, 咱們沒法肯定, 只能開個死循環輪詢.

舊式的改進方案是使用一個叫select()的系統調用函數. select()函數內部維護了三個集合:

  1. 有數據可供讀取的文件描述符
  2. 能夠進行寫入操做的文件描述符
  3. 出現異常的文件描述符

select()函數在這三個集合有至少一個集合不爲空的時候返回. 若是三個集合都爲空, 那麼select()函數將阻塞.

下面是使用select()改進後的代碼片段:

fd_set readset;
int i, n;
char buf[1024];

while(i_still_want_to_read)
{
    int maxfd = -1;
    FD_ZERO(&readset);

    // add all of the interesting fds to readset
    for(i = 0; i < n_sockets; ++i)
    {
        if(fd[i] > maxfd)
        {
            maxfd = fd[i];
        }

        FD_SET(fd[i], &readset):
    }

    select(maxfd+1, &readset, NULL, NULL, NULL);

    for(int i = 0; i < n_sockets; ++i)
    {
        if(FDD_ISSET(fd[i], &readset))
        {
            n = recv(fd[i], &readset);

            if(n == 0)
            {
                handle_close(fd[i]);
            }
            else if(n < 0)
            {
                if(errno == EAGAIN)
                {
                    // the kernel didn't have any data for us to read
                }
                else
                {
                    handle_error(fd[i], errno);
                }
            }
            else
            {
                handle_input(fd[i], buf, n);
            }
        }
    }
}

使用select()改進了程序, 但select()蛋疼的地方在於: 它只告訴你, 三集合中有數據了, 可是: 哪一個fd可讀, 哪一個fd可寫, 哪一個fd有異常, 這些具體的信息, 它仍是沒告訴你. 若是你的fd數量很少, OK, 上面的代碼沒什麼問題, 但若是你持有着上千個併發鏈接, 那每次select()返回時, 你都須要把全部fd都輪一遍.

下面是使用select()調用對rot13服務端示例代碼的重構

#include <netinet/in.h>     // for sockaddr_in
#include <sys/socket.h>     // for socket functions
#include <sys/errno.h>      // for errno
#include <fcntl.h>          // for fcntl
#include <sys/select.h>     // for select

#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>

#define MAX_LINE 16384

char rot13_char(char c)
{
    if(
        (c >= 'a' && c <= 'm') ||
        (c >= 'A' && c <= 'M')
    )
    {
        return c+13;
    }
    else if(
        (c >= 'n' && c <= 'z') ||
        (c >= 'N' && c <= 'Z')
    )
    {
        return c-13;
    }
    else
    {
        return c;
    }
}

struct fd_state{
    char buffer[MAX_LINE];
    size_t buffer_used;

    int writing;
    size_t n_written;
    size_t write_upto;
};

struct fd_state * alloc_fd_state(void)
{
    struct fd_state * state = malloc(sizeof(struct fd_state));
    if(!state)
    {
        return NULL;
    }
    
    state->buffer_used = state->n_written = state->writing = state->write_upto = 0;
    return state;
}

void free_fd_state(struct fd_state * state)
{
    free(state);
}

void make_nonblocking(int fd)
{
    fcntl(fd, F_SETFL, O_NONBLOCK);
}

int do_read(int fd, struct fd_state * state)
{
    char buf[1024];
    int i;
    ssize_t result;

    while(1)
    {
        result = recv(fd, buf, sizeof(buf), 0);

        if(result <= 0)
        {
            break;
        }

        for(int i = 0; i < result; ++i)
        {
            if(state->buffer_used < sizeof(state->buffer))
            {
                state->buffer[state->buffer_used++] = rot13_char(buf[i]);
            }

            if(buf[i] == '\n')
            {
                state->writing = 1;
                state->write_upto = state->buffer_used;
            }
        }
    }

    if(result == 0)
    {
        return 1;
    }
    else if(result < 0)
    {
        if(errno == EAGAIN)
        {
            return 0;
        }

        return -1;
    }

    return 0;
}

int do_write(int fd, struct fd_state * state)
{
    while(state->n_written < state->write_upto)
    {
        ssize_t result = send(fd, state->buffer + state->n_written, state->write_upto - state->n_written, 0);

        if(result < 0)
        {
            if(errno == EAGAIN)
            {
                return 0;
            }

            return -1;
        }

        assert(result != 0);

        state->n_written += result;
    }

    if(state->n_written == state->buffer_used)
    {
        state->n_written = state->write_upto = state->buffer_used = 0;
    }

    state->writing = 0;

    return 0;
}

void run(void)
{
    int listener;
    struct fd_state * state[FD_SETSIZE];
    struct sockaddr_in sin;
    int i, maxfd;
    fd_set readset, writeset, exset;

    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = 0;
    sin.sin_port = htons(40713);
    
    for(i = 0; i < FD_SETSIZE; ++i)
    {
        state[i] = NULL;
    }

    listener = socket(AF_INET, SOCK_STREAM, 0);
    make_nonblocking(listener);

    int one = 1;
    setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    if(bind(listener, (struct sockaddr *)(&sin), sizeof(sin)) < 0)
    {
        perror("bind");
        return;
    }

    if(listen(listener, 16) < 0)
    {
        perror("listen");
        return;
    }

    FD_ZERO(&readset);
    FD_ZERO(&writeset);
    FD_ZERO(&exset);

    while(1)
    {
        maxfd = listener;

        FD_ZERO(&readset);
        FD_ZERO(&writeset);
        FD_ZERO(&exset);

        FD_SET(listener, &readset);

        for(i = 0; i < FD_SETSIZE; ++i)
        {
            if(state[i])
            {
                if(i > maxfd)
                {
                    maxfd = i;
                }

                FD_SET(i, &readset);

                if(state[i]->writing)
                {
                    FD_SET(i, &writeset);
                }
            }
        }

        if(select(maxfd + 1, &readset, &writeset, &exset, NULL) < 0)
        {
            perror("select");
            return;
        }

        if(FD_ISSET(listener, &readset))
        {
            struct sockaddr_storage ss;
            socklen_t slen = sizeof(ss);

            int fd = accept(listener, (struct sockaddr *)(&ss), &slen);

            if(fd < 0)
            {
                perror("accept");
            }
            else if(fd > FD_SETSIZE)
            {
                close(fd);
            }
            else
            {
                make_nonblocking(fd);
                state[fd] = alloc_fd_state();
                assert(state[fd]);
            }
        }

        for(i = 0; i < maxfd + 1; ++i)
        {
            int r = 0;

            if(i == listener)
            {
                continue;
            }

            if(FD_ISSET(i, &readset))
            {
                r = do_read(i, state[i]);
            }

            if(r == 0 && FD_ISSET(i, &writeset))
            {
                r = do_write(i, state[i]);
            }

            if(r)
            {
                free_fd_state(state[i]);
                state[i] = NULL;
                close(i);
            }
        }
    }
}

int main(int argc, char ** argv)
{
    setvbuf(stdout, NULL, _IONBF, 0);

    run();

    return 0;
}

但這樣還不夠好: FD_SETSIZE是一個很大的值, 至少不小於1024. 當要監聽的fd的值比較大的時候, 就很噁心, 遍歷會遍歷不少次. 對於非阻塞IO接口來說, select是一個很粗糙的解決方案, 這個系統調用提供的功能比較薄弱, 只能說是夠用, 但接口確實太屎了, 很差用, 性能也堪優.

不一樣的操做系統平臺上提供了不少select的替代品, 它們都用於配套非阻塞IO接口來使單線程程序也有必定的併發能力. 這些替代品有poll(), epoll(), kqueue(), evports/dev/poll. 而且這些替代品的性能都比select()要好的多. 但比較蛋疼的是, 上面提到的全部接口, 幾乎都不是跨平臺的. epoll()是Linux獨有的, kqueue()是BSD系列(包括OS X)獨有的. evports/dev/poll是Solaris獨有的. 是的, select()屬於POSIX標準的一部分, 但就是性能捉急. 也就是說, 若是你寫的程序想跨平臺, 高性能, 你就得本身寫一層抽象, 把不一樣平臺對於IO多路複用的底層統一塊兒來: 這也就是Libevent乾的事情.

libevent的低級API爲IO多路複用提供了統一的接口, 其底層實如今不一樣的操做系統平臺上都是最高效的實現.

下面, 咱們將使用libevent對上面的程序進行重構. 注意: fd_sets不見了, 取而代之的是一個叫event_base的結構體.

/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>
/* For fcntl */
#include <fcntl.h>

#include <event2/event.h>

#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>

#define MAX_LINE 16384

void do_read(evutil_socket_t fd, short events, void *arg);
void do_write(evutil_socket_t fd, short events, void *arg);

char
rot13_char(char c)
{
    /* We don't want to use isalpha here; setting the locale would change
     * which characters are considered alphabetical. */
    if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
        return c + 13;
    else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
        return c - 13;
    else
        return c;
}

struct fd_state {
    char buffer[MAX_LINE];
    size_t buffer_used;

    size_t n_written;
    size_t write_upto;

    struct event *read_event;
    struct event *write_event;
};

struct fd_state *
alloc_fd_state(struct event_base *base, evutil_socket_t fd)
{
    struct fd_state *state = malloc(sizeof(struct fd_state));
    if (!state)
        return NULL;
    state->read_event = event_new(base, fd, EV_READ|EV_PERSIST, do_read, state);
    if (!state->read_event) {
        free(state);
        return NULL;
    }
    state->write_event =
        event_new(base, fd, EV_WRITE|EV_PERSIST, do_write, state);

    if (!state->write_event) {
        event_free(state->read_event);
        free(state);
        return NULL;
    }

    state->buffer_used = state->n_written = state->write_upto = 0;

    assert(state->write_event);
    return state;
}

void
free_fd_state(struct fd_state *state)
{
    event_free(state->read_event);
    event_free(state->write_event);
    free(state);
}

void
do_read(evutil_socket_t fd, short events, void *arg)
{
    struct fd_state *state = arg;
    char buf[1024];
    int i;
    ssize_t result;
    while (1) {
        assert(state->write_event);
        result = recv(fd, buf, sizeof(buf), 0);
        if (result <= 0)
            break;

        for (i=0; i < result; ++i)  {
            if (state->buffer_used < sizeof(state->buffer))
                state->buffer[state->buffer_used++] = rot13_char(buf[i]);
            if (buf[i] == '\n') {
                assert(state->write_event);
                event_add(state->write_event, NULL);
                state->write_upto = state->buffer_used;
            }
        }
    }

    if (result == 0) {
        free_fd_state(state);
    } else if (result < 0) {
        if (errno == EAGAIN) // XXXX use evutil macro
            return;
        perror("recv");
        free_fd_state(state);
    }
}

void
do_write(evutil_socket_t fd, short events, void *arg)
{
    struct fd_state *state = arg;

    while (state->n_written < state->write_upto) {
        ssize_t result = send(fd, state->buffer + state->n_written,
                              state->write_upto - state->n_written, 0);
        if (result < 0) {
            if (errno == EAGAIN) // XXX use evutil macro
                return;
            free_fd_state(state);
            return;
        }
        assert(result != 0);

        state->n_written += result;
    }

    if (state->n_written == state->buffer_used)
        state->n_written = state->write_upto = state->buffer_used = 1;

    event_del(state->write_event);
}

void
do_accept(evutil_socket_t listener, short event, void *arg)
{
    struct event_base *base = arg;
    struct sockaddr_storage ss;
    socklen_t slen = sizeof(ss);
    int fd = accept(listener, (struct sockaddr*)&ss, &slen);
    if (fd < 0) { // XXXX eagain??
        perror("accept");
    } else if (fd > FD_SETSIZE) {
        close(fd); // XXX replace all closes with EVUTIL_CLOSESOCKET */
    } else {
        struct fd_state *state;
        evutil_make_socket_nonblocking(fd);
        state = alloc_fd_state(base, fd);
        assert(state); /*XXX err*/
        assert(state->write_event);
        event_add(state->read_event, NULL);
    }
}

void
run(void)
{
    evutil_socket_t listener;
    struct sockaddr_in sin;
    struct event_base *base;
    struct event *listener_event;

    base = event_base_new();
    if (!base)
        return; /*XXXerr*/

    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = 0;
    sin.sin_port = htons(40713);

    listener = socket(AF_INET, SOCK_STREAM, 0);
    evutil_make_socket_nonblocking(listener);

#ifndef WIN32
    {
        int one = 1;
        setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
    }
#endif

    if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
        perror("bind");
        return;
    }

    if (listen(listener, 16)<0) {
        perror("listen");
        return;
    }

    listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
    /*XXX check it */
    event_add(listener_event, NULL);

    event_base_dispatch(base);
}

int
main(int c, char **v)
{
    setvbuf(stdout, NULL, _IONBF, 0);

    run();
    return 0;
}

總之:

  1. 注意連接的時候加上 -levent
  2. 代碼量沒有減小, 邏輯也沒有簡化. libevent只是給你提供了一個通用的多路IO接口. 或者叫事件監聽接口.
  3. evutil_socket_t類型的使用, 與evutil_make_socket_nonblocking()函數的使用, 均是爲也跨平臺兼容性. 使用這些類型名與工具函數, 使得在windows平臺上代碼也能跑起來.

如今, 你看, 異步IO+事件處理(或者叫多路IO複用), 是單線程單進程程序取得併發能力的最佳途徑, 而libevent則是把多平臺的IO多路複用庫給你抽象統一成一層接口了. 這樣代寫的代碼不須要改動, 就能夠運行在多個平臺上.

這樣就有了三個問題:

  1. 若是個人代碼須要跨平臺, 或者只須要跨部分平臺(好比我只考慮Linux和BSD用戶, 徹底不考慮Windows平臺), 我爲何不本身把多路IO庫作個簡單的封裝, 爲何要使用libevent呢? 典型的就是Redis, 用了很薄的一層封裝, 下面統一了epoll, kqueue, evport, select等. 爲何, 我須要使用libevent呢?
  2. 若是將libevent做爲一個黑盒去用, 不可避免的問題就是: 它的性能怎麼樣? 它封裝了多個多路IO庫, 在封裝上是否有性能損失?
  3. 如今是個輪子都說本身解決了跨平臺問題, 那麼libevent在windows上表現怎麼樣? 它能兼容IOCP式多路IO庫嗎? 畢竟IOCP的設計思路和epoll``select``evport``kqueue等都不同.

答案在這裏:

  1. 你沒有任何理由非得使用libevent, redis就是一個很好的例子. libevent有很多功能, 但若是你只是跨小部分平臺, 而且只關注在多路IO複用上, 那麼真的沒什麼必要非得用libevent. 你徹底能夠像redis那樣, 用幾百行簡單的把多路IO庫本身封裝一下.
  2. 基本上這麼講吧: 你使用系統原生異步IO多路複用接口的性能是多少, 使用libevent就是多少. 說實施libevent裏沒太多的抽象, 接口也沒有多麼好用, 封閉很薄, 和你使用原生接口基本同樣.
  3. libevent從版本2開始就能搞定windows了. 上面咱們使用的是libevent很底層的接口, 其設計思路是遵循*nix上的事件處理模型的, 典型的就是selectepoll: 當網絡可讀寫時, 通知應用程序去讀去寫. 而windows上IOCP的設計思路是: 當網絡可讀可寫時不通知應用程序, 而是先完成讀與寫, 再通知應用程序, 應用程序直接拿到的就是數據. 當在libevent 2提供的bufferevents系列接口中, 它將*nix平臺下的設計, 改巴改巴改爲了IOCP式的. 使用這個系列的接口不可避免的, 對*nix平臺有性能損失(這和asio封裝網絡庫是同樣的作法), 但實話講, IOCP式的設計確實對程序員更友好, 代碼可讀性高了很多.

總的來講, 你應該在以下的場合使用libevent

  1. 代碼須要跨多個平臺, 甚至是windows
  2. 想在*nix平臺上使用IOCP式的事件接口編程
  3. 你不想本身封裝多個平臺上的多路IO接口, 而且自認爲, 就算本身作, 作的也確定沒有libevent好. libevent是一個久經考驗的很基礎的庫. 身經百戰.

下面是使用bufferevents系列接口, 以IOCP式風格對以前例子代碼的重構, 體驗一下更人性的事件處理方式:

/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>
/* For fcntl */
#include <fcntl.h>

#include <event2/event.h>
#include <event2/buffer.h>
#include <event2/bufferevent.h>

#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>

#define MAX_LINE 16384

void do_read(evutil_socket_t fd, short events, void *arg);
void do_write(evutil_socket_t fd, short events, void *arg);

char
rot13_char(char c)
{
    /* We don't want to use isalpha here; setting the locale would change
     * which characters are considered alphabetical. */
    if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
        return c + 13;
    else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
        return c - 13;
    else
        return c;
}

void
readcb(struct bufferevent *bev, void *ctx)
{
    struct evbuffer *input, *output;
    char *line;
    size_t n;
    int i;
    input = bufferevent_get_input(bev);
    output = bufferevent_get_output(bev);

    while ((line = evbuffer_readln(input, &n, EVBUFFER_EOL_LF))) {
        for (i = 0; i < n; ++i)
            line[i] = rot13_char(line[i]);
        evbuffer_add(output, line, n);
        evbuffer_add(output, "\n", 1);
        free(line);
    }

    if (evbuffer_get_length(input) >= MAX_LINE) {
        /* Too long; just process what there is and go on so that the buffer
         * doesn't grow infinitely long. */
        char buf[1024];
        while (evbuffer_get_length(input)) {
            int n = evbuffer_remove(input, buf, sizeof(buf));
            for (i = 0; i < n; ++i)
                buf[i] = rot13_char(buf[i]);
            evbuffer_add(output, buf, n);
        }
        evbuffer_add(output, "\n", 1);
    }
}

void
errorcb(struct bufferevent *bev, short error, void *ctx)
{
    if (error & BEV_EVENT_EOF) {
        /* connection has been closed, do any clean up here */
        /* ... */
    } else if (error & BEV_EVENT_ERROR) {
        /* check errno to see what error occurred */
        /* ... */
    } else if (error & BEV_EVENT_TIMEOUT) {
        /* must be a timeout event handle, handle it */
        /* ... */
    }
    bufferevent_free(bev);
}

void
do_accept(evutil_socket_t listener, short event, void *arg)
{
    struct event_base *base = arg;
    struct sockaddr_storage ss;
    socklen_t slen = sizeof(ss);
    int fd = accept(listener, (struct sockaddr*)&ss, &slen);
    if (fd < 0) {
        perror("accept");
    } else if (fd > FD_SETSIZE) {
        close(fd);
    } else {
        struct bufferevent *bev;
        evutil_make_socket_nonblocking(fd);
        bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
        bufferevent_setcb(bev, readcb, NULL, errorcb, NULL);
        bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE);
        bufferevent_enable(bev, EV_READ|EV_WRITE);
    }
}

void
run(void)
{
    evutil_socket_t listener;
    struct sockaddr_in sin;
    struct event_base *base;
    struct event *listener_event;

    base = event_base_new();
    if (!base)
        return; /*XXXerr*/

    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = 0;
    sin.sin_port = htons(40713);

    listener = socket(AF_INET, SOCK_STREAM, 0);
    evutil_make_socket_nonblocking(listener);

#ifndef WIN32
    {
        int one = 1;
        setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
    }
#endif

    if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
        perror("bind");
        return;
    }

    if (listen(listener, 16)<0) {
        perror("listen");
        return;
    }

    listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
    /*XXX check it */
    event_add(listener_event, NULL);

    event_base_dispatch(base);
}

int
main(int c, char ** argv)
{
    setvbuf(stdout, NULL, _IONBF, 0);

    run();
    return 0;
}

說實話也沒人性到哪裏去, 底層庫就是這樣, libevent仍是太基礎了. 算不上十分友好的輪子.

3. Libevent 簡介

如今咱們要正式介紹Libevent

3.1 libevent的賣點

  1. 代碼跨平臺.
  2. 性能高. libevent在非阻塞IO+多路複用的底層實現上, 選取的是特定平臺上最快的接口. 好比Linux上用epoll, BSD上用kqueue
  3. 高併發可擴展. libevent就是爲了那種, 須要維持成千上萬的活動socket鏈接的應用程序使用的.
  4. 接口友好. 雖然並無友好多少, 但至少比原生的epoll要好一點.

3.2 libevent下的各個子模塊

  1. evutil 通用類型定義, 跨平臺相關的通用定義, 以及一些通用小函數
  2. event and event_base 核心模塊. 事件庫. *nix風格的事件模型: 在socket可讀可寫時通知應用程序.
  3. bufferevent 對核心事件庫的再一層封裝, IOCP式的事件模型: 在數據已讀已寫後通知應用程序
  4. evbuffer 這是bufferevent模塊內部使用的緩衝區實現.
  5. evhttp 簡單的HTTP C/S實現
  6. evdns 簡單的 DNS C/S實現
  7. evrpc 簡單的 RPC實現

總的來講, 做爲使用者, 須要關心的是:

  1. evutil是須要關心的
  2. 對於主在*nix平臺上寫後臺服務端程序的人: 只須要關心 event and event_base 核心庫的用法便可.
  3. 對於跨平臺, 特別是包含win平臺的開發人員: 須要關注 buffereventevbuffer, 對於核心庫event and event_base, 能夠不關心
  4. evhttp, evdns, evrpc, 如無須要, 能夠不用關心

3.3 libevent下的二進制庫

如下是在連接你的代碼的時候, 你須要瞭解的二進制庫.

  1. libevent_core 包含event and event_base, evutil, evbuffer, bufferevent中的全部函數
  2. libevent_extra 包含協議相關的函數. 包括 HTTP/DNS/RPC 等. 若是你用不到 evhttp/evdns/evrpc裏的函數, 那麼這個庫不用連接.
  3. libevent 滿清遺老, 包含了上面兩個庫裏的全部函數. 官方不建議在使用libevent 2.0以上的版本時連接這個庫. 這是個懶人庫.
  4. libevent_pthreads 若是你編寫多線程應用程序. 那麼這個庫裏包含了基於POSIX線程庫的相關函數實現. 若是你沒有用到libevent中有關的多線程函數, 那麼這個庫不用連接. 之前這些函數是劃分在libevent_core中的, 後來被單獨割出來了.注意: 這個庫不是全平臺的.
  5. libevent_openssl 這個庫裏的與OpenSSL相關的函數實現. 若是你沒有用到libevent中有關OpenSSL的函數, 那麼這個庫不用連接. 之前這些函數也算在libevent_core中, 最後也割出來了. 注意: 這個庫也不是全平臺的

3.4 libevent中的頭文件

libevent中的頭文件分爲三類, 全部頭文件都位於event2目錄下. 也就是說在代碼中你應當這樣寫:

#include <event2/xxxx>
#include <event2/xxxx>
#include <event2/xxxx>
#include <event2/xxxx>

具體有哪些頭文件在後續章節會詳細介紹, 目前只介紹這個分類:

  1. API 頭文件. 這些頭文件定義了libevent對外的接口. 這些頭文件沒有特定前綴.
  2. 兼容性 頭文件. 這些頭文件是爲了向前兼容老版本libevent存在的, 它們裏面定義了老版本的一些廢棄接口. 除非你是在作老代碼遷移工做, 不然不建議使用這些頭文件.
  3. 類型定義 頭文件. 定義了libevent庫中相關的類型. 這些頭文件有共同後綴_struct.h

3.5 如何將老版本的代碼遷移到libevent 2上

官方建議你們使用版本2, 但有時候這個世界就是不那麼讓人舒服, 若是你須要和版本1的歷史代碼打交道, 你能夠參照下面的對照表: 老頭文件與新頭文件的對照表

舊頭文件 新頭文件
event.h event2/event*.h, event2/buffer*.h, event2/bufferevent*.h, event2/tag*.h
evdns.h event2/dns*.h
evhttp.h event2/http*.h
evrpc.h event2/rpc*.h
evutil.h event2/util*.h

在當前的2.0版本中, 老的舊頭文件實際上是不須要替換的, 這些舊頭文件依然存在. 但仍是建議將他們替換成新頭文件, 由於說不定50年後libevent升級到3.0版本, 這些舊頭文件就被扔了.

另外還有一些點須要你注意:

  1. 在1.4版本以前, 只有一個二進制庫文件. libevent, 裏面是libevent的全部實現. 現在這些實現被分割到了 libevent_corelibevent_extra兩個庫中.
  2. 在2.0以前, libevent不支持鎖. 也就是說, 2.0以前若是要寫出線程安全的代碼, 你只能避免在線程間共享數據實例. 沒有其它辦法.

3.6 滿清遺老

官方對待老版本是這樣建議的:

  1. 1.4.7以前的版本被正式廢棄了
  2. 1.3以前的版本有一堆bug, 用的時候看臉吧.
  3. 推薦使用2.0後的版本

我對老版本的態度是這樣的: 能幹活就好. 沒有特殊緣由, 我是不會作代碼遷移的. 而且考慮到應用場景, 有時候用老版本也挺好的.

1.4.x版本的libevent被大量項目使用, 其實挺穩定的, 官方不建議使用, 只是官方再也不在1.4版本上再加特性修bug了. 1.4版本最後的一個小版本號就中止在7上不動了. 而對於1.3版本, 確實不該該再碰了.

4. 使用libevent的正確姿式

libevent有幾項全局設定, 若是你須要改動這幾項設定, 那麼確保在代碼初始化的時候設定好值, 一旦你的代碼流程開始了, 調用了第一個libevent中的任何函數, 後續強烈建議不要再更改設定值, 不然會引發不可預知的後果.

4.1 libevent中的日誌

libevent默認狀況下將把錯誤與警告日誌寫進stderr, 而且若是你須要一些libevent內部的調試日誌的話, 也能夠經過更改設定來讓其輸出調試日誌, 以在程序崩潰時提供更多的參考信息. 這些行爲均可以經過自行實現日誌函數進行更改. 下面是libevent相關的日誌接口.

// 如下是日誌級別
#define EVENT_LOG_DEBUG 0
#define EVENT_LOG_MSG   1
#define EVENT_LOG_WARN  2
#define EVENT_LOG_ERR   3

// 如下是已經被廢棄的日誌級別定義
/* Deprecated; see note at the end of this section */
#define _EVENT_LOG_DEBUG EVENT_LOG_DEBUG
#define _EVENT_LOG_MSG   EVENT_LOG_MSG
#define _EVENT_LOG_WARN  EVENT_LOG_WARN
#define _EVENT_LOG_ERR   EVENT_LOG_ERR

// 這個是一個函數指針類型別名, 指向日誌輸出函數
// 日誌輸出函數應當是一個雙參, 無返回值的函數, 第一個參數severity爲日誌級別, 第二個參數爲日誌字符串
typedef void (*event_log_cb)(int severity, const char *msg);

// 這是用戶自定義設置日誌處理函數的接口. 若是調用該函數時入參設置爲NULL
// 則將採用默認行爲
void event_set_log_callback(event_log_cb cb);

好比下面我須要改寫libevent記錄日誌的方式:

#include <event2/event.h>
#include <stdio.h>

// 丟棄日誌
static void discard_cb(int severity, const char * msg)
{
    // 這個日誌函數內部什麼也不作
}

// 將日誌記錄至文件
static FILE * logfile = NULL;
static void write_to_file_cb(int severity, const char * msg)
{
    const char * s;
    if(!logfile)
    {
        return;
    }

    switch(severity)
    {
        case EVENT_LOG_DEBUG:   s = "[DBG]";    break;
        case EVENT_LOG_MSG:     s = "[MSG]";    break;
        case EVENT_LOG_WARN:    s = "[WRN]";    break;
        case EVENT_LOG_ERR:     s = "[ERR]";    break;
        default:                s = "[???]";    break;
    }

    fprintf(logfile, "[%s][%s][%s] %s\n", __FILE__, __func__, s, msg);
}

void suppress_logging(void)
{
    event_set_log_callback(discard_cb);
}

void set_logfile(FILE * f)
{
    logfile = f;
    event_set_log_callback(write_to_file_cb);
}

注意: 在自定義的日誌輸出函數中, 不要調用其它libevent中的函數! 好比, 若是你要把日誌遠程輸出至網絡socket上去, 你還使用了bufferevent來輸出你的日誌, 在目前的libevent版本中, 這會致使一些奇怪的bug. libevent官方也認可這是一個設計上沒有考慮到的點, 這可能在後續版本中被移除, 但截止目前的2.1.8 stable版本, 這個問題都尚未解決. 不要做死.

默認狀況下的日誌級別是EVENT_LOG_MSG, 也就是說EVENT_LOG_DEBUG級別的日誌不會調用至日誌輸出函數. 要讓libevent輸出調試級別的日誌, 請使用下面的接口:

#define EVENT_DBG_NONE 0
#define EVENT_DBG_ALL 0xffffffffu

// 若是傳入 EVENT_DBG_NONE, 將保持默認狀態: 不輸出調試日誌
// 若是傳入 EVENT_DEG_ALL, 將開啓調試日誌的輸出
void event_enable_debug_logging(ev_uint32_t which);

調試日誌很詳盡, 一般狀況下對於libevent的使用者而言是沒有輸出的必要的. 由於要用到調試級別日誌的場合, 是你百般無奈, 開始懷疑libevent自己有bug的時候. 雖然從宏的命名上, 彷彿還存在着 EVENT_DGB_SOMETHING 這樣, 能夠單獨控制某個模塊的調試日誌輸出的參數, 但實際上並無: 調試日誌要麼全開, 要麼全關. 沒有中間地帶. 官方宣稱可能在後續的版本中細化調試日誌的控制.

而若是你要控制其它日誌級別的輸出與否, 請自行實現日誌輸出函數. 好比忽略掉EVENT_LOG_MSG級別的日誌之類的. 上面的接口只是控制"若是產生了調試日誌, libevent調用或不調用日誌輸出函數"而已.

上面有關日誌的接口均定義在<event2/event.h>中.

  1. 日誌輸出函數相關接口早在版本1.0時就有了.
  2. event_enable_debug_logging()接口在2.1.1版本以後纔有
  3. 日誌級別宏名, 在2.0.19以前, 是如下劃線開頭的, 即_DEBUG_LOG_XXX, 但如今已經廢棄掉了這種定義, 在新版本中請使用不帶下劃線開頭的版本.

4.2 正確處理致命錯誤

當libevent檢測到有致命的內部錯誤發生時(好比踩內存了之類的不可恢復的錯誤), 默認行爲是調用exit()abort(). 出現這種狀況99.99的緣由是使用者自身的代碼出現了嚴重的bug, 另外0.01%的緣由是libevent自身有bug.

若是你但願在進程退出以前作點額外的事情, 寫幾行帶fxxk的日誌之類的, libevent提供了相關的入口, 這能夠改寫libevent對待致命錯誤的默認行爲.

typedef void (*event_fatal_cb)(int err);
void event_set_fatal_callback(event_fatal_cb cb);

注意, 不要試圖強行恢復這種致命錯誤, 也就是說, 雖然libevent給你提供了這麼個接口, 但不要在註冊的函數中試圖讓進程繼續執行. 由於這個時候libevent內部已經有坑了, 若是繼續強行恢復, 結果是不可預知的. 換個說法: 這個函數應該提供的是臨終遺言, 而不該該試圖救死扶傷.

這個函數也定義在 <event2/event.h>中, 在2.0.3版本以後可用.

4.3 內存管理

默認狀況下, libevent使用的是標準C庫中的內存管理函數, 即malloc(), realloc(), free()等. libevent容許你使用其它的內存管理庫, 好比tcmallocjemalloc. 相關接口以下:

void event_set_mem_functions(void *(*malloc_fn)(size_t sz),
                             void *(*realloc_fn)(void *ptr, size_t sz),
                             void (*free_fn)(void *ptr));

接口的第一個參數是內存分配函數指針, 第二個參數是內存重分配函數指針, 第三個參數是內存釋放函數指針.

下面是一個使用的例子:

#include <event2/event.h>
#include <sys/types.h>
#include <stdlib.h>

union alignment
{
    size_t sz;
    void * ptr;
    double dbl;
};

#define ALIGNMENT sizeof(union alignment)

#define OUTPTR(ptr)     (((char *)ptr) + ALIGNMENT)
#define INPTR(ptr)      (((char *)ptr) - ALIGNMENT)

static size_t total_allocated = 0;

static void * my_malloc(size_t sz)
{
    void * chunk = malloc(sz + ALIGNMENT);
    if(!chunk)  return chunk;

    total_allocated += sz;

    *(size_t *)chunk = sz;

    return OUTPTR(chunk);
}

static void * my_realloc(void * ptr, size_t sz)
{
    size_t old_size = 0;

    if(ptr)
    {
        ptr = INPTR(ptr);
        old_size = *(size_t*)ptr;
    }

    ptr = realloc(ptr, sz + ALIGNMENT);

    if(!ptr)
    {
        return NULL;
    }

    *(size_t *)ptr = sz;

    total_allocated = total_allocated - old_size + sz;

    return OUTPTR(ptr);
}

static void my_free(void * ptr)
{
    ptr = INPTR(ptr);
    total_allocated -= *(size_t *)ptr;
    free(ptr);
}

void start_counting_bytes(void)
{
    event_set_mem_functions(
        my_malloc, my_realloc, my_free
    );
}

上面這個例子中, 提供了一種記錄全局內存使用量的簡單方案, 非線程安全.

對於自定義內存管理接口, 須要注意的有:

  1. 再次重申, 這是一個全局設定, 一旦設定, 後續全部的libevent函數內部的內存操做都會受影響. 而且不要在代碼流程中途更改設定.
  2. 自定義的內存管理函數, 在分配內存時, 返回的指針後必須確保在至少sz個字節可用.
  3. 自定義的內存重分配函數, 必須正確處理realloc(NULL, sz)這種狀況: 即, 使之行爲等同於 malloc(sz). 也必須正確處理realloc(ptr, 0)這種狀況: 即, 使之行爲與free(ptr)相同且返回NULL.
  4. 自定義的內存釋放函數, 必須正確處理 free(NULL): 什麼也不作.
  5. 自定義的內在分配函數, 必須正確處理 malloc(0): 返回NULL.
  6. 若是你在多線程環境中使用libevent, 請務必確保內存分配函數是線程安全的.
  7. 若是你要釋放一個由libevent建立來的內存區域, 請確認你使用的free()版本與libevent內部使用的內存管理函數是一致的. 也就是說: 若是要操做libevent相關的內存區域, 請確保相關的內存處理函數和libevent內部使用的內在管理函數是一致的. 或者簡單一點: 若是你決定使用某個內存管理庫, 那麼在整個項目範圍內都使用它, 這樣最簡單, 不容易出亂子. 不然應該盡力避免在外部操做libevent建立的內存區域.

event_set_mem_functions()接口也定義在<event2/event.h>中, 在2.0.2版本後可用.

須要注意的是: libevent在編譯安裝的時候, 能夠關閉event_set_mem_functions()這個特性. 若是關閉了這個特性, 而在項目中又使用了這個特性, 那麼在項目編譯時, 編譯將報錯. 若是要檢測當前引入的libevent庫是否啓用了這個功能, 能夠經過檢測宏EVENT_SET_MEM_FUNCTIONS_IMPLEMENTED宏是否被定義來判斷.

4.4 線程與鎖

多線程程序設計裏的數據訪問是個大難題. 目前的版本里, libevent支持了多線程編程, 但這個支持法呢, 怎麼講呢, 使用者仍是須要知道很多細節才能正確的寫出多線程應用. libevent中的數據結構分爲三類:

  1. 有一些數據結構就是非線程安全的. 這是歷史遺留問題, libevent在大版本號更新爲2後才支持多線程, 這些數據結構是從版本1一路繼承下來的, 不要在多線程中共享這些實例. 沒辦法.
  2. 有一些數據結構的實例能夠用鎖保護起來, 以在多線程環境中共享. 若是你須要在多個線程中訪問某個實例, 那麼你須要給libevent說明這個狀況, 而後libevent會爲這個實例加上適當的鎖保護, 以確保你在多線程訪問它時是安全的. 加鎖不須要你去加, 你須要作的只是告訴libevent一聲, 如何具體操做後面再講.
  3. 有些數據結構, 天生就是帶鎖的. 若是你帶 libevent_pthreads 庫連接你的程序, 那麼這些結構的實例在多線程環境中必定的安全的. 你想讓它不安全都沒辦法.

雖然libevent爲你寫了一些加鎖解鎖的無聊代碼, 你沒必要要手動爲每一個對象加鎖了, 但libevent仍是須要你指定加鎖的函數. 就像你能夠爲libevent指定其它的內存管理庫同樣. 注意這也是一個全局設定, 請遵循咱們一再強調的使用規則: 進程初始化時就定好, 後續不準再更改.

若是你使用的是POSIX線程庫, 或者標準的windows原生線程庫, 那麼簡單了一些. 設置加解鎖函數只須要一行函數調用, 接口以下:

#ifdef WIN32
int evthread_use_windows_threads(void);
#define EVTHREAD_USE_WINDOWS_THREADS_IMPLEMENTED
#endif

#ifdef _EVENT_HAVE_PTHREADS
int evthread_use_pthreads();
#define EVTHREAD_USE_PTHREADS_IMPLEMENTED
#endif

這兩個函數在成功時都返回0, 失敗時返回-1.

這只是加解鎖. 但若是你想要自定義的是整個線程庫, 那麼你就須要手動指定以下的函數與結構定義

  1. 鎖的定義
  2. 加鎖函數
  3. 解鎖函數
  4. 鎖分配函數
  5. 鎖釋放函數
  6. 條件變量定義
  7. 條件變量建立函數
  8. 條件變量釋放函數
  9. 條件變量等待函數
  10. 通知/廣播條件變量的函數
  11. 線程定義
  12. 線程ID檢測函數

這裏須要注意的是: libevent並不會爲你寫哪怕一行的多線程代碼, libevent內部也不會去建立線程. 你要使用多線程, OK, 你用哪一種線程庫都行, 沒問題. 但你須要將配套的鎖/條件變量/線程檢測函數以及相關定義告訴libevent, 這樣libevent纔會知道如何在多線程環境中保護本身的實例, 以供你在多線程環境中安全的訪問.

  1. 若是你使用的是POSIX線程或者windows原生線程庫, 就方便了一點, 調一行函數的事情.
  2. 若是你在使用POSIX純種或windows原生線程庫時, 你不想使用POSIX配套的鎖, 那OK, 你在調用完evthread_use_xxx_threads()以後, 把你本身的鎖函數或者條件變量函數提供給libevent就行了. 注意這種狀況下, 在你的程序的其它地方也須要使用你指定的鎖或條件變量.
  3. 而若是你使用的是其它線程庫, 也OK, 只不過麻煩一點, 要提供鎖的相關信息, 要提供條件變量的相關信息, 也要提供線程ID檢測函數

下面是相關的接口

// 鎖模式是 lock 與 unlock 函數的參數, 它指定了加鎖解鎖時的一些額外信息
// 若是調用 lock 或 unlock 時的鎖都不知足下面的三種模式, 參數傳0便可
#define EVTHREAD_WRITE  0x04        // 鎖模式: 僅對讀寫鎖使用: 獲取或釋放寫鎖
#define EVTHREAD_READ   0x08        // 鎖模式: 僅對讀寫鎖使用: 獲取或釋放讀鎖
#define EVTHREAD_TRY    0x10        // 鎖模式: 僅在加鎖時使用: 僅在能夠當即加鎖的時候纔去加鎖. 
                                    //         若當前不可加鎖, 則lock函數當即返回失敗, 而不是阻塞

// 鎖類型是 alloc 與 free 函數的參數, 它指定了建立與銷燬的鎖的類型
// 鎖類型能夠是 EVTHREAD_LOCKTYPE_XXX 之一或者爲0
// 全部支持的鎖類型均須要被登記在 supported_locktypes 中, 若是支持多種鎖, 則多個宏之間用 | 連結構成該字段的值
                                            // 當鎖類型爲0時, 指的是普通的, 非遞歸鎖
#define EVTHREAD_LOCKTYPE_RECURSIVE 1       // 鎖類型: 遞歸鎖, 你必須提供一種遞歸鎖給libevent使用
#define EVTHREAD_LOCKTYPE_READWRITE 2       // 鎖類型: 讀寫鎖, 在2.0.4版本以前, libevent內部沒有使用到讀寫鎖

#define EVTHREAD_LOCK_API_VERSION 1

// 將你要用的有關鎖的全部信息放在這個結構裏
struct evthread_lock_callbacks {
       int lock_api_version;                // 必須與宏 EVTHREAD_LOCK_API_VERSION的值一致
       unsigned supported_locktypes;        // 必須是宏 EVTHREAD_LOCKTYPE_XXX 的或組合, 或爲0
       void *(*alloc)(unsigned locktype);               // 鎖分配, 須要指定鎖類型
       void (*free)(void *lock, unsigned locktype);     // 鎖銷燬, 須要指定鎖類型
       int (*lock)(unsigned mode, void *lock);          // 加鎖, 須要指定鎖模式
       int (*unlock)(unsigned mode, void *lock);        // 解鎖, 須要指定鎖模式
};

// 調用該函數以設置相關鎖函數
int evthread_set_lock_callbacks(const struct evthread_lock_callbacks *);

// 調該函數以設置線程ID檢測函數
void evthread_set_id_callback(unsigned long (*id_fn)(void));

// 將你要用的有關條件變量的全部信息都放在這個結構裏
struct evthread_condition_callbacks {
        int condition_api_version;
        void *(*alloc_condition)(unsigned condtype);
        void (*free_condition)(void *cond);
        int (*signal_condition)(void *cond, int broadcast);
        int (*wait_condition)(void *cond, void *lock,
            const struct timeval *timeout);
};

// 經過該函數以設置相關的條件變量函數
int evthread_set_condition_callbacks(
        const struct evthread_condition_callbacks *);

要探究具體如何使用這些函數, 請看libevent源代碼中的evthread_pthread.cevthread_win32.c文件.

對於大多數普通用戶來講, 只須要調用一下evthread_use_windows_threads()evthread_use_pthreads()就好了.

上面這些函數均定義在 <event2/thread.h>中. 在2.0.4版本後這些函數纔可用. 2.0.1至2.0.3版本中使用了一些舊接口, event_use_pthreads()等. 有關條件變量的相關接口直至2.0.7版本纔可用, 引入條件變量是爲了解決以前libevent出現的死鎖問題.

libevent自己能夠被編譯成不支持鎖的二進制庫, 用這種二進制庫連接你的多線程代碼, bomshakalaka, 跑不起來. 這算是個無用知識點.

另外額外注意: 多線程程序, 而且還使用了POSIX線程庫和配套的鎖, 那麼你須要連接libevent_pthreads. windows平臺則不用.

4.5 小知識點: 有關鎖的調試

libevent有一個額外的特性叫"鎖調試", 開啓這種特性後, libevent將把它內部有關鎖的全部調用都再包裝一層, 以檢測/獲取在鎖調用過程當中出現的錯誤, 好比:

  1. 解了一個沒有持有的鎖
  2. 對一個非遞歸鎖進行了二次加鎖

若是出現了上述錯誤, 則libevent會致使進程退出, 並附送一個斷言錯誤

要開啓這個特性, 調用下面的接口:

void evthread_enable_lock_debugging(void);
#define evthread_enable_lock_debuging() evthread_enable_lock_debugging()

注意, 這也是一個全局設置項, 請遵循: 一次設置, 初始化時就設置, 永不改動的規則.

這個特性在2.0.4版本中開始支持, 當時接口函數名拼寫錯誤了, 少寫了一個g: evthread_enable_lock_debuging(). 後來在2.1.2版本中把這個錯誤的拼寫修正過來了. 但仍是兼容了以前的錯誤拼寫.

這個特性吧, 很明顯是libevent內部開發時使用的. 如今開放出來估計是考慮到, 若是你的代碼中出現了一個bug是由libevent內部加解鎖失誤致使的, 那麼用個特性能夠定位到libevent內部. 不然你很難把鍋甩給libevent. 固然這種狀況不多見.

4.6 小知識點: 排除不正確的使用姿式

libevent是一個比較薄的庫, 薄的好處是性能很好, 壞處是沒有在接口上對使用者作過多的約束. 這就致使一些二把刀使用者常常會錯誤的使用libevent. 常見的智障行爲有:

  1. 向相關接口傳遞了一個未初始化的事件結構實例
  2. 試圖第二次初始化一個正在被使用的事件結構實例

這種錯誤其實挺難發現的, 爲了解決這個痛點, libevent額外開發了一個新特性: 在發生上述狀況的時候, libevent給你報錯.

但這是一個會額外消耗資源的特性, libevent內部實際上是追蹤了每一個事件結構的初始化與銷燬, 因此僅在開發測試的時候打開它, 發現問題, 解決問題. 在實際部署的時候, 不要使用這個特性. 開啓這個特性的接口以下:

void event_enable_debug_mode(void);

再不厭其煩的講一遍: 全局設定, 初始化時設定, 一次設定, 永不更改.

這個特性開啓後, 也有一個比較蛋疼的事情: 就是若是你的代碼裏大量使用了event_assign()來建立事件結構, 可能你的程序在這個特性下會OOM掛掉..緣由是: libevent能夠經過對event_new()event_free()的追蹤來檢測事件結構實例是否未被初始化, 或者被屢次初始化, 或者被非法使用. 可是對於event_assign()拷貝來的事件結構, 這追蹤就無能爲力了, 而且蛋疼的是event_assign()仍是淺拷貝. 這樣, 若是你的代碼裏大量的使用了event_assign(), 這就會致使內置的的追蹤功能一旦追上車就下不來了, 完事車太多就OOM掛掉了.

爲了不在這個特性下因爲追蹤event_assign()建立的事件實例(或許這裏叫實例已經不合適了, 應該叫句柄)而致使程序OOM, 能夠調用下面的函數以解除對這種事件實例的追蹤, 以免OOM

void event_debug_unassign(struct event * ev);

這樣, 調試模式下, 相關的追蹤檢測就會放棄追蹤由event_assign建立的事件. 因此你看, 這個特性也不是萬能的, 有缺陷, 湊合用吧. 在不開啓調試模式下, 調用event_debug_unassign()函數沒有任何影響

下面是一個例子:

#include <event2/event.h>
#include <event2/event_struct.h>

#include <stdlib.h>

void cb(evutil_socket_t fd, short what, void *ptr)
{
    struct event *ev = ptr;

    if (ev)     // 經過判斷入參是否爲NULL, 來確認入參攜帶的事件實例是event_new來的仍是event_assign來的
        event_debug_unassign(ev); // 若是是event_assign來的, 那麼就放棄對它的追蹤
}

/*
 * 下面是一個簡單的循環, 等待fd1與fd2同時可讀
 */
void mainloop(evutil_socket_t fd1, evutil_socket_t fd2, int debug_mode)
{
    struct event_base *base;
    struct event event_on_stack, *event_on_heap;        // 一個是棧上的事件實例, 一個是堆上的事件實例

    if (debug_mode)
       event_enable_debug_mode();       // 開啓調試模式

    base = event_base_new();

    event_on_heap = event_new(base, fd1, EV_READ, cb, NULL);    // 經過event_new來建立堆上的實例, 並把事件回調的入參設置爲NULL
    event_assign(&event_on_stack, base, fd2, EV_READ, cb, &event_on_stack); // 經過event_assign來初始化棧上的實例, 並把事件回調的入參設置爲事件實例自身的指針

    event_add(event_on_heap, NULL);
    event_add(&event_on_stack, NULL);

    event_base_dispatch(base);

    event_free(event_on_heap);
    event_base_free(base);
}

這個例子也寫的比較蛋疼, 湊合看吧.

另外, 調試模式下的詳情調試信息, 只能經過在編譯時額外定義宏USE_DEBUG來附加. 即在編譯時加上-DUSE_DEBUG來開啓. 加上這個編譯時的宏定義後, libevent就會輸出一大坨有關其內部流程的詳情日誌, 包括但不限於

  1. 事件的增長
  2. 事件的刪除
  3. 與具體平臺相關的事件通知信息

這些詳情不能經過調用API的方式開啓或關閉. 而開啓調試模式的API, 在2.0.4版本後纔可用.

4.7 檢測當前項目中引用的libevent的版本

接口很簡單, 以下:

#define LIBEVENT_VERSION_NUMBER 0x02000300
#define LIBEVENT_VERSION "2.0.3-alpha"
const char *event_get_version(void);        // 獲取字符串形式的版本信息
ev_uint32_t event_get_version_number(void); // 獲取值形式的版本信息

值形式的版本信息由一個uint32_t類型存儲, 從高位到低位, 每8位表明一個版本號. 好比 0x02000300表明的版本號就是02.00.03.00. 三級版本號後可能還有一個小版本號, 好比就存在過一個2.0.1.18的版本

下面是一個在編譯期檢查libevent版本的寫法, 若版本小於2.0.1, 則編譯不經過. 須要注意的是, 編譯期檢查的是宏裏的值, 若是你的項目構建比較混亂, 極可能出現頭文件的版本, 和最終連接的二進制庫的版本不一致的狀況. 因此編譯期檢查也不必定靠譜

#include <event2/event.h>

#if !defined(LIBEVENT_VERSION_NUMBER) || LIBEVENT_VERSION_NUMBER < 0x02000100
#error "This version of Libevent is not supported; Get 2.0.1-alpha or later."
#endif

int
make_sandwich(void)
{
        /* Let's suppose that Libevent 6.0.5 introduces a make-me-a
           sandwich function. */
#if LIBEVENT_VERSION_NUMBER >= 0x06000500
        evutil_make_me_a_sandwich();
        return 0;
#else
        return -1;
#endif
}

下面是一個在運行時檢查libdvent版本的寫法. 檢查運行期的版本是經過函數調用檢查的, 這就保證了返回的版本號必定是連接進的庫的版本號. 這個比較靠譜. 另外須要注意的是, 數值形式的版本號在libevent2.0.1以後才提供. 因此只能比較蠢的用比較字符串的方式去判斷版本號

#include <event2/event.h>
#include <string.h>

int
check_for_old_version(void)
{
    const char *v = event_get_version();
    /* This is a dumb way to do it, but it is the only thing that works
       before Libevent 2.0. */
    if (!strncmp(v, "0.", 2) ||
        !strncmp(v, "1.1", 3) ||
        !strncmp(v, "1.2", 3) ||
        !strncmp(v, "1.3", 3)) {

        printf("Your version of Libevent is very old.  If you run into bugs,"
               " consider upgrading.\n");
        return -1;
    } else {
        printf("Running with Libevent version %s\n", v);
        return 0;
    }
}

int
check_version_match(void)
{
    ev_uint32_t v_compile, v_run;
    v_compile = LIBEVENT_VERSION_NUMBER;
    v_run = event_get_version_number();
    if ((v_compile & 0xffff0000) != (v_run & 0xffff0000)) {
        printf("Running with a Libevent version (%s) very different from the "
               "one we were built with (%s).\n", event_get_version(),
               LIBEVENT_VERSION);
        return -1;
    }
    return 0;
}

接口和宏的定義位於 <event2/event.h>中, 字符串形式的版本號在1.0版本就提供了, 數值形式的版本號直至2.0.1才提供

4.8 一鍵釋放全部全局實例

就算你手動釋放了全部在程序代碼初始化時建立的libevent對象, 在程序退出以前, 也依然有一些內置的, 對使用者不可見的libevent內部實例以及一些全局配置實例存在着, 而且存在在堆區. 通常狀況下不用管它們: 程序都退出了, 釋放不釋放有什麼區別呢? 反正操做系統會幫你清除的. 但有時你想引入一些第三方的分析工具, 好比檢測內存泄漏的工具時, 就會致使這些工具誤報內存泄漏.

你能夠簡單的調一下下面這個函數, 完成一鍵徹底清除:

void libevent_global_shutdown(void);

注意哦: 這個函數不會幫你釋放你本身調用libevent接口建立出來的對象哦! 還沒那麼智能哦!

另外, 很顯然的一點是, 當調用了這個函數以後, 再去調用其它libevent接口, 可能會出現異常哦! 因此沒事不要調用它, 若是你調用它, 那麼必定是自殺前的最後一秒.

函數定義在<event2/event.h>中, 2.1.1版本後可用

相關文章
相關標籤/搜索