每一個cs程序尤爲是高併發的網絡服務端程序都有本身的網絡異步事件處理庫,redis不例外。html
事件庫僅僅包括ae.c、ae.h,還有3個不一樣的多路複用(本文僅描述epoll)的wrapper文件,事件庫封裝了框架調用的主循環函數,暴露了時間、文件事件註冊和銷燬函數,典型的依賴反轉模式。
網絡操做都在networking.c裏,封裝了常見的socket操做。mysql
咱們從redis啓動的main函數開始,從用戶發出鏈接鍵入命令開始遍歷網絡事件庫所涉及的函數,unix套接口相關函數不表。redis
首先對幾個最經常使用對象進行解釋。sql
//redis啓動的時候(init_server())會建立的一個全局使用的事件循環結構 typedef struct aeEventLoop { int maxfd; //僅僅是select 使用 long long timeEventNextId; aeFileEvent events[AE_SETSIZE]; //用於保存epoll須要關注的文件事件的fd、觸發條件、註冊函數。 aeFiredEvent fired[AE_SETSIZE]; //epoll_wait以後得到可讀或者可寫的fd數組,經過aeFiredEvent->fd再定位到events。 aeTimeEvent *timeEventHead; //以鏈表形式保存多個時間事件,每隔一段時間機會觸發註冊的函數。 int stop; void *apidata; //每種多路複用方法的使用的私有變量,例如epoll就是epfd和一個事件數組;而select是保存rset、wset。 aeBeforeSleepProc *beforesleep; // sleep以前調用的函數,有些事情是每次循環必須作的,並不是文件、時間事件。 } aeEventLoop; //文件可讀寫事件 typedef struct aeFileEvent { int mask; //觸發條件:讀、寫 aeFileProc *rfileProc; //當fd可讀時執行的事件(accept,read) aeFileProc *wfileProc; //當fd可寫時執行的事件(write) void *clientData; //caller 傳入的數據指針 } aeFileEvent; //超時時間事件 typedef struct aeTimeEvent { long long id; /* time event identifier. */ long when_sec; /* seconds */ long when_ms; /* milliseconds */ aeTimeProc *timeProc; //當出現超時的時候所執行的事件 aeEventFinalizerProc *finalizerProc; void *clientData; //caller傳入的數據指針 struct aeTimeEvent *next; //單鏈表指向下一個時間事件 } aeTimeEvent; //從epoll_wait或者select返回的,已經觸發的文件事件 typedef struct aeFiredEvent { int fd; int mask; } aeFiredEvent;
咱們來模擬redis server 啓動和用戶鍵入命令的過程,先上圖。api
好吧,從main函數開始吧。數組
// src/redis.c 853 server.el = aeCreateEventLoop();
建立一個aeEventLoop結構體,成員apidata指向一個aeApiState對象,若是使用epoll,fd是由epoll_create建立的全局epoll fd ,events[] 用於保存epoll_wait返回的事件組。網絡
// src/redis.c 866 server.ipfd = anetTcpServer(server.neterr,server.port,server.bindaddr);
建立監聽。併發
// src/redis.c 903 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
註冊一個時間事件serverCron,做用之後再講。app
// src/redis.c 906 aeCreateFileEvent(server.el,server.ipfd,AE_READABLE, acceptTcpHandler,NULL)
爲監聽fd的註冊一個文件事件,首先把listenfd和觸發條件epoll_ctl加入到全局的epoll fd進行監控。再以fd做爲文件事件event數組index定位,mask填入只讀,rfileProc填入acceptTcpHandler函數。框架
// src/redis.c 1571 aeSetBeforeSleepProc(server.el,beforeSleep);
aeEventLoop註冊一個beforeSleep函數,這個函數在主循環裏每次會被調用,做用之後再講。
// src/ redis.c 1572 aeMain()
網絡事件庫的核心循環函數。
// src/ae.c 379 aeCreateLoop->beforesleep()
就是上面註冊的beforeSleep函數。
// src/ae.c 275 aeProcessEvents(eventLoop, AE_ALL_EVENTS);
先調度aeApiPoll,用epoll_wait處理文件事件,返回的fd和觸發條件先存儲在eventLoop->fired[]裏,而後根據fired[]每一個事件的的fd,定位到events,根據觸發條件調用已經註冊的事件。
文件事件處理完畢後,接下來就是處理超時時間事件,這裏不表。
假若有個用戶鏈接上redis,則從redis servere就從上面的aeApiPoll的poll_wait返回,產生的只讀事件會調度上面註冊了acceptTcpHandler函數。
// src/networking.c 390 acceptTcpHandler(eventLoop, fd, NULL, AE_READABLE)
accept返回產生了一個新鏈接的fd(這裏省略了不少步驟),須要重新的鏈接讀取數據,因而爲這個fd註冊可讀事件。
// src/networking.c 20 aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c)
因而調用了aeCreateFileEvent 爲只讀事件註冊, 這裏的步驟和上面的listen fd 同樣。用epoll_ctl將fd加入到epoll fd裏等待下次epoll_wait,再註冊觸發條件和readQueyFromClient函數,接着aeMain()繼續執行等待用戶的數據。
假如用戶在這個鏈接鍵入redis命令例如:set foo bar,redis server端將會從aeApiPoll的epoll_wait返回,和accept同樣的處理方式。返回的fd填入到fired[]數組,經過fired[fd]->fd找到是哪一個文件事件,找到events[fd]->rfielProc這個函數,就是上面註冊的readQueyFromClient 函數。
// src/networking.c 816 readQueyFromClient(server.el, fd, NULL, AE_READABLE)
這個函數首先會執行read從tcp buffer讀取用戶鍵入的命令(非阻塞io),而後處理buffer,找到對應的command(lookupCommand),接下來執行這個command(call(c,cmd)),命令執行完畢須要返回結果的時候,會再次註冊一個文件事件
// src/networking.c 71 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, sendReplyToClient, c)
這樣下次循環epoll_wait的時候就發現這個fd可寫,因而就會執行sendReplyToClient,講結果發送給client。
redis的這個網絡事件庫是比較標準的網絡框架的模式,實現的功能不算多但夠用。
[...] redis的網絡事件庫,咱們在前面的文章已經講過,readQueryFromClient先從fd中讀取數據,先存儲在c->querybuf裏(networking.c 823)。接下來函數processInputBuffer來解析querybuf,上面說過若是是telnet發送的裸協議數據是沒有*打頭的表示參數個數的輔助信息,針對telnet的數據跳到processInlineBuffer函數,而其餘則經過函數processMultibulkBuffer。 這兩個函數的做用同樣,解析c->querybuf的字符串,分解成多參數到c->argc和c->argv裏面,argc表示參數的個數,argv是個redis_object的指針數組,每一個指針指向一個redis_object, object的ptr裏存儲具體的內容,對於」get a「的請求轉化後,argc就是2,argv就是 [...]