redis的通訊模塊封裝得很是簡單易用,能夠直接用到本身項目中,學習下也是頗有價值的。git
本文基於redis源碼4.0.1寫成,redis源碼下載:github.com/antirez/red…github
redis的網絡通訊模塊由8個文件構成 redis
文件 | 做用 |
---|---|
ae.c | 統一epoll、evport、kqueue、select網絡事件處理接口, 函數實現 |
ae.h | 統一epoll、evport、kqueue、select網絡事件處理接口,函數原型,共享結構體定義 |
ae_epoll.c | 封裝epoll網絡事件處理庫到統一的接口 |
ae_evport.c | 封裝evport網絡事件處理庫到統一的接口 |
ae_kqueue.c | 封裝kqueue網絡事件處理庫到統一的接口 |
ae_select.c | 封裝select網絡事件處理庫到統一的接口 |
被統一的網絡事件處理接口以下,參考:ae_epoll.c, ae_evport.c, ae_kqueue.c, ae_select.cshell
接口 | 做用 |
---|---|
aeApiState | 底層網絡庫須要的fd、事件等數據共享結構體 |
aeApiCreate | 建立網絡句柄,好比epoll句柄,在這個基礎上才能進行網絡事件的監聽處理 |
aeApiResize | 修改網絡庫存放事件的容器大小,就是修改aeApiState結構體的events數組的大小 |
aeApiFree | 刪除aeApiState這個共享結構體 |
aeApiAddEvent | 將網絡fd的讀寫操做交給網絡庫進行處理,好比給這個epoll進行處理 |
aeApiDelEvent | 從網絡操做庫中刪除對某個fd的監聽,通常服務器往客戶端寫完數據後,主動斷開客戶端鏈接時會使用 |
aeApiPoll | 輪詢獲取網絡正在發生io讀寫事件的事件 |
aeApiName | 獲取底層網絡庫的名字,好比epool,kqueue等 |
參考 ae.h, ae.c,封裝通用的結構體和函數api
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
複製代碼
這個結構體封裝了網絡數據讀寫事件的處理函數原型,和客戶端傳過來的數據。數組
aeFileProc是處理函數的原型的宏定義,宏定義參考以下:服務器
typedef void aeFileProc( struct aeEventLoop *eventLoop, int fd, void *clientData, int mask );
複製代碼
實際使用時,網絡讀寫事件的函數處理須要自行編寫網絡
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *next;
} aeTimeEvent;
複製代碼
這個定時事件是redis用來處理後臺定時任務使用的,好比處理客戶端鏈接超時,服務端斷開操做socket
/* A fired event */
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;
複製代碼
就是正在等待讀、正在等待寫的網絡fd,通常以一個數組出現tcp
mask字段表示是等待讀,仍是等待寫,對應的值以下
#define AE_READABLE 1
#define AE_WRITABLE 2
複製代碼
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
} aeEventLoop;
複製代碼
核心字段以下 * 事件列表*events, index表示fd, 表示網絡fd的讀寫事件發生時,執行的回調函數 * *fired 正在等待讀和寫的fd列表,經過輪詢獲得,好比epoll經過epoll_wait獲得的 * beforesleep 在epoll_wait前執行 * aftersleep 在epoll_wait後執行
主要是對aeEventLoop結構體進行操做,這裏拿幾個核心的函數舉例
這個函數主要是建立aeEventLoop結構體,並根據setsize參數給events, fired分配內容空間 而後再調用aeApiCreate驅動底層的網絡庫,好比epoll的epoll_create(),而後保存到aeEventLoop的apidata屬性上
主要是啓動無限循環、輪詢處理活躍的fd,若是epoll,底層則調用epoll_wait獲取可讀寫的fd:ae.c::aeMain() -> ae.c::aeProcessEvents() -> ae_epoll.c::aeApiPool() -> ae_epoll.c::epoll_wait()
註冊網絡fd到底層網絡庫上,並註冊當讀寫事件發生時,須要執行的業務回調函數 拿epoll舉例,調用到epoll通過的步驟:ae.c::aeCreateFileEvent() -> ae_epoll.c::aeApiAddEvent() -> epoll_ctl()
主要是封裝socket操做,屏蔽系統底層socket操做的差別性,提供更好用的api,好比建立tcp server等
這裏就拿建立tcp server來看看它的流程
anet.c::anetTcpServer() -> anet.c::_anetTcpServer() -> <sys/socket.h>::socket() -> anet.c::anetListen() -> <sys/socket.h>::listen()
核心代碼就是調用系統socket庫的listen函數創建起了一個tcp的server
這裏使用ae.h和anet.h來建立一個簡單的tcp服務器,並把給客戶端傳回一個"Hello World"
代碼以下:
#include <stdio.h>
#include <zconf.h>
#include "../src/ae.h"
#include "../src/anet.h"
char myerr[ANET_ERR_LEN] = {0};
void acceptFd(struct aeEventLoop *eventLoop, int fd, void *clientdata, int mask) {
char myerr[ANET_ERR_LEN] = {0};
printf("獲取客戶端鏈接的fd.\n");
char ip[20] = {0};
int port = 0;
int clientfd = anetTcpAccept(myerr, fd, ip, sizeof(ip), &port);
if (clientfd == AE_ERR) {
printf("獲取客戶端鏈接的fd異常!! \n");
return;
}
printf("客戶端ip %s port %d \n", ip, port);
int ret = anetNonBlock(myerr, clientfd);
if (ret == ANET_OK) {
printf("客戶端事件非阻塞處理設置成功\n\n");
}
anetEnableTcpNoDelay(myerr, clientfd);
write(clientfd,"Hello Client!\n",14);
}
int main() {
aeEventLoop *eventLoop = aeCreateEventLoop(1024);
if(!eventLoop){
return 1;
}
int fd = anetTcpServer(myerr, 8080, "0.0.0.0", 511);
if (fd != ANET_ERR) {
anetNonBlock(NULL, fd);
if (aeCreateFileEvent(eventLoop, fd, AE_READABLE, acceptFd, NULL)) {
printf("註冊tcp服務器接收客戶端鏈接的事件處理函數異常");
}
}
printf("服務器在 0.0.0.0:8080 端口監聽了! \n");
aeMain(eventLoop);
aeDeleteEventLoop(eventLoop);
return 0;
}
複製代碼
而後進行編譯並啓動,編譯可使用gcc,效果以下
gcc -o myserver src/ae.c src/anet.c src/zmalloc.c myserver.c
複製代碼
可使用telnet進行測試
Mac OS 10.14.5下若是沒法找到c的標準庫,須要安裝這個文件:/Library/Developer/CommandLineTools/Packages/macOS_SDK_headers_for_macOS_10.14.pkg