redis網絡通訊模塊設計與實現分析

redis的通訊模塊封裝得很是簡單易用,能夠直接用到本身項目中,學習下也是頗有價值的。git

本文基於redis源碼4.0.1寫成,redis源碼下載:github.com/antirez/red…github

文件結構

redis的網絡通訊模塊由8個文件構成 redis

做用以下

文件 做用
ae.c 統一epoll、evport、kqueue、select網絡事件處理接口, 函數實現
ae.h 統一epoll、evport、kqueue、select網絡事件處理接口,函數原型,共享結構體定義
ae_epoll.c 封裝epoll網絡事件處理庫到統一的接口
ae_evport.c 封裝evport網絡事件處理庫到統一的接口
ae_kqueue.c 封裝kqueue網絡事件處理庫到統一的接口
ae_select.c 封裝select網絡事件處理庫到統一的接口

統一網絡庫底層接口

被統一的網絡事件處理接口以下,參考:ae_epoll.c, ae_evport.c, ae_kqueue.c, ae_select.cshell

接口 做用
aeApiState 底層網絡庫須要的fd、事件等數據共享結構體
aeApiCreate 建立網絡句柄,好比epoll句柄,在這個基礎上才能進行網絡事件的監聽處理
aeApiResize 修改網絡庫存放事件的容器大小,就是修改aeApiState結構體的events數組的大小
aeApiFree 刪除aeApiState這個共享結構體
aeApiAddEvent 將網絡fd的讀寫操做交給網絡庫進行處理,好比給這個epoll進行處理
aeApiDelEvent 從網絡操做庫中刪除對某個fd的監聽,通常服務器往客戶端寫完數據後,主動斷開客戶端鏈接時會使用
aeApiPoll 輪詢獲取網絡正在發生io讀寫事件的事件
aeApiName 獲取底層網絡庫的名字,好比epool,kqueue等

統一網絡庫上層接口

參考 ae.h, ae.c,封裝通用的結構體和函數api

網絡讀寫事件aeFileEvent結構體

/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;
複製代碼

這個結構體封裝了網絡數據讀寫事件的處理函數原型,和客戶端傳過來的數據。數組

aeFileProc是處理函數的原型的宏定義,宏定義參考以下:服務器

typedef void aeFileProc( struct aeEventLoop *eventLoop, int fd, void *clientData, int mask );
複製代碼

實際使用時,網絡讀寫事件的函數處理須要自行編寫網絡

定時事件aeTimeEvent結構體

/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *next;
} aeTimeEvent;
複製代碼

這個定時事件是redis用來處理後臺定時任務使用的,好比處理客戶端鏈接超時,服務端斷開操做socket

活躍的fd

/* A fired event */
typedef struct aeFiredEvent {
    int fd;
    int mask;
} aeFiredEvent;
複製代碼

就是正在等待讀、正在等待寫的網絡fd,通常以一個數組出現tcp

mask字段表示是等待讀,仍是等待寫,對應的值以下

#define AE_READABLE 1
#define AE_WRITABLE 2
複製代碼

網絡庫上層接口核心結構體aeEventLoop

/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
} aeEventLoop;
複製代碼

核心字段以下 * 事件列表*events, index表示fd, 表示網絡fd的讀寫事件發生時,執行的回調函數 * *fired 正在等待讀和寫的fd列表,經過輪詢獲得,好比epoll經過epoll_wait獲得的 * beforesleep 在epoll_wait前執行 * aftersleep 在epoll_wait後執行

操做aeEventLoop相關函數

主要是對aeEventLoop結構體進行操做,這裏拿幾個核心的函數舉例

aeCreateEventLoop()

這個函數主要是建立aeEventLoop結構體,並根據setsize參數給events, fired分配內容空間 而後再調用aeApiCreate驅動底層的網絡庫,好比epoll的epoll_create(),而後保存到aeEventLoop的apidata屬性上

aeMain()

主要是啓動無限循環、輪詢處理活躍的fd,若是epoll,底層則調用epoll_wait獲取可讀寫的fd:ae.c::aeMain() -> ae.c::aeProcessEvents() -> ae_epoll.c::aeApiPool() -> ae_epoll.c::epoll_wait()

aeCeateFileEvent()

註冊網絡fd到底層網絡庫上,並註冊當讀寫事件發生時,須要執行的業務回調函數 拿epoll舉例,調用到epoll通過的步驟:ae.c::aeCreateFileEvent() -> ae_epoll.c::aeApiAddEvent() -> epoll_ctl()

anet.h & anet.c

主要是封裝socket操做,屏蔽系統底層socket操做的差別性,提供更好用的api,好比建立tcp server等

這裏就拿建立tcp server來看看它的流程

anetTcpServer()

anet.c::anetTcpServer() -> anet.c::_anetTcpServer() -> <sys/socket.h>::socket() -> anet.c::anetListen() -> <sys/socket.h>::listen()

核心代碼就是調用系統socket庫的listen函數創建起了一個tcp的server

示例

這裏使用ae.h和anet.h來建立一個簡單的tcp服務器,並把給客戶端傳回一個"Hello World"

代碼以下:

#include <stdio.h>
#include <zconf.h>
#include "../src/ae.h"
#include "../src/anet.h"

char myerr[ANET_ERR_LEN] = {0};
void acceptFd(struct aeEventLoop *eventLoop, int fd, void *clientdata, int mask) {
    char myerr[ANET_ERR_LEN] = {0};
    printf("獲取客戶端鏈接的fd.\n");
    char ip[20] = {0};
    int port = 0;
    int clientfd = anetTcpAccept(myerr, fd, ip, sizeof(ip), &port);

    if (clientfd == AE_ERR) {
        printf("獲取客戶端鏈接的fd異常!! \n");
        return;
    }

    printf("客戶端ip %s port %d \n", ip, port);

    int ret = anetNonBlock(myerr, clientfd);
    if (ret == ANET_OK) {
        printf("客戶端事件非阻塞處理設置成功\n\n");
    }
    anetEnableTcpNoDelay(myerr, clientfd);
    write(clientfd,"Hello Client!\n",14);
}

int main() {
    aeEventLoop *eventLoop = aeCreateEventLoop(1024);
    if(!eventLoop){
        return 1;
    }
    int fd = anetTcpServer(myerr, 8080, "0.0.0.0", 511);

    if (fd != ANET_ERR) {
        anetNonBlock(NULL, fd);
        if (aeCreateFileEvent(eventLoop, fd, AE_READABLE, acceptFd, NULL)) {
            printf("註冊tcp服務器接收客戶端鏈接的事件處理函數異常");
        }
    }

    printf("服務器在 0.0.0.0:8080 端口監聽了! \n");
    aeMain(eventLoop);
    aeDeleteEventLoop(eventLoop);
    return 0;
}
複製代碼

而後進行編譯並啓動,編譯可使用gcc,效果以下

gcc -o myserver src/ae.c src/anet.c src/zmalloc.c myserver.c
複製代碼

可使用telnet進行測試

能夠看到客戶端已經成功收到服務器傳回的Hello World了

一些注意的點

Mac OS 10.14.5下若是沒法找到c的標準庫,須要安裝這個文件:/Library/Developer/CommandLineTools/Packages/macOS_SDK_headers_for_macOS_10.14.pkg

參考資料

  1. blog.51cto.com/leejia/1021…
  2. blog.csdn.net/yusiguyuan/…
相關文章
相關標籤/搜索