Redis做爲一款NoSQL非關係內存數據庫,具備很高的讀寫性能,且原生支持的數據類型豐富,被普遍的做爲緩存、分佈式數據庫、消息隊列等應用。此外Redis還有許多高可用特性,包括數據持久化,主從模式備份等等,能夠知足對數據完整性有必定要求的場景。linux
Redis的源碼結構簡單清晰,有大量材料能夠參閱;經過閱讀Redis源碼,掌握一些經常使用技術在Redis中的實現,相信會對我的編程水平有很大幫助。這裏記錄下我閱讀Redis源碼的心得。從我本身比較關心的幾個技術點出發,每一個技術點都是來自我的使用Redis過程當中產生的問題。這裏也參考了黃建宏老師的《Redis設計與實現》部份內容,不得不說參考這本書再結合源碼註釋,看起來絕對事半功倍。redis
當初選用Redis的時候,很大程度上是因爲Redis的併發性能很高,能夠支持大量併發請求。那Redis是如何支持高併發請求的呢?這裏就引入了第一個技術點,事件處理機制。在Redis中使用了單線程的Reactor模式,屬於I/O多路複用的一種常見實現模式。這裏簡單介紹下Reactor模式。算法
從網上切一個類圖,簡單描述一下Reactor模式的主體結構數據庫
基本概念:編程
Handle:I/O操做的基本文件句柄,在linux下就是fd緩存
Synchronous Event Demultiplexer :同步事件分離器,阻塞等待Handles中的事件發生(Redis中的事件分離器設置了超時,不會一直阻塞)。服務器
Reactor: 事件分派器,負責事件的註冊,刪除以及對全部註冊到事件分派器的事件進行監控, 當事件發生時會調用Event Handler接口來處理事件。網絡
Event Handler: 事件處理器接口,這裏須要Concrete Event Handler來實現該接口併發
Concrete Event Handler:真實的事件處理器,一般都是綁定了一個handle,實現對可讀事件 進行讀取或對可寫事件進行寫入的操做。dom
關鍵點:
I/O多路複用指的就是以事件驅動爲基礎,可實現單個線程偵聽多個socket描述符的可讀可寫或異常狀態,不須要爲每一個socket描述符單首創建一個線程來偵聽描述符可讀仍是可寫。在Reactor模式中,對多個描述符進行偵聽的部件就是Synchronous Event Demultiplexer,一般是由操做系統提供的select/epoll/kqueue等函數實現。
Reactor模式大體的流程時序:主程序先向事件分派器註冊要監聽的事件,以後啓動事件分派器,由事件分派器調用操做系統提供的同步事件分離器(如select/epoll)偵聽事件,當事件發生時事件分派器會調用事件綁定好的處理函數handle_event()來處理事件。這裏的同步並非指阻塞,同步從API調用上來說就是調用結束後必定能確知本次調用是否成功,若是API調用超時,那麼使用者須要乘機再次發起調用才能達到目的(這裏若是設置了超時,就是非阻塞的,由於進程不會卡在API調用上直到其得到結果);對於異步來說,調用指望的結果不是在API調用結束後獲取的,一般是由CPU自行處理完成後發送通知給調用者的。由此能夠看出異步是自然非阻塞的。
事件分派器是單線程,這就要求每一個事件的處理函數handle_event()不能是阻塞的,不然一旦有某個事件的處理函數阻塞住,程序就沒法再調用其餘事件的處理函數了。
在Redis中,事件分爲兩大類:文件事件和時間事件。文件事件就是指客戶端的網絡鏈接請求到達,客戶端的發來的命令請求到達以及服務端發出命令應答這幾類事件;時間事件主要是Redis內部的定時處理器。
看下Redis對事件機制的代碼實現。按照正常的邏輯,Redis服務應該初始化一個事件分派器,而後將綁定了服務器IP,服務端口的鏈接套接字註冊到事件分派器上,以後便可啓動事件分派器。啓動後客戶端鏈接到Redis服務的請求就能夠被事件分派器偵聽。
Redis服務器初始化位於redis.c/initServer函數,貼出該函數中有關事件分派器初始化以及服務端口註冊的代碼:
void initServer() { int j; ...... createSharedObjects(); adjustOpenFilesLimit(); // 初始化事件分派器 server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR); server.db = zmalloc(sizeof(redisDb)*server.dbnum); /* Open the TCP listening socket for the user commands. */ // 打開 TCP 監聽端口,用於等待客戶端的命令請求 if (server.port != 0 && listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR) exit(1); /* Open the listening Unix domain socket. */ // 打開 UNIX 本地端口 if (server.unixsocket != NULL) { unlink(server.unixsocket); /* don't care if this fails */ server.sofd = anetUnixServer(server.neterr,server.unixsocket, server.unixsocketperm, server.tcp_backlog); if (server.sofd == ANET_ERR) { redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr); exit(1); } anetNonBlock(NULL,server.sofd); } /* Abort if there are no listening sockets at all. */ if (server.ipfd_count == 0 && server.sofd < 0) { redisLog(REDIS_WARNING, "Configured to not listen anywhere, exiting."); exit(1); } ...... updateCachedTime(); /* Create the serverCron() time event, that's our main way to process * background operations. */ // 爲 serverCron() 建立時間事件 if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { redisPanic("Can't create the serverCron time event."); exit(1); } /* Create an event handler for accepting new connections in TCP and Unix * domain sockets. */ // 爲 TCP 鏈接關聯鏈接應答(accept)處理器 // 用於接受並應答客戶端的 connect() 調用 for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { redisPanic( "Unrecoverable error creating server.ipfd file event."); } } // 爲本地套接字關聯應答處理器 if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event."); ...... }
aeCreateFileEvent函數至關於Reactor模型中的事件註冊函數register_handle(),這裏對Redis配置文件中每組IP綁定的server.ipfd[i]都建立了偵聽事件,偵聽事件對應的處理器爲鏈接應答處理器,即networking.c/acceptTcpHandler函數。偵聽事件處理器中調用了accept來處理用戶的鏈接請求;當客戶端調用connect發起鏈接請求時,Redis服務端的偵聽事件即變成可處理的狀態,Redis經過select/epoll檢查到偵聽事件可處理,就會調用其對應的處理器acceptTcpHandler函數來處理客戶端的鏈接請求。
acceptTcpHandler源碼以下:
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd, max = MAX_ACCEPTS_PER_CALL; char cip[REDIS_IP_STR_LEN]; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); REDIS_NOTUSED(privdata); redisClient *c; while(max--) { // accept 客戶端鏈接 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); if (cfd == ANET_ERR) { if (errno != EWOULDBLOCK) redisLog(REDIS_WARNING, "Accepting client connection: %s", server.neterr); return; } // snprintf() // 爲客戶端建立客戶端狀態(redisClient) c = acceptCommonHandler(cfd,0); if(c != NULL) { snprintf(c->cip, sizeof(c->cip), "%s", cip); c->cport = cport; } redisLog(REDIS_VERBOSE,"Accepted %s:%d %s:%d ", cip, cport, c->cip, c->cport); } }
anetTcpAccept函數內部使用accept建立好與客戶端的鏈接,返回cfd,後續與客戶端的消息收發都是創建在cfd上的。這裏很天然的就須要將cfd也註冊到Redis的事件分派器上。咱們注意到cfd的讀事件對應着客戶端發來了命令請求,須要服務端讀取後處理;寫事件對應着Redis服務端發出的命令處理應答,寫給客戶端。在剛剛創建鏈接的時候,服務端很顯然是要接收用戶的命令,因此這裏只能先註冊cfd的讀事件。
代碼中能夠看到acceptTcpHandler函數裏會調用networking.c/acceptCommonHandler建立客戶端,acceptCommonHandler中的createClient執行了對通訊fd可讀事件的註冊
redisClient *createClient(int fd) { //createClient,主 備全量同步完成後,備建立一個client來接收主到備的實時KV // 分配空間 redisClient *c = zmalloc(sizeof(redisClient)); /* passing -1 as fd it is possible to create a non connected client. * This is useful since all the Redis commands needs to be executed * in the context of a client. When commands are executed in other * contexts (for instance a Lua script) we need a non connected client. */ if (fd != -1) { // 非阻塞 anetNonBlock(NULL,fd); // 禁用 Nagle 算法 anetEnableTcpNoDelay(NULL,fd); // 設置 keep alive if (server.tcpkeepalive) anetKeepAlive(NULL,fd,server.tcpkeepalive); // 綁定讀事件到事件 loop (開始接收命令請求) //accept接收到客戶端鏈接的時候調用該函數把fd加入事件集中 if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR) { close(fd); zfree(c); return NULL; } } ... ... // 返回客戶端 return c; }
那麼cfd的寫事件是在何時註冊的呢? cfd可寫事件是服務器對客戶端發送命令應答的事件,應該在服務器執行了客戶端的命令以後再註冊上去。Redis也是在每一個命令處理器處理完成時調用addReply函數來註冊cfd寫事件的。客戶端準備好接收應答時就會產生cfd的寫事件,若是Redis註冊寫事件在客戶端準備好接收應答以後也沒有關係,Redis註冊寫事件以後,即發現該事件能夠處理,在下一個事件分派器輪詢週期便可被處理。命令應答處理器的函數是networking.c/sendReplyToClient,應答完成後就調用aeDeleteFileEvent函數釋放掉通訊fd的應答事件監控。
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { redisClient *c = privdata; int nwritten = 0, totwritten = 0, objlen; size_t objmem; robj *o; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); // 一直循環,直到回覆緩衝區爲空 // 或者指定條件知足爲止 while(c->bufpos > 0 || listLength(c->reply)) { if (c->bufpos > 0) { // c->bufpos > 0 // 寫入內容到套接字 // c->sentlen 是用來處理 short write 的 // 當出現 short write ,致使寫入未能一次完成時, // c->buf+c->sentlen 就會偏移到正確(未寫入)內容的位置上。 nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen); // 出錯則跳出 if (nwritten <= 0) break; // 成功寫入則更新寫入計數器變量 c->sentlen += nwritten; totwritten += nwritten; /* If the buffer was sent, set bufpos to zero to continue with * the remainder of the reply. */ // 若是緩衝區中的內容已經所有寫入完畢 // 那麼清空客戶端的兩個計數器變量 if (c->sentlen == c->bufpos) { c->bufpos = 0; c->sentlen = 0; } } else { // listLength(c->reply) != 0 // 取出位於鏈表最前面的對象 o = listNodeValue(listFirst(c->reply)); objlen = sdslen(o->ptr); objmem = getStringObjectSdsUsedMemory(o); // 略過空對象 if (objlen == 0) { listDelNode(c->reply,listFirst(c->reply)); c->reply_bytes -= objmem; continue; } // 寫入內容到套接字 // c->sentlen 是用來處理 short write 的 // 當出現 short write ,致使寫入未能一次完成時, // c->buf+c->sentlen 就會偏移到正確(未寫入)內容的位置上。 nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen); // 寫入出錯則跳出 if (nwritten <= 0) break; // 成功寫入則更新寫入計數器變量 c->sentlen += nwritten; totwritten += nwritten; /* If we fully sent the object on head go to the next one */ // 若是緩衝區內容所有寫入完畢,那麼刪除已寫入完畢的節點 if (c->sentlen == objlen) { listDelNode(c->reply,listFirst(c->reply)); c->sentlen = 0; c->reply_bytes -= objmem; } } /* Note that we avoid to send more than REDIS_MAX_WRITE_PER_EVENT * bytes, in a single threaded server it's a good idea to serve * other clients as well, even if a very large request comes from * super fast link that is always able to accept data (in real world * scenario think about 'KEYS *' against the loopback interface). * * 爲了不一個很是大的回覆獨佔服務器, * 當寫入的總數量大於 REDIS_MAX_WRITE_PER_EVENT , * 臨時中斷寫入,將處理時間讓給其餘客戶端, * 剩餘的內容等下次寫入就緒再繼續寫入 * * However if we are over the maxmemory limit we ignore that and * just deliver as much data as it is possible to deliver. * * 不過,若是服務器的內存佔用已經超過了限制, * 那麼爲了將回復緩衝區中的內容儘快寫入給客戶端, * 而後釋放回復緩衝區的空間來回收內存, * 這時即便寫入量超過了 REDIS_MAX_WRITE_PER_EVENT , * 程序也繼續進行寫入 */ if (totwritten > REDIS_MAX_WRITE_PER_EVENT && //最多寫64M (server.maxmemory == 0 || zmalloc_used_memory() < server.maxmemory)) break; } // 寫入出錯檢查 if (nwritten == -1) { if (errno == EAGAIN) { nwritten = 0; } else { redisLog(REDIS_VERBOSE, "Error writing to client: %s", strerror(errno)); freeClient(c, NGX_FUNC_LINE); return; } } if (totwritten > 0) { /* For clients representing masters we don't count sending data * as an interaction, since we always send REPLCONF ACK commands * that take some time to just fill the socket output buffer. * We just rely on data / pings received for timeout detection. */ if (!(c->flags & REDIS_MASTER)) c->lastinteraction = server.unixtime; } if (c->bufpos == 0 && listLength(c->reply) == 0) { c->sentlen = 0; // 刪除 write handler aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); /* Close connection after entire reply has been sent. */ // 若是指定了寫入以後關閉客戶端 FLAG ,那麼關閉客戶端 if (c->flags & REDIS_CLOSE_AFTER_REPLY) freeClient(c, NGX_FUNC_LINE); } }
Redis中的文件事件處理流程已經大致列出了,還有一個比較重要的環節就是Synchronous Event Demultiplexer的實現。在Redis中是根據操做系統支持的狀況選用效率最高的實現。同步事件分離器是封裝在ae.h/ae.c中的,使用統一的API供Redis來調用。分離器的具體實現是選用不一樣操做系統下效率最高的事件分離器,各實際的事件分離器實如今ae_epoll.c/ae_select.c/ae_evport.c/ae_kqueue.c中。
看下選取不一樣類型事件分離器的代碼(ae.c):
/* Include the best multiplexing layer supported by this system. * The following should be ordered by performances, descending. */ #ifdef HAVE_EVPORT #include "ae_evport.c" #else #ifdef HAVE_EPOLL #include "ae_epoll.c" #else #ifdef HAVE_KQUEUE #include "ae_kqueue.c" #else #include "ae_select.c" #endif #endif #endif