使用ePump框架編寫TCP服務器

基於非阻塞、多線程、事件驅動模型的 ePump 框架能夠很好地解決複雜的線程調度、高效通訊等問題,使用 ePump 框架可快速開發各種通訊服務器系統,像常見的HTTP Web服務器、RTMP流媒體服務器、及時通訊消息系統等等。git

一. 使用ePump框架開發前需下載安裝的庫

在使用ePump框架編程前,首先須要從GitHub下載並安裝C語言基礎庫adif 數據結構和基礎算法庫ePump框架,ePump框架依賴於adif基礎庫,adif 基礎庫 和 ePump 框架都是標準C語言開發,並以庫的形式集成到應用程序中。下載這兩個開源系統的代碼到本地,make && make install 後,編譯成功的動態庫和靜態庫缺省地被安裝到 /usr/local/lib 目錄下,頭文件則安裝到 /usr/local/include/adif 和 /usr/local/include 中。程序員

下面講解如何使用ePump框架來開發一個echo功能的TCP服務器程序。github

使用這兩個庫編程時,須要包含adif基礎庫和ePump框架的頭文件:算法

#include "adifall.ext"
#include "epump.h"
#include <signal.h>

其中adifall.ext文件包含了adif庫中全部基礎數據結構和算法模塊的頭文件,具體功能能夠參考開源項目adif 數據結構和基礎算法庫。epump.h是調用ePump框架功能模塊的頭文件。因爲須要處理信號,包含了signal.h文件。編程

二. 開發高性能通訊服務器基本需求

你們都知道,建立TCP監聽服務器時,最基本的通訊開發啓蒙知識三部曲,首先要建立socket_t文件描述符,綁定本地IP地址和端口,在指定端口上啓動監聽,等待客戶端發起TCP鏈接請求。可是對於高級程序員或商業級服務器需求的系統來講,高性能是必須的終極需求,開發人員還須要嚴肅地考慮以下問題:bash

  1. TCP服務器系統既要支持IPv4地址,也要支持IPv6地址;
  2. 如何採用多線程或多進程來處理每個鏈接請求;
  3. 短期內產生大量的TCP鏈接請求時,如何在多個線程或多個進程間負載均衡;
  4. 因爲線程或進程總數有限(小於1024),單臺機器處理幾十萬併發的TCP鏈接時,如何採用多路複用技術解決大併發I/O;

這些問題直接考驗了商業級通訊服務器系統其性能的高低、吞吐能力的大小、CPU處理能力的高效運用等等,能解決好這些問題,無疑是一個能經受實戰的好系統,ePump框架天生就是解決這些問題的好手。服務器

三. 建立框架實例

使用ePump框架,先調用API接口,建立ePump框架實例,數據結構

epcore_t  * pcore = NULL;
    void      * mlisten = NULL;
    int         listenport = 8080;

    gpcore = pcore = epcore_new(65536, 1);

其中第一個參數是服務器系統能同時容許打開的文件描述符的最大數量,也就是這款TCP服務器能支撐的最大併發數量,第二個參數通常設置爲1,指的是建立iodev_t等基礎設備對象時,儘可能將其派送到當前工做線程。多線程

四. 啓動TCP監聽服務

使用ePump框架,啓動TCP監聽服務很簡單,調用eptcp_mlisten接口便可,併發

mlisten = eptcp_mlisten(pcore, NULL, listenport, NULL, echo_pump, pcore);
    if (!mlisten) goto exit;
 
    printf("EchoSrv TCP Port: %d being listened\n", listenport);

按照頭文件中的描述,該接口函數定義以下:

/* Note: automatically detect if Linux kernel supported REUSEPORT. 
   if supported, create listen socket for every current running epump threads
   and future-started epump threads.
   if not, create only one listen socket for all epump threads to bind. */
 
void * eptcp_mlisten (void * vpcore, char * localip, int port, void * para,
                      IOHandler * cb, void * cbpara);

最新的Linux操做系統對於通訊編程作了不少優化,其中對於內核對象Socket使用REUSEPORT選項,來解決端口複用問題,這樣使得每個線程或進程均可以監聽同一個端口,並接受新的鏈接請求。那麼大量的客戶端同時對監聽端口發起TCP三路握手,想創建到達服務器的TCP鏈接時,這些鏈接到底交給哪個啓動了監聽服務的線程或進程?這個問題你們本身作功課,這裏不贅述。

ePump框架中採用 epoll / select等多路複用接口來監聽鏈接到來和讀寫事件,監聽設備和定時器併產生事件的線程是epump線程,處理事件的線程是worker線程。eptcp_mlisten自動地爲每個epump線程建立listen socket(支持REUSEPORT時),或建立一個listen socket但綁定到每個epump線程中。這樣大量的鏈接請求到來時,將會由epump線程處理其負載均衡。

eptcp_mlisten函數的第一個參數是ePump框架實例;第二個參數爲綁定的本機IP地址,若是綁定全部本機IP地址,該值爲NULL;第三個參數爲監聽的端口號;第四個參數是設置當前正在建立的監聽設備的綁定參數,通常跟當前監聽設備iodev_t有關的實例對象;第五個參數是設置當前正在建立的監聽設備iodev_t對象當有鏈接請求過來時的回調函數;第六個參數是傳遞給回調函數的參數變量。

做者在程序中習慣爲全部的事件回調處理設置爲一個統一的回調函數echo_pump,固然你們能夠根據本身的愛好和習慣,爲每一個iodev_t對象的讀寫事件設置不一樣的回調函數。

這樣,啓動這個TCP服務器監聽服務時,前面提到的各類商業TCP服務器需面對的問題,這裏都解決了。

五. 定時器的運用

這個sample程序中,給你們示範了定時器的用法。使用定時器能夠按期作一些檢查或校驗工做,譬如一個TCP鏈接長時間沒有數據往來,經過定時器機制來關閉着這些不活躍的TCP鏈接。

iotimer_start(pcore, 90*1000, 1001, NULL, echo_pump, pcore);

這是啓動一個90秒後發送超時TIMEOUT事件的定時器,超時事件將由echo_pump函數來處理。本例的回調函數中沒有處理不活躍TCP鏈接的代碼,你們感興趣可自行添加。

六. 啓動ePump線程組

建立了TCP監聽服務後,須要啓動ePump框架的兩類線程組:epump線程組 和 worker線程組,代碼以下:

cpunum = get_cpu_num();
    epumpnum = cpunum * 0.2;
    if (epumpnum < 3) epumpnum = 3;
    workernum = cpunum - epumpnum;
    if (workernum < 3) workernum = 3;
 
    /* start 3 worker threads */
    epcore_start_worker(pcore, workernum);
 
    /* start 3 epump threads */
    epcore_start_epump(pcore, epumpnum - 1);

    /* main thread executing the epump_main_proc as an epump thread */
    epump_main_start(pcore, 0);
 
    epcore_clean(pcore);
 
    printf("main thread exited\n");
    return 0;
 
exit:
    epcore_clean(pcore);
    printf("main thread exception exited\n");
    return -1;

七. ePump框架的回調函數

以上介紹的是主程序,下面須要介紹的回調函數echo_pump的實現.

回調函數的原型定義以下:

typedef int IOHandler (void * vmgmt, void * pobj, int event, int fdtype);

第一個參數是設置回調函數時給定的參數,第二個是當前產生事件的對象,或者是iodev_t對象,或者是iotimer_t對象,第三個參數是event事件類型,第四個參數是文件描述符類型。

其中的event事件類型以下:

/* define the event type, including getting connected, connection accepted, readable,
 * writable, timeout. the working threads will be driven by these events */
#define IOE_CONNECTED        1
#define IOE_CONNFAIL         2
#define IOE_ACCEPT           3
#define IOE_READ             4
#define IOE_WRITE            5
#define IOE_INVALID_DEV      6
#define IOE_TIMEOUT          100
#define IOE_DNS_RECV         200
#define IOE_USER_DEFINED     10000

其中的文件描述符類型fdtype預約義值共有:

/* the definition of FD type in the EventPump device */
#define FDT_LISTEN            0x01
#define FDT_CONNECTED         0x02
#define FDT_ACCEPTED          0x04
#define FDT_UDPSRV            0x08
#define FDT_UDPCLI            0x10
#define FDT_USOCK_LISTEN      0x20
#define FDT_USOCK_CONNECTED   0x40
#define FDT_USOCK_ACCEPTED    0x80
#define FDT_RAWSOCK           0x100
#define FDT_FILEDEV           0x200
#define FDT_TIMER             0x10000
#define FDT_USERCMD           0x20000
#define FDT_LINGER_CLOSE      0x40000
#define FDT_STDIN             0x100000
#define FDT_STDOUT            0x200000

八. 如何接受客戶端鏈接請求

當被監聽端口8080上收到一個TCP鏈接請求時,echo_pump函數會被回調,回調參數中event爲IOE_ACCEPT,fdtype爲FDT_LISTEN,其中pobj就是監聽設備對象。

switch (event) {
    case IOE_ACCEPT:
        if (fdtype != FDT_LISTEN)
            return -1;
 
        while (1) {
            pdev = eptcp_accept(iodev_epcore(vobj), vobj, NULL, &ret, echo_pump, pcore, BIND_ONE_EPUMP);
            if (!pdev) break;
 
            printf("\nThreadID=%lu, ListenFD=%d EPumpID=%lu WorkerID=%lu "
               " ==> Accept NewFD=%d EPumpID=%lu\n",
               get_threadid(), iodev_fd(vobj), epumpid(iodev_epump(vobj)),
               workerid(worker_thread_self(pcore)),
               iodev_fd(pdev), epumpid(iodev_epump(pdev)));
        }
        break;

這裏使用了一個while循環來調用eptcp_accept函數,目的是解決多個TCP鏈接同時到來,ePump框架使用一個事件通知驅動回調函數去處理和執行的狀況,不使用循環處理,就會漏掉某些客戶的TCP鏈接請求。

函數 eptcp_accept 接受TCP鏈接請求,並建立新鏈接對應的iodev_t設備對象pdev,設置該對象在數據可讀時的回調函數,這個函數會自動處理多線程之間的鏈接設備對象的負載均衡。函數成功執行完後的結果是一個新的客戶端TCP鏈接創建起來了,針對該新鏈接進行數據讀取操做的回調函數也都設置了。

eptcp_accept函數的原型以下:

void * eptcp_accept (void * vpcore, void * vld, void * para, int * retval,
                     IOHandler * cb, void * cbpara, int bindtype);

第一個參數爲ePump框架實例,第二個參數是監聽設備iodev_t對象,由回調函數攜帶進來,第三個參數是新建立的客戶端TCP鏈接設備iodev_t對象的內置參數,第四個參數爲返回值,大於等於0表示鏈接創建成功,小於0失敗,第五個和第六個參數爲新建立的鏈接對象的回調函數,第七個參數是設置綁定epump線程的指令類型,共有以下幾種:

/* bind type specifying how iodev_t devices are bound to the underlying epump thread */
#define BIND_NONE                0
#define BIND_ONE_EPUMP           1
#define BIND_GIVEN_EPUMP         2
#define BIND_ALL_EPUMP           3
#define BIND_NEW_FOR_EPUMP       4
#define BIND_CURRENT_EPUMP       5

綁定epump線程的指令類型共有6個,其含義以下:

  • BIND_NONE是初始值,不綁定任何epump線程;
  • BIND_ONE_EPUMP是從當前epump線程中找一個負載最低的線程來綁定;
  • BIND_GIVEN_EPUMP是指定一個epump線程來創建綁定;
  • BIND_ALL_EPUMP是綁定全部的epump線程。這中狀況通常是在監聽設備對象建立後,通常在Linux內核版本低於3.9版本狀況下,即不支持REUSEPORT功能時,使用這個類型。
  • BIND_NEW_FOR_EPUMP通常用於ePump框架內部,應用程序不建議使用。
  • BIND_CURRENT_EPUMP是綁定產生當前鏈接事件的epump線程。系統內部和操做系統內核對負載會實現均衡分配,通常建議應用開發時使用這個類型。

一旦綁定了epump線程,就可能當即產生可讀事件,並驅動回調函數來處理。若是新的的pdev對象是由另一個工做線程來處理時,上述這個例子中就可能出現打印語句還沒結束,該新鏈接設備對象可讀事件的回調函數就已經執行了。在商業級系統開發過程當中,調用本函數接受客戶端新鏈接並建立新的設備對象pdev後,須要作不少跟鏈接設備對象相關聯的數據結構的初始化工做,在這些初始化操做完成以後,再調用 iodev_bind_epump 函數來綁定epump線程,因此,這種狀況下接受新鏈接時,通常不設置綁定關係,而是將第七個參數設置爲 BIND_NONE。

pdev = eptcp_accept(iodev_epcore(vobj), vobj, NULL, &ret, echo_pump, pcore, BIND_NONE);
    
    /* do some initialization of related objects, examples as following */
    /*  pcon = http_con_fetch(mgmt);
        pcon->pdev = pdev;
        iodev_para_set(pdev, (void *)pcon->conid);
 
        pcon->hl = hl;
 
        pcon->casetype = HTTP_SERVER;
        pcon->reqdiag = hl->reqdiag;
        pcon->reqdiagobj = hl->reqdiagobj;
 
        pcon->ssl_link = hl->ssl_link;
 
        str_cpy(pcon->srcip, iodev_rip(pcon->pdev));
        str_cpy(pcon->dstip, iodev_lip(pcon->pdev));
        pcon->srcport = iodev_rport(pcon->pdev);
        pcon->dstport = iodev_lport(pcon->pdev);
        pcon->createtime = time(&pcon->stamp);
        pcon->transbgn = pcon->stamp;*/

    iodev_bind_epump(pdev, BIND_CURRENT_EPUMP, NULL);

九. 讀取客戶端的TCP請求數據

創建好TCP鏈接以後,客戶端會發送數據到服務器,ePump框架中對全部socket文件描述符設置成了非阻塞模式,數據到達本機時,內核會產生可讀事件,由ePump框架驅動回調函數來處理數據讀操做。

case IOE_READ:
        ret = tcp_nb_recv(iodev_fd(vobj), rcvbuf, sizeof(rcvbuf), &num);
        if (ret < 0) {
            printf("Client %s:%d close the connection while receiving, epump: %lu\n",
                   iodev_rip(vobj), iodev_rport(vobj), epumpid(iodev_epump(vobj)) );
            iodev_close(vobj);
            return -100;
        }
 
        ret = tcp_nb_send(iodev_fd(vobj), rcvbuf, num, &sndnum);
        if (ret < 0) {
            printf("Client %s:%d close the connection while sending, epump: %lu\n",
                   iodev_rip(vobj), iodev_rport(vobj), epumpid(iodev_epump(vobj)));
            iodev_close(vobj);
            return -100;
        }
        break;

採用非阻塞模式的讀數據函數,讀取客戶端請求內容。這個讀函數 tcp_nb_recv 是在adif基礎庫中實現的,調用系統調用read並一直讀到出現 EAGAIN 錯誤爲止,表示這次可讀事件的全部數據都被讀完。開發者須要注意的是,在回調函數中處理ePump框架的可讀事件時,必定要將全部的位於內核緩衝區中的數據讀取完,不建議讀一部分數據、留一部分數據。

因爲本sample程序實現的是echo回彈功能,讀取了客戶端多少數據,就返回客戶端多少數據。因此當即使用 tcp_nb_send 函數發送這些數據到客戶端。

十. 定時器超時事件回調處理

本例中示範了定時器的啓動和超時處理,當定時器給定的時間逝去後,會產生TIMEOUT事件,並驅動回調函數來處理。ePump框架的定時器實例對象存活週期僅僅是在建立定時器到超時處理完成這段時間,即ePump框架的定時器是一次性的,超時處理完後,系統會自動銷燬該定時器對象。對於循環定時器,須要在處理超時事件時,從新啓動新的定時器實例。

case IOE_TIMEOUT:
        cmdid = iotimer_cmdid(vobj);
        if (cmdid == 1001) { 
            printf("\nThreadID=%lu IOTimerID=%lu EPumpID=%lu timeout, curtick=%lu\n",
                   get_threadid(), iotimer_id(vobj), 
                   epumpid(iotimer_epump(vobj)), time(0));
            epcore_print(pcore);
            iotimer_start(pcore, 90*1000, 1001, NULL, echo_pump, pcore);
        }
        break;

定時器的用例很是普遍,開發人員能夠根據實際需求來使用該功能。

十一. 完整的具備echo功能的TCP服務器代碼

以上詳細介紹瞭如何運用ePump框架實現一個完整的具備echo回彈功能的TCP服務器,代碼詳細以下:

/*
 * Copyright (c) 2003-2021 Ke Hengzhong <kehengzhong@hotmail.com>
 * All rights reserved.
 */
 
#include "adifall.ext"
#include <signal.h>
#include "epump.h"
 
epcore_t  * gpcore = NULL;
 
int echo_pump (void * vpcore, void * vobj, int event, int fdtype);
 
 
static void signal_handler(int sig)
{
    switch(sig) {
    case SIGHUP:
        printf("hangup signal catched\n");
        break;
    case SIGTERM:
    case SIGKILL:
    case SIGINT:
        printf("terminate signal catched, now exiting...\n");
        epcore_stop_epump(gpcore);
        epcore_stop_worker(gpcore);
        usleep(1000);
        break;
    }
}
 
 
int main (int argc, char ** argv)
{
    epcore_t  * pcore = NULL;
    void      * mlisten = NULL;
    int         listenport = 8080;
 
    signal(SIGCHLD, SIG_IGN); /* ignore child */
    signal(SIGTSTP, SIG_IGN); /* ignore tty signals */
    signal(SIGTTOU, SIG_IGN);
    signal(SIGPIPE, SIG_IGN);
    signal(SIGTTIN, SIG_IGN);
    signal(SIGHUP,  signal_handler); /* catch hangup signal */
    signal(SIGTERM, signal_handler); /* catch kill signal */
    signal(SIGINT, signal_handler); /* catch SIGINT signal */
 
    gpcore = pcore = epcore_new(65536, 1);
 
    /* do some initialization */
    mlisten = eptcp_mlisten(pcore, NULL, listenport, NULL, echo_pump, pcore);
    if (!mlisten) goto exit;
 
    printf("EchoSrv TCP Port: %d being listened\n\n", listenport);
 
    iotimer_start(pcore, 90*1000, 1001, NULL, echo_pump, pcore);
 
    /* start 2 worker threads */
    epcore_start_worker(pcore, 2);
 
    /* start 1 epump threads */
    epcore_start_epump(pcore, 1);
 
    /* main thread executing the epump_main_proc as an epump thread */
    epump_main_start(pcore, 0);
 
    epcore_clean(pcore);
 
    printf("main thread exited\n");
    return 0;
 
exit:
    epcore_clean(pcore);
    printf("main thread exception exited\n");
    return -1;
}
 
 
int echo_pump (void * vpcore, void * vobj, int event, int fdtype)
{
    epcore_t  * pcore = (epcore_t *)vpcore;
    iodev_t   * pdev = NULL;
    int         cmdid;
    int         ret = 0, sndnum = 0;
    char        rcvbuf[2048];
    int         num = 0;
 
    switch (event) {
    case IOE_ACCEPT:
        if (fdtype != FDT_LISTEN)
            return -1;
 
        while (1) {
            pdev = eptcp_accept(iodev_epcore(vobj), vobj, NULL, &ret,
                                 echo_pump, pcore, BIND_ONE_EPUMP);
            if (!pdev) break;
 
            printf("\nThreadID=%lu, ListenFD=%d EPumpID=%lu WorkerID=%lu "
               " ==> Accept NewFD=%d EPumpID=%lu\n",
               get_threadid(), iodev_fd(vobj), epumpid(iodev_epump(vobj)),
               workerid(worker_thread_self(pcore)),
               iodev_fd(pdev), epumpid(iodev_epump(pdev)));
        }
        break;
 
    case IOE_READ:
        ret = tcp_nb_recv(iodev_fd(vobj), rcvbuf, sizeof(rcvbuf), &num);
        if (ret < 0) {
            printf("Client %s:%d close the connection while receiving, epump: %lu\n",
                   iodev_rip(vobj), iodev_rport(vobj), epumpid(iodev_epump(vobj)) );
            iodev_close(vobj);
            return -100;
        }
 
        printf("\nThreadID=%lu FD=%d EPumpID=%lu WorkerID=%lu Recv %d bytes from %s:%d\n",
               get_threadid(), iodev_fd(vobj), epumpid(iodev_epump(vobj)),
               workerid(worker_thread_self(pcore)),
               num, iodev_rip(vobj), iodev_rport(vobj));
        printOctet(stderr, rcvbuf, 0, num, 2);
 
        ret = tcp_nb_send(iodev_fd(vobj), rcvbuf, num, &sndnum);
        if (ret < 0) {
            printf("Client %s:%d close the connection while sending, epump: %lu\n",
                   iodev_rip(vobj), iodev_rport(vobj), epumpid(iodev_epump(vobj)));
            iodev_close(vobj);
            return -100;
        }
        break;
 
    case IOE_WRITE:
    case IOE_CONNECTED:
        break;
 
    case IOE_TIMEOUT:
        cmdid = iotimer_cmdid(vobj);
        if (cmdid == 1001) { 
            printf("\nThreadID=%lu IOTimerID=%lu EPumpID=%lu timeout, curtick=%lu\n",
                   get_threadid(), iotimer_id(vobj), 
                   epumpid(iotimer_epump(vobj)), time(0));
            epcore_print(pcore);
            iotimer_start(pcore, 90*1000, 1001, NULL, echo_pump, pcore);
        }
        break;
 
    case IOE_INVALID_DEV:
        break;
 
    default:
        break;
    }
 
    printf("ThreadID=%lu event: %d  fdtype: %d  WorkerID=%lu\n\n",
            get_threadid(), event, fdtype,
            workerid(worker_thread_self(pcore)));
 
    return 0;
}

這個示例中使用大量的多餘的打印代碼,看起沒那麼美觀,有潔癖的程序員能夠去掉。

使用gcc編譯以上代碼的命令以下:

gcc -g -O3 -Wall -DUNIX -I/usr/local/include -I/usr/local/include/adif -L/usr/local/lib -lm -lpthread -ladif -lepump echosrv.c -o echosrv

編譯完成後你們執行並調試,享受編程樂趣。

十二. 使用ePump框架開發高性能程序總結

以上用一個TCP服務器程序來展現如何使用ePump框架進行編程的實例,管中窺豹,以一律全,感興趣的程序員能夠下載和體驗。

使用ePump框架最成功的案例是eJet Web服務器開源項目,這是一個輕量級、高性能、嵌入式Web服務器,各項功能不遜於Nginx。研究這個項目能夠有助於理解ePump框架的工做原理。

簡單總結ePump框架的功能特色:

  • ePump框架封裝了不少瑣碎的容易出錯誤的細節,讓開發人員將更多時間花在業務處理上;
  • 將複雜的各個操做系統都互不兼容的多路複用技術封裝後,提供了標準的接口給程序員,大大節省了應用開發週期;
  • 高效利用事件驅動、多線程調度機制來實現多核CPU的並行運算能力;
  • 使用ePump開發高性能程序,代碼簡單幹練,可靠性高;
  • 對IPv六、DNS等頭提供了支持;
  • ......太多了,寫不下去了,你們補充吧

2021年2月17 春節期間 寫於北京

相關文章
相關標籤/搜索