最近在看 UNIX 網絡編程並研究了一下 Redis 的實現, Redis 的源代碼十分適合閱讀和分析,其中 I/O 多路複用(mutiplexing)部分的實現很是乾淨和優雅,在這裏想對這部分的內容進行簡單的整理。(文末有彩蛋!)redis
爲何 Redis 中要使用 I/O 多路複用這種技術呢?編程
首先,Redis 是跑在單線程中的,全部的操做都是按照順序線性執行的,可是因爲讀寫操做等待用戶輸入或輸出都是阻塞的,因此 I/O 操做在通常狀況下每每不能直接返回,這會致使某一文件的 I/O 阻塞致使整個進程沒法對其它客戶提供服務,而 I/O 多路複用就是爲了解決這個問題而出現的。設計模式
先來看一下傳統的阻塞 I/O 模型究竟是如何工做的:當使用 read
或者 write
對某一個**文件描述符(File Descriptor 如下簡稱 FD)**進行讀寫時,若是當前 FD 不可讀或不可寫,整個 Redis 服務就不會對其它的操做做出響應,致使整個服務不可用。api
這也就是傳統意義上的,也就是咱們在編程中使用最多的阻塞模型:數組
阻塞模型雖然開發中很是常見也很是易於理解,可是因爲它會影響其餘 FD 對應的服務,因此在須要處理多個客戶端任務的時候,每每都不會使用阻塞模型。bash
雖然還有不少其它的 I/O 模型,可是在這裏都不會具體介紹。網絡
阻塞式的 I/O 模型並不能知足這裏的需求,咱們須要一種效率更高的 I/O 模型來支撐 Redis 的多個客戶(redis-cli),這裏涉及的就是 I/O 多路複用模型了:函數
在 I/O 多路複用模型中,最重要的函數調用就是 select
,該方法的可以同時監控多個文件描述符的可讀可寫狀況,當其中的某些文件描述符可讀或者可寫時,select
方法就會返回可讀以及可寫的文件描述符個數。oop
關於
select
的具體使用方法,在網絡上資料不少,這裏就不過多展開介紹了;性能與此同時也有其它的 I/O 多路複用函數
epoll/kqueue/evport
,它們相比select
性能更優秀,同時也能支撐更多的服務。
Redis 服務採用 Reactor 的方式來實現文件事件處理器(每個網絡鏈接其實都對應一個文件描述符)
文件事件處理器使用 I/O 多路複用模塊同時監聽多個 FD,當 accept
、read
、write
和 close
文件事件產生時,文件事件處理器就會回調 FD 綁定的事件處理器。
雖然整個文件事件處理器是在單線程上運行的,可是經過 I/O 多路複用模塊的引入,實現了同時對多個 FD 讀寫的監控,提升了網絡通訊模型的性能,同時也能夠保證整個 Redis 服務實現的簡單。
I/O 多路複用模塊封裝了底層的 select
、epoll
、avport
以及 kqueue
這些 I/O 多路複用函數,爲上層提供了相同的接口。
在這裏咱們簡單介紹 Redis 是如何包裝 select
和 epoll
的,簡要了解該模塊的功能,整個 I/O 多路複用模塊抹平了不一樣平臺上 I/O 多路複用函數的差別性,提供了相同的接口:
static int aeApiCreate(aeEventLoop *eventLoop)
static int aeApiResize(aeEventLoop *eventLoop, int setsize)
static void aeApiFree(aeEventLoop *eventLoop)
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask)
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
同時,由於各個函數所須要的參數不一樣,咱們在每個子模塊內部經過一個 aeApiState
來存儲須要的上下文信息:
// select
typedef struct aeApiState {
fd_set rfds, wfds;
fd_set _rfds, _wfds;
} aeApiState;
// epoll
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;
複製代碼
這些上下文信息會存儲在 eventLoop
的 void *state
中,不會暴露到上層,只在當前子模塊中使用。
select
能夠監控 FD 的可讀、可寫以及出現錯誤的狀況。
在介紹 I/O 多路複用模塊如何對 select
函數封裝以前,先來看一下 select
函數使用的大體流程:
int fd = /* file descriptor */
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(fd, &rfds)
for ( ; ; ) {
select(fd+1, &rfds, NULL, NULL, NULL);
if (FD_ISSET(fd, &rfds)) {
/* file descriptor `fd` becomes readable */
}
}
複製代碼
fd_set
集合,保存須要監控可讀性的 FD;FD_SET
將 fd
加入 rfds
;select
方法監控 rfds
中的 FD 是否可讀;select
返回時,檢查 FD 的狀態並完成對應的操做。而在 Redis 的 ae_select
文件中代碼的組織順序也是差很少的,首先在 aeApiCreate
函數中初始化 rfds
和 wfds
:
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
FD_ZERO(&state->rfds);
FD_ZERO(&state->wfds);
eventLoop->apidata = state;
return 0;
}
複製代碼
而 aeApiAddEvent
和 aeApiDelEvent
會經過 FD_SET
和 FD_CLR
修改 fd_set
中對應 FD 的標誌位:
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
if (mask & AE_READABLE) FD_SET(fd,&state->rfds);
if (mask & AE_WRITABLE) FD_SET(fd,&state->wfds);
return 0;
}
複製代碼
整個 ae_select
子模塊中最重要的函數就是 aeApiPoll
,它是實際調用 select
函數的部分,其做用就是在 I/O 多路複用函數返回時,將對應的 FD 加入 aeEventLoop
的 fired
數組中,並返回事件的個數:
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, j, numevents = 0;
memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));
retval = select(eventLoop->maxfd+1,
&state->_rfds,&state->_wfds,NULL,tvp);
if (retval > 0) {
for (j = 0; j <= eventLoop->maxfd; j++) {
int mask = 0;
aeFileEvent *fe = &eventLoop->events[j];
if (fe->mask == AE_NONE) continue;
if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
mask |= AE_READABLE;
if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
mask |= AE_WRITABLE;
eventLoop->fired[numevents].fd = j;
eventLoop->fired[numevents].mask = mask;
numevents++;
}
}
return numevents;
}
複製代碼
Redis 對 epoll
的封裝其實也是相似的,使用 epoll_create
建立 epoll
中使用的 epfd
:
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
eventLoop->apidata = state;
return 0;
}
複製代碼
在 aeApiAddEvent
中使用 epoll_ctl
向 epfd
中添加須要監控的 FD 以及監聽的事件:
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
複製代碼
因爲 epoll
相比 select
機制略有不一樣,在 epoll_wait
函數返回時並不須要遍歷全部的 FD 查看讀寫狀況;在 epoll_wait
函數返回時會提供一個 epoll_event
數組:
typedef union epoll_data {
void *ptr;
int fd; /* 文件描述符 */
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll 事件 */
epoll_data_t data;
};
複製代碼
其中保存了發生的
epoll
事件(EPOLLIN
、EPOLLOUT
、EPOLLERR
和EPOLLHUP
)以及發生該事件的 FD。
aeApiPoll
函數只須要將 epoll_event
數組中存儲的信息加入 eventLoop
的 fired
數組中,將信息傳遞給上層模塊:
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
複製代碼
由於 Redis 須要在多個平臺上運行,同時爲了最大化執行的效率與性能,因此會根據編譯平臺的不一樣選擇不一樣的 I/O 多路複用函數做爲子模塊,提供給上層統一的接口;在 Redis 中,咱們經過宏定義的使用,合理的選擇不一樣的子模塊:
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
複製代碼
由於 select
函數是做爲 POSIX 標準中的系統調用,在不一樣版本的操做系統上都會實現,因此將其做爲保底方案:
Redis 會優先選擇時間複雜度爲 的 I/O 多路複用函數做爲底層實現,包括 Solaries 10 中的 evport
、Linux 中的 epoll
和 macOS/FreeBSD 中的 kqueue
,上述的這些函數都使用了內核內部的結構,而且可以服務幾十萬的文件描述符。
可是若是當前編譯環境沒有上述函數,就會選擇 select
做爲備選方案,因爲其在使用時會掃描所有監聽的描述符,因此其時間複雜度較差 ,而且只能同時服務 1024 個文件描述符,因此通常並不會以 select
做爲第一方案使用。
Redis 對於 I/O 多路複用模塊的設計很是簡潔,經過宏保證了 I/O 多路複用模塊在不一樣平臺上都有着優異的性能,將不一樣的 I/O 多路複用函數封裝成相同的 API 提供給上層使用。
整個模塊使 Redis 能以單進程運行的同時服務成千上萬個文件描述符,避免了因爲多進程應用的引入致使代碼實現複雜度的提高,減小了出錯的可能性。