Redis源碼系列的初衷,是幫助咱們更好地理解Redis,更懂Redis,而怎麼才能懂,光看是不夠的,建議跟着下面的這一篇,把環境搭建起來,後續能夠本身閱讀源碼,或者跟着我這邊一塊兒閱讀。因爲我用c也是好幾年之前了,些許錯誤在所不免,但願讀者能不吝指出。html
曹工說Redis源碼(1)-- redis debug環境搭建,使用clion,達到和調試java同樣的效果java
曹工說Redis源碼(2)-- redis server 啓動過程解析及簡單c語言基礎知識補充linux
曹工說Redis源碼(3)-- redis server 啓動過程完整解析(中)redis
曹工說Redis源碼(4)-- 經過redis server源碼來理解 listen 函數中的 backlog 參數編程
曹工說Redis源碼(5)-- redis server 啓動過程解析,以及EventLoop每次處理事件前的前置工做解析(下)api
先給你們複習下前面一講的功課,你們知道,redis 基本是單線程,也就是說,假設咱們啓動main方法的,是線程A,那麼,最終,去處理客戶端socket鏈接、讀取客戶端請求、以及向客戶端socket寫數據,也仍是線程A。數組
同時,你們想必也知道,redis 裏仍是有一些後臺任務要作的,好比:網絡
字典的rehash(rehash的意思是,redis 裏,字典結構,實際上是包含了兩個hashtable,通常使用第一個;當須要擴充其size的時候,hashtable[1] 就會擴充內存到擴充後的size,而後,就須要把hashtable[0]裏面的數據,所有遷移到 hashtable[1] 來,這個過程,即所謂的rehash),rehash的過程,仍是比較耗時的;異步
redis 裏的鍵,若是設了過時時間,到了過時時間後,這個key,是否是就在redis裏不存在了呢?不必定,可是你去訪問的時候,確定是看不到了。但這個怎麼作到的呢?難道每次來一個這種key,就設置一個timer,在指定過時時間後執行清除任務嗎?這個想來,開銷太大了;socket
因此,其實分了兩種策略:
檢查當前的客戶端集合,看看哪些是一直空閒,且超過了必定時間的,這部分客戶端,被篩選出來,直接幹掉,關掉與該客戶端之間的長鏈接。
還有其餘一些任務,下邊再說。
因此,從上面可知,redis 主要要幹兩類活,一種是客戶端要它乾的,好比,我執行個get/set命令,這個優先級比較高;另外一類就是例行工做,每隔多久就得幹一次。
前面一講,咱們已經講到了下面這個代碼:
void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { // 若是有須要在事件處理前執行的函數,那麼運行它 if (eventLoop->beforesleep != NULL) // 1 eventLoop->beforesleep(eventLoop); // 2 開始處理事件 aeProcessEvents(eventLoop, AE_ALL_EVENTS); } }
1處,咱們已經講完了;本講,主要講2處,這個主循環。ok,扯了一堆,let's go!
獲取有沒有周期任務要執行,若是有,則計算一下,要過多久,纔到週期任務的執行時間;把過多久這個時間,算出來後,定義爲 timeLeftToScheduledJobTime;若是沒有周期任務,這個時間能夠定義爲null;
若是發現時間已經到了,則表示如今就能夠執行這個週期任務了,把timeLeftToScheduledJobTime 設爲0
這部分代碼,以下所示:
if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; aeTimeEvent *shortest = NULL; // 1 struct timeval tv, *tvp; // 獲取最近的時間事件 if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) // 2 shortest = aeSearchNearestTimer(eventLoop); if (shortest) { // 若是時間事件存在的話 // 那麼根據最近可執行時間事件和如今時間的時間差來決定文件事件的阻塞時間 long now_sec, now_ms; // 計算距今最近的時間事件還要多久才能達到 // 並將該時間距保存在 tv 結構中 /** * 3 獲取當前時間,這裏把兩個long 局部變量的地址傳進去了,在裏面會去修改它 */ aeGetTime(&now_sec, &now_ms); tvp = &tv; // 4 tvp->tv_sec = shortest->when_sec - now_sec; if (shortest->when_ms < now_ms) { tvp->tv_usec = ((shortest->when_ms + 1000) - now_ms) * 1000; tvp->tv_sec--; } else { tvp->tv_usec = (shortest->when_ms - now_ms) * 1000; } // 5 時間差小於 0 ,說明事件已經能夠執行了,將秒和毫秒設爲 0 (不阻塞) if (tvp->tv_sec < 0) tvp->tv_sec = 0; if (tvp->tv_usec < 0) tvp->tv_usec = 0; } else { // 執行到這一步,說明沒有時間事件 if (flags & AE_DONT_WAIT) { // 6 tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { /* Otherwise we can block */ // 7 tvp = NULL; /* wait forever */ } }
timeLeftToScheduledJobTime
long now_sec, now_ms
tvp->tv_sec
AE_DONT_WAIT
選項,分出2個分支,一個設爲0,一個設爲null。說到網絡編程中的多路複用,select幾乎是繞不開的話題,在沒有epoll以前,基本就是使用select。固然,select有它的缺點,那就是:
下面,你們看看select的api,你們也能夠自行在linux機器上執行:man select 查看。
select, pselect, FD_CLR, FD_ISSET, FD_SET, FD_ZERO - synchronous I/O multiplexing int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
上面的第一行,是select的簡單說明,其中一個詞是,synchronous,同步的意思,說明select是同步的,不是異步的,只是進行了io多路複用。
下面那個是其api,簡單解釋三個參數:
參數fd_set *readfds
Those listed in readfds will be watched to see if characters become available for reading (more
precisely, to see if a read will not block
也就是說,這個集合中的fd,會被監測,看看哪些fd能夠無阻塞地讀取;怎麼才能無阻塞地讀取,那確定是這個fd的輸入緩衝區有內容啊,好比,客戶端發了數據過來
參數fd_set *writefds
those in writefds will be watched to see if a write will not block
這個集合,會被監測,看看是否能夠對這個fd,進行無阻塞地寫;何時,不能無阻塞地寫呢?確定是緩衝區滿了的時候。這種應該常見於:給對端發數據時,對方一直不ack這些數據,因此我方的緩衝區裏,一直不能刪這些數據,致使緩衝區滿。
struct timeval *timeout
The timeout argument specifies the minimum interval that select() should block waiting for a file descriptor to become ready. If both fields of the timeval structure are zero, then select() returns immediately. (This is useful for polling.) If timeout is NULL (no timeout), select() can block
indefinitely.
這個timeout參數,指定了select()操做,等待文件描述符變成ready過程當中,須要等待多長時間。若是這個timeout的兩個字段,都被設爲了0,則select()會立刻返回。若是timeout是null,這個操做會無限阻塞。
因此,select我就算你們瞭解了,其中的timeout參數,簡單來講,就是調用select時,最大阻塞多久就要返回。
若是設爲0,則立刻返回;若是爲null,則無限阻塞;若是爲正常的大於0的值,則阻塞對應的時長。
和前面的部分,聯繫起來,就是說:
有的函數,天生適合拿來說課。epoll,kqueue等,會單獨拿來說。
// 1 處理文件事件,阻塞時間由 tvp 決定,tvp:timevalue pointer numevents = aeApiPoll(eventLoop, tvp); for (j = 0; j < numevents; j++) { // 2 從已就緒數組中獲取事件 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0; // 讀事件 if (fe->mask & mask & AE_READABLE) { // rfired 確保讀/寫事件只能執行其中一個 rfired = 1; fe->rfileProc(eventLoop, fd, fe->clientData, mask); } // 寫事件 if (fe->mask & mask & AE_WRITABLE) { if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop, fd, fe->clientData, mask); } processed++; }
1處,這裏就會根據當前的操做系統,決定調用select或是epoll,或是其餘的實現。(經過條件編譯實現)。
假設這裏的底層實現,就是前面講的select函數,那麼,select函數執行完後,eventLoop->fired 屬性,就會存放此次select篩選出來的那些,ready的文件描述符集合。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, j, numevents = 0; /** * 拷貝到帶_的變量中 */ memcpy(&state->_rfds,&state->rfds,sizeof(fd_set)); memcpy(&state->_wfds,&state->wfds,sizeof(fd_set)); // 1 retval = select(eventLoop->maxfd+1, &state->_rfds,&state->_wfds,NULL,tvp); if (retval > 0) { for (j = 0; j <= eventLoop->maxfd; j++) { int mask = 0; aeFileEvent *fe = &eventLoop->events[j]; if (fe->mask == AE_NONE) continue; if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds)) mask |= AE_READABLE; if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds)) mask |= AE_WRITABLE; // 2 eventLoop->fired[numevents].fd = j; eventLoop->fired[numevents].mask = mask; numevents++; } } return numevents; }
如上所示,1處,調用select;2處,賦值給fired。
2處,從fired中取出對應的文件描述符
3處,若是fired中的文件描述符,可讀,則執行對應的函數指針rfileProc指向的函數
4處,若是fired中的文件描述符,可寫,則執行對應的函數指針wfileProc指向的函數
/* Check time events */ // 執行時間事件 if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop);
這裏會調用processTimeEvents,其實現以下,其中涉及到複雜的時間計算,咱們能夠只看核心流程:
/* Process time events * * 處理全部已到達的時間事件 */ static int processTimeEvents(aeEventLoop *eventLoop) { int processed = 0; aeTimeEvent *te; long long maxId; time_t now = time(NULL); // 更新最後一次處理時間事件的時間 eventLoop->lastTime = now; // 遍歷鏈表 // 執行那些已經到達的事件 te = eventLoop->timeEventHead; maxId = eventLoop->timeEventNextId - 1; while (te) { long now_sec, now_ms; long long id; // 獲取當前時間 aeGetTime(&now_sec, &now_ms); // 若是當前時間等於或等於事件的執行時間,那麼說明事件已到達,執行這個事件 if (now_sec > te->when_sec || (now_sec == te->when_sec && now_ms >= te->when_ms)) { int retval; id = te->id; //1 執行事件處理器,並獲取返回值 retval = te->timeProc(eventLoop, id, te->clientData); processed++; // 記錄是否有須要循環執行這個事件時間 if (retval != AE_NOMORE) { // 2 是的, retval 毫秒以後繼續執行這個時間事件 aeAddMillisecondsToNow(retval, &te->when_sec, &te->when_ms); } else { // 不,將這個事件刪除 aeDeleteTimeEvent(eventLoop, id); } // 由於執行事件以後,事件列表可能已經被改變了 // 所以須要將 te 放回表頭,繼續開始執行事件 te = eventLoop->timeEventHead; } else { te = te->next; } } return processed; }
1處,執行timeProc這個函數指針,執行的函數,在初始化的時候,這個指針,被賦值爲serverCron;
初始化時,會調用一下代碼:
// 爲 serverCron() 建立時間事件 if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { redisPanic("Can't create the serverCron time event."); exit(1); }
這裏的serverCron,是一個函數指針。
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc) { // 更新時間計數器 long long id = eventLoop->timeEventNextId++; // 建立時間事件結構 aeTimeEvent *te; te = zmalloc(sizeof(*te)); if (te == NULL) return AE_ERR; // 設置 ID te->id = id; // 設定處理事件的時間 aeAddMillisecondsToNow(milliseconds, &te->when_sec, &te->when_ms); // 1 設置事件處理器 te->timeProc = proc; te->finalizerProc = finalizerProc; // 設置私有數據 te->clientData = clientData; // 將新事件放入表頭 te->next = eventLoop->timeEventHead; eventLoop->timeEventHead = te; return id; }
上面的1處,將傳入的serverCron,賦值給了te->timeProc。
2處,註冊下一次的週期任務
本講主要講解了主循環的最外層結構,若是有什麼不清楚的,能夠留言。