衆所周知,Redis 服務器是一個事件驅動程序。那麼事件驅動對於 Redis 而言有什麼含義?源碼中又是如何實現事件驅動的呢?今天,咱們一塊兒來認識下 Redis 服務器的事件驅動。數據庫
對於 Redis 而言,服務器須要處理如下兩類事件:服務器
接下來,咱們先來認識下文件事件。網絡
Redis 基於 Reactor 模式開發了本身的網絡事件處理器,這個處理器被稱爲文件事件處理器(file event handler):數據結構
雖然文件處理器以單線程方式運行,但經過 IO 多路複用程序監聽多個套接字,既實現了高性能的網絡通訊模型,又能夠很好的與 Redis 服務器中其它一樣以單線程運行的模塊進行對接,保持了 Redis 內部單線程設計的簡潔。併發
圖 1 展現了文件事件處理器的四個組成部分:dom
文件事件是對套接字的抽象。每當一個套接字準備好執行鏈接應答(accept)、寫入、讀取、關閉等操做時,就好產生一個文件事件。由於一個服務器一般會鏈接多個套接字,因此多個文件事件有可能會併發的出現。socket
而 IO 多了複用程序負責監聽多個套接字,並向文件事件分派器分發那些產生事件的套接字。ide
儘管多個文件事件可能會併發的出現,但 IO 多路複用程序老是會將全部產生事件的套接字都放到一個隊列裏面,而後經過這個隊列,以有序、同步的方式,把每個套接字傳輸給文件事件分派器。當上一個套接字產生的事件被處理完畢以後(即,該套接字爲事件所關聯的事件處理器執行完畢),IO 多路複用程序纔會繼續向文件事件分派器傳送下一個套接字。如圖 2 所示:函數
文件事件分派器接收 IO 多路複用程序傳來的套接字,並根據套接字產生的事件類型,調用相應的事件處理器。
服務器會爲執行不一樣任務的套接字關聯不一樣的事件處理器。這些處理器本質上就是一個個函數。它們定義了某個事件發生時,服務器應該執行的動做。
Redis 的 IO 多路複用程序的全部功能都是經過包裝常見的 select、epoll、evport 和 kqueue 這些 IO 多路複用函數庫來實現的。每一個 IO 多路複用函數庫在 Redis 源碼中都對應一個單獨的文件,好比 ae_select.c、ae_poll.c、ae_kqueue.c 等。
因爲 Redis 爲每一個 IO 多路複用函數庫都實現了相同的 API,因此 IO 多路複用程序的底層實現是能夠互換的,如圖 3 所示:
Redis 在 IO 多路複用程序的實現源碼中用 #include
宏定義了相應的規則,**程序會在編譯時自動選擇系統中性能最高的 IO 多路複用函數庫來做爲 Redis 的 IO 多路複用程序的底層實現,這保證了 Redis 在各個平臺的兼容性和高性能。對應源碼以下:
/* Include the best multiplexing layer supported by this system. * The following should be ordered by performances, descending. */ #ifdef HAVE_EVPORT #include "ae_evport.c" #else #ifdef HAVE_EPOLL #include "ae_epoll.c" #else #ifdef HAVE_KQUEUE #include "ae_kqueue.c" #else #include "ae_select.c" #endif #endif #endif
IO 多路複用程序能夠監聽多個套接字的 ae.h/AE_READABLE
和 ae.h/AE_WRITABLE
事件,這兩類事件和套接字操做之間有如下對應關係:
IO 多路複用程序容許服務器同時監聽套接字的 AR_READABLE 事件和 AE_WRITABLE 事件。若是一個套接字同時產生了兩個事件,那麼文件分派器會優先處理 AE_READABLE 事件,而後再處理 AE_WRITABLE 事件。簡單來講,若是一個套接字既可讀又可寫,那麼服務器將先讀套接字,後寫套接字。
Redis 爲文件事件編寫了多個處理器,這些事件處理器分別用於實現不一樣的網絡通訊需求。好比說:
在這些事件處理器中,服務器最經常使用的是與客戶端進行通訊的鏈接應答處理器、命令請求處理器和命令回覆處理器。
1)鏈接應答處理器
networking.c/acceptTcpHandle
函數是 Redis 的鏈接應答處理器,這個處理器用於對鏈接服務器監聽套接字的客戶端進行應答,具體實現爲 sys/socket.h/accept
函數的包裝。
當 Redis 服務器進行初始化的時候,程序會將這個鏈接應答處理器和服務器監聽套接字的 AE_READABLE 事件關聯。對應源碼以下
# server.c/initServer ... /* Create an event handler for accepting new connections in TCP and Unix * domain sockets. */ for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { serverPanic( "Unrecoverable error creating server.ipfd file event."); } } ...
當有客戶端用 sys/scoket.h/connect
函數鏈接服務器監聽套接字時,套接字就會產生 AE_READABLE 事件,引起鏈接應答處理器執行,並執行相應的套接字應答操做。如圖 4 所示:
2)命令請求處理器
networking.c/readQueryFromClient
函數是 Redis 的命令請求處理器,這個處理器負責從套接字中讀入客戶端發送的命令請求內容,具體實現爲 unistd.h/read
函數的包裝。
當一個客戶端經過鏈接應答處理器成功鏈接到服務器以後,服務器會將客戶端套接字的 AE_READABLE 事件和命令請求處理器關聯起來(networking.c/acceptCommonHandler
函數)。
當客戶端向服務器發送命令請求的時候,套接字就會產生 AR_READABLE 事件,引起命令請求處理器執行,並執行相應的套接字讀入操做,如圖 5 所示:
在客戶端鏈接服務器的整個過程當中,服務器都會一直爲客戶端套接字的 AE_READABLE 事件關聯命令請求處理器。
3)命令回覆處理器
networking.c/sendReplToClient
函數是 Redis 的命令回覆處理器,這個處理器負責將服務器執行命令後獲得的命令回覆經過套接字返回給客戶端。
當服務器有命令回覆須要發給客戶端時,服務器會將客戶端套接字的 AE_WRITABLE 事件和命令回覆處理器關聯(networking.c/handleClientsWithPendingWrites
函數)。
當客戶端準備好接收服務器傳回的命令回覆時,就會產生 AE_WRITABLE 事件,引起命令回覆處理器執行,並執行相應的套接字寫入操做。如圖 6 所示:
當命令回覆發送完畢以後,服務器就會解除命令回覆處理器與客戶端套接字的 AE_WRITABLE 事件的關聯。對應源碼以下:
# networking.c/writeToClient ... if (!clientHasPendingReplies(c)) { c->sentlen = 0; # buffer 緩衝區命令回覆已發送,刪除套接字和事件的關聯 if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); /* Close connection after entire reply has been sent. */ if (c->flags & CLIENT_CLOSE_AFTER_REPLY) { freeClient(c); return C_ERR; } } ...
以前咱們經過 debug 的形式大體認識了客戶端與服務器的鏈接過程。如今,咱們站在文件事件的角度,再一次來追蹤 Redis 客戶端與服務器進行鏈接併發送命令的整個過程,看看在過程當中會產生什麼事件,這些事件又是如何被處理的。
先來看客戶端與服務器創建鏈接的過程:
server.c/initServer()
)。networking.c/acceptTcpHandler()
)。networking.c/acceptCommonHandler()
)進行關聯,使得服務器能夠接收該客戶端發來的命令請求。此時,客戶端已成功與服務器創建鏈接了。上述過程,咱們仍然能夠用 gdb 調試,查看函數的執行過程。具體調試過程以下:
gdb ./src/redis-server (gdb) b acceptCommonHandler # 給 acceptCommonHandler 函數設置斷點 (gdb) r redis-conf --port 8379 # 啓動服務器
另外開一個窗口,使用 redis-cli 鏈接服務器:redis-cli -p 8379
回到服務器窗口,咱們會看到已進入 gdb 調試模式,輸入:info stack
,能夠看到如圖 6 所示的堆棧信息。
如今,咱們再來認識命令的執行過程:
server.c/processCommad()
中 lookupCommand
函數調用;server.c/processCommad()
中 call
函數調用。network.c/writeToClient()
函數。圖 7 展現了命令執行過程的堆棧信息。圖 8 則展現了命令回覆過程的堆棧信息。
上一節咱們一塊兒認識了文件事件。接下來,讓咱們再來認識下時間事件。
Redis 的時間時間分爲如下兩類:
對於時間事件,數據結構源碼(ae.h/aeTimeEvent):
/* Time event structure */ typedef struct aeTimeEvent { long long id; /* time event identifier. */ long when_sec; /* seconds */ long when_ms; /* milliseconds */ aeTimeProc *timeProc; aeEventFinalizerProc *finalizerProc; void *clientData; struct aeTimeEvent *next; } aeTimeEvent;
主要屬性說明:
時間事件進程執行的函數爲 ae.c/processTimeEvents()
。
此外,對於時間事件的類型區分,取決於時間事件處理器的返回值:
ae.h/AE_NOMORE
,爲定時事件。該事件在到達一次後就會被刪除;ae.h/AE_NOMORE
,爲週期事件。當一個週期時間事件到達後,服務器會根據事件處理器返回的值,對時間事件的 when_sec 和 when_ms 屬性進行更新,讓這個事件在一段時間以後再次到達,並以這種方式一致更新運行。好比,若是一個時間事件處理器返回 30,那麼服務器應該對這個時間事件進行更新,讓這個事件在 30 毫秒後再次執行。持續運行的 Redis 服務器須要按期對自身的資源和狀態進行檢查和調整,從而確保服務能夠長期、穩定的運行。這些按期操做由 server.c/serverCron()
函數負責執行。主要操做包括:
Redis 服務器以週期性事件的方式來運行 serverCron 函數,在服務器運行期間,每隔一段時間,serverCron 就會執行一次,直到服務器關閉爲止。
關於執行次數,可參見 redis.conf
文件中的 hz 選項。默認爲 10,表示每秒運行 10 次。
因爲服務器同時存在文件事件和時間事件,因此服務器必須對這兩種事件進行調度,來決定什麼時候處理文件事件,什麼時候處理時間事件,以及花多少時間來處理它們等等。
事件的調度和執行有 ae.c/aeProcessEvents()
函數負責。源碼以下:
int aeProcessEvents(aeEventLoop *eventLoop, int flags) { int processed = 0, numevents; /* Nothing to do? return ASAP */ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; /* 首先判斷是否存在須要監聽的文件事件,若是存在須要監聽的文件事件,那麼經過IO多路複用程序獲取 * 準備就緒的文件事件,至於IO多路複用程序是否等待以及等待多久的時間,依發生時間距離如今最近的時間事件肯定; * 若是eventLoop->maxfd == -1表示沒有須要監聽的文件事件,可是時間事件確定是存在的(serverCron()), * 若是此時沒有設置 AE_DONT_WAIT 標誌位,此時調用IO多路複用,其目的不是爲了監聽文件事件是否準備就緒, * 而是爲了使線程休眠到發生時間距離如今最近的時間事件的發生時間(做用相似於unix中的sleep函數), * 這種休眠操做的目的是爲了不線程一直不停的遍歷時間事件造成的無序鏈表,形成沒必要要的資源浪費 */ if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; aeTimeEvent *shortest = NULL; struct timeval tv, *tvp; /* 尋找發生時間距離如今最近的時間事件,該時間事件的發生時間與當前時間之差就是IO多路複用程序應該等待的時間 */ if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) shortest = aeSearchNearestTimer(eventLoop); if (shortest) { long now_sec, now_ms; // 建立 timeval 結構 aeGetTime(&now_sec, &now_ms); tvp = &tv; /* How many milliseconds we need to wait for the next * time event to fire? */ long long ms = (shortest->when_sec - now_sec)*1000 + shortest->when_ms - now_ms; /* 若是時間之差大於0,說明時間事件到時時間未到,則等待對應的時間; * 若是時間間隔小於0,說明時間事件已經到時,此時若是沒有 * 文件事件準備就緒,那麼IO多路複用程序應該當即返回,以避免 * 耽誤處理時間事件*/ if (ms > 0) { tvp->tv_sec = ms/1000; tvp->tv_usec = (ms % 1000)*1000; } else { tvp->tv_sec = 0; tvp->tv_usec = 0; } } else { /* If we have to check for events but need to return * ASAP because of AE_DONT_WAIT we need to set the timeout * to zero */ if (flags & AE_DONT_WAIT) { tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { /* Otherwise we can block */ tvp = NULL; /* wait forever */ } } // 阻塞並等等文件事件產生,最大阻塞事件由 timeval 結構決定 numevents = aeApiPoll(eventLoop, tvp); for (j = 0; j < numevents; j++) { // 處理全部已產生的文件事件 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int fired = 0; /* Number of events fired for current fd. */ int invert = fe->mask & AE_BARRIER; if (!invert && fe->mask & mask & AE_READABLE) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; } /* Fire the writable event. */ if (fe->mask & mask & AE_WRITABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { fe->wfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } /* If we have to invert the call, fire the readable event now * after the writable one. */ if (invert && fe->mask & mask & AE_READABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } processed++; } } /* Check time events */ if (flags & AE_TIME_EVENTS) // 處理全部已到達的時間事件 processed += processTimeEvents(eventLoop); return processed; /* return the number of processed file/time events */ }
將 aeProcessEvents
函數置於一個循環裏面,加上初始化和清理函數,就構成了 Redis 服務器的主函數 server.c/main()
。如下是主函數的僞代碼:
def main(): // 初始化服務器 init_server(); // 一直處理事件,直到服務器關閉爲止 while server_is_not_shutdown(): aeProcessEvents(); // 服務器關閉,執行清理操做 clear_server()
從事件處理的角度來看,Redis 服務器的運行流程能夠用流程圖 1 來歸納:
如下是事件的調度和執行規則: