做者:李樂linux
本文主要講解服務器處理客戶端命令請求的整個流程,包括服務器啓動監聽,接收命令請求並解析,執行命令請求,返回命令回覆等,這也是本文的主題「命令處理的生命週期」。
Redis服務器做爲典型的事件驅動程序,事件處理顯得尤其重要,而Redis將事件分爲兩大類:文件事件與時間事件。文件事件即socket的可讀可寫事件,時間事件用於處理一些須要週期性執行的定時任務,本文將對這兩種事件做詳細介紹。c++
爲了更好的理解服務器與客戶端的交互,還須要學習一些基礎知識,好比客戶端信息的存儲,Redis對外支持的命令集合,客戶端與服務器socket讀寫事件的處理,Redis內部定時任務的執行等,本小節將對這些知識做簡要介紹。redis
Redis是一個Key-Value數據庫,key只能是字符串,value多是字符串、哈希表、列表、集合和有序集合,這5種數據類型用結構體robj表示,咱們稱之爲redis對象。結構體robj的type字段表示對象類型,5種對象類型在server.h文件定義:算法
#define OBJ_STRING 0 #define OBJ_LIST 1 #define OBJ_SET 2 #define OBJ_ZSET 3 #define OBJ_HASH 4
針對某一種類型的對象,redis在不一樣狀況下可能採用不一樣的數據結構存儲,結構體robj的的encoding字段表示當前對象底層存儲採用的數據結構,即對象的編碼,總共定義了10種encoding常量,以下表-1所示:
表-1 對象編碼類型表數據庫
encoding常量 | 數據結構 | 可存儲對象類型 |
---|---|---|
OBJ_ENCODING_RAW | 簡單動態字符串sds | 字符串 |
OBJ_ENCODING_INT | 整數 | 字符串 |
OBJ_ENCODING_HT | 字典dict | 集合、哈希表、有序集合 |
OBJ_ENCODING_ZIPMAP | 未使用 | |
OBJ_ENCODING_LINKEDLIST | 再也不使用 | |
OBJ_ENCODING_ZIPLIST | 壓縮列表ziplist | 哈希表、有序集合 |
BJ_ENCODING_INTSET | 整數集合intset | 集合 |
OBJ_ENCODING_SKIPLIST | 跳躍表skiplist | 有序集合 |
OBJ_ENCODING_EMBSTR | 簡單動態字符串sds | 字符串 |
OBJ_ENCODING_QUICKLIST | 快速鏈表quicklist | 列表 |
對象的整個生命週期中,編碼不是一成不變的,好比集合對象。當集合中全部元素均可以用整數表示時,底層數據結構採用整數集合;執行SADD命令往集合添加元素時,redis總會校驗待添加元素是否能夠解析爲整數,若是解析失敗,則會將集合存儲結構轉換爲字典。api
if (subject->encoding == OBJ_ENCODING_INTSET) { if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) { subject->ptr = intsetAdd(subject->ptr,llval,&success); } else { //編碼轉換 setTypeConvert(subject,OBJ_ENCODING_HT); } }
對象在不一樣狀況下可能採用不一樣的數據結構存儲,那對象可能同時採用多種數據結構存儲嗎?根據上面的表格,有序集合可能採用壓縮列表、跳躍表和字典存儲。使用字典存儲時,根據成員查找分值的時間複雜度爲O(1),而對於ZRANGE與ZRANK等命令,須要排序才能實現,時間複雜度至少爲O(NlogN);使用跳躍表存儲時,ZRANGE與ZRANK等命令的時間複雜度爲O(logN),而根據成員查找分值的時間複雜度一樣是O(logN)。字典與跳躍表各有優點,所以Redis會同時採用字典與跳躍表存儲有序集合。這裏有讀者可能會有疑問,同時採用兩種數據結構存儲不浪費空間嗎?數據都是經過指針引用的,兩種存儲方式只須要額外存儲一些指針便可,空間消耗是能夠接受的。有序集合存儲結構定義以下:數組
typedef struct zset { dict *dict; zskiplist *zsl; } zset;
觀察表-1,注意到編碼OBJ_ENCODING_RAW和OBJ_ENCODING_EMBSTR都表示的是簡單動態字符串,那麼這兩種編碼有什麼區別嗎?在回答此問題以前須要先了解結構體robj的定義:緩存
#define LRU_BITS 24 typedef struct redisObject { unsigned type:4; unsigned encoding:4; unsigned lru:LRU_BITS; //緩存淘汰使用 int refcount; //引用計數 void *ptr; } robj;
下面詳細分析結構體各字段含義:
ptr是void*類型的指針,指向實際存儲的某一種數據結構,可是當robj存儲的是數據能夠用long類型表示時,數據直接存儲在ptr字段。能夠看出,爲了建立一個字符串對象,必須分配兩次內存,robj與sds存儲空間;兩次內存分配效率低下,且數據分離存儲下降了計算機高速緩存的效率。所以提出OBJ_ENCODING_EMBSTR編碼的字符串,當字符串內容比較短時,只分配一次內存,robj與sds連續存儲,以此提高內存分配效率與數據訪問效率。OBJ_ENCODING_EMBSTR編碼的字符串內存結構以下圖-1所示:安全
圖-1 EMBSTR編碼字符串對象內存結構
refcount存儲當前對象的引用次數,用於實現對象的共享。共享對象時,refcount加1;刪除對象時,refcount減1,當refcount值爲0時釋放對象空間。刪除對象的代碼以下:服務器
void decrRefCount(robj *o) { if (o->refcount == 1) { switch(o->type) { //根據對象類型,釋放其指向數據結構空間 case OBJ_STRING: freeStringObject(o); break; case OBJ_LIST: freeListObject(o); break; case OBJ_SET: freeSetObject(o); break; ………… } zfree(o); //釋放對象空間 } else { //引用計數減1 if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount--; } }
lru字段佔24比特,用於實現緩存淘汰策略,能夠在配置文件中使用maxmemory-policy指令配置已用內存達到最大內存限制時的緩存淘汰策略。lru根據用戶配置緩存淘汰策略存儲不一樣數據,經常使用的策略就是LRU與LFU,LRU的核心思想是,若是數據最近被訪問過,那麼未來被訪問的概率也更高,此時lru字段存儲的是對象訪問時間;LFU的核心思想是,若是數據過去被訪問屢次,那麼未來被訪問的頻率也更高,此時lru字段存儲的是上次訪問時間與訪問次數。假如使用GET命令訪問數據時,會執行下面代碼更新對象的lru字段:
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { updateLFU(val); } else { val->lru = LRU_CLOCK(); }
LRU_CLOCK函數用於獲取當前時間,注意此時間不是實時獲取的,redis1秒爲週期執行系統調用獲取精確時間,緩存在全局變量server.lruclock,LRU_CLOCK函數獲取的只是緩存在此變量中的時間。
updateLFU函數用於更新對象的上次訪問時間與訪問次數,函數實現以下:
void updateLFU(robj *val) { unsigned long counter = LFUDecrAndReturn(val); counter = LFULogIncr(counter); val->lru = (LFUGetTimeInMinutes()<<8) | counter; }
能夠發現lru的低8比特存儲的是對象的訪問次數,高16比特存儲的是對象的上次訪問時間,以分鐘爲單位;須要特別注意的是函數LFUDecrAndReturn,其返回計數值counter,對象的訪問次數在此值上累加。爲何不直接累加呢?假設每次只是簡單的對訪問次數累加,那麼越老的數據通常狀況下訪問次數越大,即便該對象可能很長時間已經沒有訪問。所以訪問次數應該有一個隨時間衰減的過程,函數LFUDecrAndReturn實現了此衰減功能。
Redis是典型的客戶端服務器結構,客戶端經過socket與服務端創建網絡鏈接併發送命令請求,服務端處理命令請求並回復。Redis使用結構體client存儲客戶端鏈接的全部信息,包括但不限於客戶端的名稱、客戶端鏈接的套接字描述符、客戶端當前選擇的數據庫ID、客戶端的輸入緩衝區與輸出緩衝區等。結構體client字段較多,此處只介紹命令處理主流程所需的關鍵字段。
typedef struct client { uint64_t id; int fd; redisDb *db; robj *name; time_t lastinteraction sds querybuf; int argc; robj **argv; struct redisCommand *cmd; list *reply; unsigned long long reply_bytes; size_t sentlen; char buf[PROTO_REPLY_CHUNK_BYTES]; int bufpos; } client;
各字段含義以下:
typedef struct redisDb { int id; long long avg_ttl; dict *dict; dict *expires; dict *blocking_keys; dict *ready_keys; dict *watched_keys; } redisDb;
其中id爲數據庫序號,默認狀況下Redis有16個數據庫,id序號爲0~15;dict存儲數據庫全部鍵值對;expires存儲鍵的過時時間;avg_ttl存儲數據庫對象的平均TTL,用於統計;
使用命令BLPOP阻塞獲取列表元素時,若是鏈表爲空,會阻塞客戶端,同時將此列表鍵記錄在blocking_keys;當使用命令PUSH向列表添加元素時,會從字典blocking_keys中查找該列表鍵,若是找到說明有客戶端正阻塞等待獲取此列表鍵,因而將此列表鍵記錄到字典ready_keys,以便後續響應正在阻塞的客戶端;
Redis支持事務,命令用於MULTI開啓事務,命令EXEC用於執行事務;可是開啓事務到執行事務期間,如何保證關心的數據不會被修改呢?Redis採用樂觀鎖實現。開啓事務的同時可使用WATCH key命令監控關心的數據鍵,而watched_keys字典存儲的就是被WATCH命令監控的全部數據鍵,其中key-value分別爲數據鍵與客戶端對象。當Redis服務器接收到寫命令時,會從字典watched_keys中查找該數據鍵,若是找到說明有客戶端正在監控此數據鍵,因而會標記客戶端對象爲dirty;待Redis服務器收到客戶端EXEC命令時,若是客戶端帶有dirty標記,則會拒絕執行事務。
結構體redisServer存儲Redis服務器的全部信息,包括但不限於數據庫、配置參數、命令表、監聽端口與地址、客戶端列表、若干統計信息、RDB與AOF持久化相關信息、主從複製相關信息、集羣相關信息等。結構體redisServer的段很是多,這裏只對部分字段作簡要說明,以便讀者對於服務端有個粗略瞭解,至於其餘字段在講解各知識點時會作說明。
struct redisServer { char *configfile; int hz; int dbnum; redisDb *db; dict *commands; aeEventLoop *el; int port; char *bindaddr[CONFIG_BINDADDR_MAX]; int bindaddr_count; int ipfd[CONFIG_BINDADDR_MAX]; int ipfd_count; list *clients; int maxidletime; }
各字段含義以下:
#define run_with_period(_ms_) if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz)))
固然因爲hz是用戶配置的,其並不能表明真實的serverCron函數執行頻率。
Redis支持的全部命令初始都存儲在全局變量redisCommandTable,類型爲struct redisCommand[ ],定義及初始化以下:
struct redisCommand redisCommandTable[] = { {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0}, {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0}, ………… }
結構體redisCommand相對簡單,主要定義了命令的名稱、命令處理函數以及命令標誌等:
struct redisCommand { char *name; redisCommandProc *proc; int arity; char *sflags; int flags; long long microseconds, calls; };
各字段含義以下:
表-2 命令標誌類型
字符標識 | 二進制標識 | 含義 | 相關命令 |
---|---|---|---|
w | CMD_WRITE | 寫命令 | set、del、incr、lpush |
r | CMD_READONLY | 讀命令 | get、exists、llen |
m | CMD_DENYOOM | 內存不足時,拒絕執行此類命令 | set、append、lpush |
a | CMD_ADMIN | 管理命令 | save、shutdown、slaveof |
p | CMD_PUBSUB | 發佈訂閱相關命令 | subscribe、unsubscribe |
s | CMD_NOSCRIPT | 命令不能夠在lua腳本使用 | auth、save、brpop |
R | CMD_RANDOM | 隨機命令,即便命令請求參數徹底相同,返回結果也可能不容 | srandmember、scan、time |
S | CMD_SORT_FOR_SCRIPT | 當在lua腳本使用此類命令時,須要對輸出結果作排序 | sinter、sunion、sdiff |
l | CMD_LOADING | 服務器啓動載入過程當中,只能執行此類命令 | select、auth、info |
t | CMD_STALE | 當從服務器與主服務器斷開連接,且從服務器配置slave-serve-stale-data no時,從服務器只能執行此類命令 | auth、shutdown、info |
M | CMD_SKIP_MONITOR | 此類命令不會傳播給監視器 | exec |
k | CMD_ASKING | restore-asking | |
F | CMD_FAST | 命令執行時間超過閾值時,會記錄延遲事件,此標誌用於區分延遲事件類型,F表示fast-command | get、setnx、strlen、exists |
當服務器接收到一條命令請求時,須要從命令表中查找命令,而redisCommandTable命令表是一個數組,意味着查詢命令的時間複雜度爲O(N),效率低下。所以Redis在服務器初始化時,會將redisCommandTable轉換爲一個字典存儲在redisServer對象的commands字段,key爲命令名稱,value爲命令redisCommand對象。populateCommandTable函數實現了命令表從數組到字典的轉化,同時解析sflags生成flags:
void populateCommandTable(void) { int numcommands = sizeof(redisCommandTable)/sizeof(structredisCommand); for (j = 0; j < numcommands; j++) { struct redisCommand *c = redisCommandTable+j; char *f = c->sflags; while(*f != '\0') { switch(*f) { case 'w': c->flags |= CMD_WRITE; break; case 'r': c->flags |= CMD_READONLY; break; } f++; } retval1 = dictAdd(server.commands, sdsnew(c->name), c); } }
同時對於常用的命令,Redis甚至會在服務器初始化的時候將命令緩存在redisServer對象,這樣使用的時候就不須要每次都從commands字典中查找了:
struct redisServer { struct redisCommand *delCommand,*multiCommand,*lpushCommand, *lpopCommand,*rpopCommand, *sremCommand, *execCommand, *expireCommand,*pexpireCommand; }
Redis服務器是典型的事件驅動程序,而事件又分爲文件事件(socket的可讀可寫事件)與時間事件(定時任務)兩大類。不管是文件事件仍是時間事件都封裝在結構體aeEventLoop:
typedef struct aeEventLoop { int stop; aeFileEvent *events; aeFiredEvent *fired; aeTimeEvent *timeEventHead; aeBeforeSleepProc *beforesleep; aeBeforeSleepProc *aftersleep; } aeEventLoop;
stop標識事件循環是否結束;events爲文件事件數組,存儲已經註冊的文件事件;fired存儲被觸發的文件事件;Redis有多個定時任務,所以理論上應該有多個時間事件節點,多個時間事件造成鏈表,timeEventHead即爲時間事件鏈表頭結點;Redis服務器須要阻塞等待文件事件的發生,進程阻塞以前會調用beforesleep函數,進程由於某種緣由被喚醒以後會調用aftersleep函數。
事件驅動程序一般存在while/for循環,循環等待事件發生並處理,Redis也不例外,其事件循環以下:
while (!eventLoop->stop) { if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); }
函數aeProcessEvents爲事件處理主函數,其第二個參數是一個標誌位,AE_ALL_EVENTS表示函數須要處理文件事件與時間事件,AE_CALL_AFTER_SLEEP表示阻塞等待文件事件以後須要執行aftersleep函數。
Redis客戶端經過TCP socket與服務端交互,文件事件指的就是socket的可讀可寫事件。 socket讀寫操做有阻塞與非阻塞之分,採用阻塞模式時,一個進程只能處理一條網絡鏈接的讀寫事件,爲了同時處理多條網絡鏈接,一般會採用多線程或者多進程,效率低下;非阻塞模式下,可使用目前比較成熟的IO多路複用模型select/epoll/kqueue等,視不一樣操做系統而定。
這裏只對epoll做簡要介紹。epoll是linux內核爲處理大量併發網絡鏈接而提出的解決方案,能顯著提高系統CPU利用率。epoll使用很是簡單,總共只有三個API,epoll_create函數建立一個epoll專用的文件描述符,用於後續epoll相關API調用;epoll_ctl函數向epoll註冊、修改或刪除須要監控的事件;epoll_wait函數會阻塞進程,直到監控的某個網絡鏈接有事件發生。
int epoll_create(int size)
輸入參數size通知內核程序指望註冊的網絡鏈接數目,內核以此判斷初始分配空間大小;注意在linux2.6.8版本之後,內核動態分配空間,此參數會被忽略。返回參數爲epoll專用的文件描述符,再也不使用時應該及時關閉此文件描述符。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
函數執行成功時返回0,不然返回-1,錯誤碼設置在變量errno;輸入參數含義以下:
struct epoll_event { __uint32_t events; epoll_data_t data; }; typedef union epoll_data { void *ptr; int fd; __uint32_t u32; __uint64_t u64; } epoll_data_t;
其中events表示須要監控的事件類型或已觸發的事件類型,比較經常使用的是EPOLLIN文件描述符可讀事件,EPOLLOUT文件描述符可寫事件;data保存與文件描述符關聯的數據。
int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout)
函數執行成功時返回0,不然返回-1,錯誤碼設置在變量errno;輸入參數含義以下:
Redis並無直接使用epoll提供的的API,而是同時支持四種IO多路複用模型,並將每種模型的API進一步統一封裝,由文件ae_evport.c、ae_epoll.c、ae_kqueue.c和ae_select.c實現。
static int aeApiCreate(aeEventLoop *eventLoop); static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask); static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp);
以epoll爲例,aeApiCreate函數是對epoll_create的封裝;aeApiAddEvent函數用於添加事件,是對epoll_ctl的封裝;aeApiDelEvent函數用於刪除事件,是對epoll_ctl的封裝;aeApiPoll是對epoll_wait的封裝。
四個函數輸入參數含義以下:
以epoll模型爲例,apidata字段指向的IO多路複用模型對象定義以下:
typedef struct aeApiState { int epfd; struct epoll_event *events; } aeApiState;
其中epfd函數epoll_create返回的epoll文件描述符,events存儲epoll_wait函數返回時已觸發的事件數組。
這裏只對等待事件函數aeApiPoll實現做簡要介紹:
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; //阻塞等待事件的發生 retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); if (retval > 0) { int j; numevents = retval; for (j = 0; j < numevents; j++) { int mask = 0; struct epoll_event *e = state->events+j; //轉換事件類型爲Redis定義的 if (e->events & EPOLLIN) mask |= AE_READABLE; if (e->events & EPOLLOUT) mask |= AE_WRITABLE; //記錄已發生事件到fired數組 eventLoop->fired[j].fd = e->data.fd; eventLoop->fired[j].mask = mask; } } return numevents; }
函數首先須要經過eventLoop->apidata字段獲取到epoll模型對應的aeApiState結構體對象,才能調用epoll_wait函數等待事件的發生;而epoll_wait函數將已觸發的事件存儲到aeApiState對象的events字段,Redis再次遍歷全部已觸發事件,將其封裝在eventLoop->fired數組,數組元素類型爲結構體aeFiredEvent,只有兩個字段,fd表示發生事件的socket文件描述符,mask表示發生的事件類型,如AE_READABLE可讀事件和AE_WRITABLE可寫事件。
上面簡單介紹了epoll的使用,以及Redis對epoll等IO多路複用模型的封裝,下面咱們回到本小節的主題,文件事件。結構體aeEventLoop有一個關鍵字段events,類型爲aeFileEvent數組,存儲全部須要監控的文件事件。文件事件結構體定義以下:
typedef struct aeFileEvent { int mask; aeFileProc *rfileProc; aeFileProc *wfileProc; void *clientData; } aeFileEvent;
其中mask存儲監控的文件事件類型,如AE_READABLE可讀事件和AE_WRITABLE可寫事件;rfileProc爲函數指針,指向讀事件處理函數;wfileProc一樣爲函數指針,指向寫事件處理函數;clientData指向對應的客戶端對象。
調用aeApiAddEvent函數添加事件以前以前,首先須要調用aeCreateFileEvent函數建立對應的文件事件,並存儲在aeEventLoop結構體的events字段,aeCreateFileEvent函數簡單實現以下:
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData){ aeFileEvent *fe = &eventLoop->events[fd]; if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; fe->clientData = clientData; return AE_OK; }
Redis服務器啓動時須要建立socket並監聽,等待客戶端鏈接;客戶端與服務器創建socket鏈接以後,服務器會等待客戶端的命令請求;服務器處理完成客戶端的命令請求以後,命令回覆會暫時緩存在client結構體的buf緩衝區,待客戶端文件描述符的可寫事件發生時,纔會真正往客戶端發送命令回覆。這些都須要建立對應的文件事件:
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL); aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c); aeCreateFileEvent(server.el, c->fd, ae_flags, sendReplyToClient, c);
能夠發現接收客戶端鏈接的處理函數爲acceptTcpHandler,此時尚未建立對應的客戶端對象,所以函數aeCreateFileEvent第四個參數爲NULL;接收客戶端命令請求的處理函數爲readQueryFromClient;向發送命令回覆的處理函數爲sendReplyToClient。
最後思考一個問題, aeApiPoll函數的第二個參數是時間結構體timeval,存儲調用epoll_wait時傳入的超時時間,那麼這個函數怎麼計算出來的呢?咱們以前提過,Redis除了要處理各類文件事件外,還須要處理不少定時任務(時間事件),那麼當Redis因爲執行epoll_wait而阻塞時,恰巧定時任務到期而須要處理怎麼辦?要回答這個問題須要分析下Redis事件循環的執行函數aeProcessEvents,函數在調用aeApiPoll以前會遍歷Redis的時間事件鏈表,查找最先會發生的時間事件,以此做爲aeApiPoll須要傳入的超時時間。
int aeProcessEvents(aeEventLoop *eventLoop, int flags) { shortest = aeSearchNearestTimer(eventLoop); long long ms = shortest->when_sec - now_sec)*1000 + shortest->when_ms - now_ms; //阻塞等待文件事件發生 numevents = aeApiPoll(eventLoop, tvp); for (j = 0; j < numevents; j++) { aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; //處理文件事件,即根據類型執行rfileProc或wfileProc } //處理時間事件 processed += processTimeEvents(eventLoop); }
1.5.1節介紹了Redis文件事件,已經知道事件循環執行函數aeProcessEvents的主要邏輯:1)查找最先會發生的時間事件,計算超時時間;2)阻塞等待文件事件的產生;3)處理文件事件;4)處理時間事件。時間事件的執行函數爲processTimeEvents。
Redis服務器內部有不少定時任務須要執行,好比說定時清除超時客戶端鏈接,定時刪除過時鍵等,定時任務被封裝爲時間事件結構體aeTimeEvent存儲,多個時間事件造成鏈表,存儲在aeEventLoop結構體的timeEventHead字段,其指向鏈表首節點。時間事件aeTimeEvent定義以下:
typedef struct aeTimeEvent { long long id; long when_sec; long when_ms; aeTimeProc *timeProc; aeEventFinalizerProc *finalizerProc; void *clientData; struct aeTimeEvent *next; } aeTimeEvent;
各字段含義以下:
時間事件執行函數processTimeEvents的處理邏輯比較簡單,只是遍歷時間事件鏈表,判斷當前時間事件是否已經到期,若是到期則執行時間事件處理函數timeProc:
static int processTimeEvents(aeEventLoop *eventLoop) { te = eventLoop->timeEventHead; while(te) { aeGetTime(&now_sec, &now_ms); if (now_sec > te->when_sec || (now_sec == te->when_sec && now_ms >= te->when_ms)) { //處理時間事件 retval = te->timeProc(eventLoop, id, te->clientData); //從新設置時間事件到期時間 if (retval != AE_NOMORE) { aeAddMillisecondsToNow(retval, &te->when_sec,&te->when_ms); } } te = te->next; } }
注意時間事件處理函數timeProc返回值retval,其表示此時間事件下次應該被觸發的時間,單位毫秒,且是一個相對時間,即從當前時間算起,retval毫秒後此時間事件會被觸發。
其實Redis只有一個時間事件節點,看到這裏讀者可能會有疑惑,服務器內部不是有不少定時任務嗎,爲何只有一個時間事件呢?回答此問題以前咱們須要先分析這個惟一的時間事件節點。Redis建立時間事件節點的函數爲aeCreateTimeEvent,內部實現很是簡單,只是建立時間事件節點並添加到時間事件鏈表。aeCreateTimeEvent函數定義以下:
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc);
其中輸入參數eventLoop指向事件循環結構體;milliseconds表示此時間事件觸發時間,單位毫秒,注意這是一個相對時間,即從當前時間算起,milliseconds毫秒後此時間事件會被觸發;proc指向時間事件的處理函數;clientData指向對應的結構體對象;finalizerProc一樣是函數指針,刪除時間事件節點以前會調用此函數。
讀者能夠在代碼目錄全局搜索aeCreateTimeEvent,會發現確實只建立了一個時間事件節點:
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
該時間事件在1毫秒後會被觸發,處理函數爲serverCron,參數clientData與finalizerProc都爲NULL。而函數serverCron實現了Redis服務器全部定時任務的週期執行。
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { run_with_period(100) { //100毫秒週期執行 } run_with_period(5000) { //5000毫秒週期執行 } //清除超時客戶端連接 clientsCron(); //處理數據庫 databasesCron(); server.cronloops++; return 1000/server.hz; }
變量server.cronloops用於記錄serverCron函數的執行次數,變量server.hz表示serverCron函數的執行頻率,用戶可配置,最小爲1最大爲500,默認爲10。假設server.hz取默認值10,函數返回1000/server.hz會更新當前時間事件的觸發時間爲100毫秒後,即serverCron的執行週期爲100毫秒。run_with_period宏定義實現了定時任務按照指定時間週期執行,其會被替換爲一個if條件判斷,條件爲真纔會執行定時任務,定義以下:
#define run_with_period(_ms_) if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))
另外咱們能夠看到serverCron函數會無條件執行某些定時任務,好比清除超時客戶端鏈接,以及處理數據庫(清除數據庫過時鍵等)。須要特別注意一點,serverCron函數的執行時間不能過長,不然會致使服務器不能及時響應客戶端的命令請求。以過時鍵刪除爲例,分析下Redis是如何保證serverCron函數的執行時間。過時鍵刪除由函數activeExpireCycle實現,由函數databasesCron調用,其函數是實現以下:
#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 void activeExpireCycle(int type) { timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100; timelimit_exit = 0; for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) { do { //查找過時鍵並刪除 if ((iteration & 0xf) == 0) { elapsed = ustime()-start; if (elapsed > timelimit) { timelimit_exit = 1; break; } } }while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4) } }
函數activeExpireCycle最多遍歷dbs_per_call個數據庫,並記錄每一個數據庫刪除的過時鍵數目;當刪除過時鍵數目大於門限時,認爲此數據庫過時鍵較多,須要再次處理。考慮到極端狀況,當數據庫鍵數目很是多且基本都過時時,do-while循環會一直執行下去。所以咱們添加timelimit時間限制,每執行16次do-while循環,檢測函數activeExpireCycle執行時間是否超過timelimit,若是超過則強制結束循環。
初看timelimit的計算方式可能會比較疑惑,其計算結果使得函數activeExpireCycle的總執行時間佔CPU時間的25%。仍然假設server.hz取默認值10,即每秒鐘函數activeExpireCycle執行10次,那麼每秒鐘函數activeExpireCycle的總執行時間爲100000025/100,每次函數activeExpireCycle的執行時間爲100000025/100/10,單位微妙。
上一節咱們講述了客戶端,服務端,事件處理等基礎知識,下面開始學習Redis服務器的啓動過程,這裏主要分爲server初始化,監聽端口以及等待命令三個小節。
服務器初始化主流程能夠簡要分爲7個步驟:1)初始化配置,包括用戶可配置的參數,以及命令表的初始化;2)加載並解析配置文件;3)初始化服務端內部變量,其中就包括數據庫;4)建立事件循環eventLoop;5)建立socket並啓動監聽;6)建立文件事件與時間事件;7)開啓事件循環。下面詳細介紹步驟1~4,至於步驟5~7將會在2.2小節介紹。
圖-2 server初始化流程
步驟1)初始化配置,由函數initServerConfig實現,其實就是給配置參數賦初始值:
void initServerConfig(void) { //serverCron函數執行頻率,默認10 server.hz = CONFIG_DEFAULT_HZ; //監聽端口,默認6379 server.port = CONFIG_DEFAULT_SERVER_PORT; //最大客戶端數目,默認10000 server.maxclients = CONFIG_DEFAULT_MAX_CLIENTS; //客戶端超時時間,默認0,即永不超時 server.maxidletime = CONFIG_DEFAULT_CLIENT_TIMEOUT; //數據庫數目,默認16 server.dbnum = CONFIG_DEFAULT_DBNUM; //初始化命令表,1.4小節已經講過,這裏再也不詳述 populateCommandTable(); ………… }
步驟2)加載並解析配置文件,入口函數爲loadServerConfig,函數聲明以下:
void loadServerConfig(char *filename, char *options)
輸入參數filename表示配置文件全路徑名稱,options表示命令行輸入的配置參數,例如咱們一般以如下命令啓動Redis服務器:
/home/user/redis/redis-server /home/user/redis/redis.conf -p 4000
使用GDB啓動redis-server,打印函數 loadServerConfig輸入參數以下:
(gdb) p filename $1 = 0x778880 "/home/user/redis/redis.conf" (gdb) p options $2 = 0x7ffff1a21d33 "\"-p\" \"4000\" "
Redis的配置文件語法相對簡單,每一行是一條配置,格式如「配置 參數1 [參數2] [……]」,加載配置文件只須要一行一行將文件內容讀取到內存中便可,GDB打印加載到內存中的配置以下:
(gdb) p config "bind 127.0.0.1\n\nprotected-mode yes\n\nport 6379\ntcp-backlog 511\n\ntcp-keepalive 300\n\n………"
加載完成後會調用loadServerConfigFromString函數解析配置,輸入參數config即配置字符串,實現以下:
void loadServerConfigFromString(char *config) { //分割配置字符串多行,totlines記錄行數 lines = sdssplitlen(config,strlen(config),"\n",1,&totlines); for (i = 0; i < totlines; i++) { //跳過註釋行與空行 if (lines[i][0] == '#' || lines[i][0] == '\0') continue; argv = sdssplitargs(lines[i],&argc); //解析配置參數 //賦值 if (!strcasecmp(argv[0],"timeout") && argc == 2) { server.maxidletime = atoi(argv[1]); }else if (!strcasecmp(argv[0],"port") && argc == 2) { server.port = atoi(argv[1]); } //其餘配置 } }
函數首先將輸入配置字符串以「n」爲分隔符劃分爲多行,totlines記錄總行數,lines數組存儲分割後的配置,數組元素類型爲字符串SDS;for循環遍歷全部配置行,解析配置參數,並根據參數內容設置結構體server各字段。注意Redis配置文件中行開始「#」字符標識本行內容爲註釋,解析時須要跳過。
步驟3)初始化服務器內部變量,好比客戶端鏈表,數據庫,全局變量共享對象等;入口函數爲initServer,函數邏輯相對簡單,這裏只作簡要說明;
void initServer(void) { server.clients = listCreate(); //初始化客戶端鏈表 //建立數據庫字典 server.db = zmalloc(sizeof(redisDb)*server.dbnum); for (j = 0; j < server.dbnum; j++) { server.db[j].dict = dictCreate(&dbDictType,NULL); ………… } }
注意數據庫字典的dictType指向的是結構體dbDictType,其中定義了數據庫字典鍵的哈希函數,鍵比較函數,以及鍵與值的析構函數,定義以下:
dictType dbDictType = { dictSdsHash, NULL, NULL, dictSdsKeyCompare, dictSdsDestructor, dictObjectDestructor };
數據庫的鍵都是SDS類型,鍵哈希函數爲dictSdsHash,,鍵比較函數爲dictSdsKeyCompare,鍵析構函數爲dictSdsDestructor;數據庫的值是robj對象,值析構函數爲dictObjectDestructor;鍵和值的內容賦值函數都爲NULL。
1.1節提到對象robj的refcount字段存儲當前對象的引用次數,意味着對象是能夠共享的。要注意的是,只有當對象robj存儲的是0~10000之內的整數,對象robj纔會被共享,且這些共享整數對象的引用計數初始化爲INT_MAX,保證不會被釋放。執行命令時Redis會返回一些字符串回覆,這些字符串對象一樣在服務器初始化時建立,且永遠不會嘗試釋放這類對象。全部共享對象都存儲在全局結構體變量shared。
void createSharedObjects(void) { //建立命令回覆字符串對象 shared.ok = createObject(OBJ_STRING,sdsnew("+OK\r\n")); shared.err = createObject(OBJ_STRING,sdsnew("-ERR\r\n")); //建立0~10000整數對象 for (j = 0; j < OBJ_SHARED_INTEGERS; j++) { shared.integers[j] = makeObjectShared(createObject(OBJ_STRING,(void*)(long)j)); shared.integers[j]->encoding = OBJ_ENCODING_INT; } }
步驟4)建立事件循環eventLoop,即分配結構體所需內存,並初始化結構體各字段;epoll就是在此時建立的:
aeEventLoop *aeCreateEventLoop(int setsize) { if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err; eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); if (aeApiCreate(eventLoop) == -1) goto err; }
輸入參數setsize理論上等於用戶配置的雖大客戶端數目便可,可是爲了確保安全,這裏設置setsize等於最大客戶端數目加128。函數aeApiCreate內部調用epoll_create建立epoll,並初始化結構體eventLoop的字段apidata。
上節介紹了服務器初始化的前面4個步驟,初始化配置;加載並解析配置文件;初始化服務端內部遍歷,包括數據庫,全局共享變量等;建立時間循環eventLoop。完成這些操做以後,Redis將建立socket並啓動監聽,同時建立對應的文件事件與時間事件並開始事件循環。下面將詳細介紹步驟5~7。
步驟5)建立socket並啓動監聽;
用戶可經過指令port配置socket綁定端口號,指令bind配置socket綁定IP地址;注意指令bind可配置多個IP地址,中間用空格隔開;建立socket時只須要循環全部IP地址便可。
int listenToPort(int port, int *fds, int *count) { for (j = 0; j < server.bindaddr_count || j == 0; j++) { //建立socket並啓動監聽,文件描述符存儲在fds數組做爲返回參數 fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j], server.tcp_backlog); //設置socket非阻塞 anetNonBlock(NULL,fds[*count]); (*count)++; } }
輸入參數port表示用戶配置的端口號,server結構體的bindaddr_count字段存儲用戶配置的IP地址數目,bindaddr字段存儲用戶配置的全部IP地址。函數anetTcpServer實現了socket的建立,綁定,以及監聽流程,這裏不作過多詳述。參數fds與count可用做輸出參數,fds數組存儲建立的全部socket文件描述符,count存儲socket數目。
注意到全部建立的socket都會設置爲非阻塞模式,緣由在於Redis使用了IO多路複用模式,其要求socket讀寫必須是非阻塞的,函數anetNonBlock經過系統調用fcntl設置socket非阻塞模式。
步驟6)建立文件事件與時間事件;
步驟5中已經完成了socket的建立與監聽,1.5.1節提到socket的讀寫事件被抽象爲文件事件,由於對於監聽的socket還須要建立對應的文件事件。
for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR){ } }
server結構體的ipfd_count字段存儲建立的監聽socket數目,ipfd數組存儲建立的全部監聽socket文件描述符,須要循環全部的監聽socket,爲其建立對應的文件事件。能夠看到監聽事件的處理函數爲acceptTcpHandler,實現了socket鏈接請求的accept,以及客戶端對象的建立。
1.5.2小節提到定時任務被抽象爲時間事件,且Redis只建立了一個時間事件,在服務端初始化時建立。此時間事件的處理函數爲serverCron,初次建立時1毫秒後備觸發。
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { exit(1); }
步驟7)開啓事件循環;
前面6個步驟已經完成了服務端的初始化工做,並在指定IP地址、端口監聽客戶端鏈接,同時建立了文件事件與時間事件;此時只須要開啓事件循環等待事件發生便可。
void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; //開始事件循環 while (!eventLoop->stop) { if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); //事件處理主函數 aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); } }
事件處理主函數aeProcessEvents已經詳細介紹過,這裏須要重點關注函數beforesleep,其在每次事件循環開始,即Redis阻塞等待文件事件以前執行。函數beforesleep會執行一些不是很費時的操做,集羣相關操做,過時鍵刪除操做(這裏可稱爲快速過時鍵刪除),向客戶端返回命令回覆等。這裏簡要介紹下快速過時鍵刪除操做。
void beforeSleep(struct aeEventLoop *eventLoop) { if (server.active_expire_enabled && server.masterhost == NULL) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); }
Redis過時鍵刪除有兩種策略:1)訪問數據庫鍵時,校驗該鍵是否過時,若是過時則刪除;2)週期性刪除過時鍵,beforeSleep函數與serverCron函數都會執行。server結構體的active_expire_enabled字段表示是否開啓週期性刪除過時鍵策略,用戶可經過set-active-expire指令配置;masterhost字段存儲當前Redis服務器的master服務器的域名,若是爲NULL說明當前服務器不是某個Redis服務器的slaver。注意到這裏依然是調用函數activeExpireCycle執行過時鍵刪除,只是參數傳遞的是ACTIVE_EXPIRE_CYCLE_FAST,表示快速過時鍵刪除。
回顧下1.5.2節講述函數activeExpireCycle的實現,函數計算出timelimit,即函數最大執行時間,循環刪除過時鍵時會校驗函數執行時間是否超過此限制,超過則結束循環。顯然快速過時鍵刪除時只須要縮短timelimit便可,計算策略以下:
void activeExpireCycle(int type) { static int timelimit_exit = 0; static long long last_fast_cycle = 0 if (type == ACTIVE_EXPIRE_CYCLE_FAST) { //上次activeExpireCycle函數是否已經執行完畢 if (!timelimit_exit) return; //當前時間距離上次執行快速過時鍵刪除是否已經超過2000微妙 if (start < last_fast_cycle + 1000*2) return; last_fast_cycle = start; } //快速過時鍵刪除時,函數執行時間不超過1000微妙 if (type == ACTIVE_EXPIRE_CYCLE_FAST) timelimit = 1000; }
執行快速過時鍵刪除有不少限制,當函數activeExpireCycle正在執行時直接返回;當上次執行快速過時鍵刪除的時間距離當前時間小於2000微妙時直接返回。思考下爲何能夠經過變量timelimit_exit判斷函數activeExpireCycle是否正在執行呢?注意到變量timelimit_exit聲明爲static,即函數執行完畢不會釋放變量空間。那麼能夠在函數activeExpireCycle入口賦值timelimit_exit爲0,返回以前賦值timelimit_exit爲1,由此即可經過變量timelimit_exit判斷函數activeExpireCycle是否正在執行。變量last_fast_cycle聲明爲static也是一樣的緣由。同時能夠看到當執行快速過時鍵刪除時,設置函數activeExpireCycle的最大執行時間爲1000微妙。
函數aeProcessEvents爲事件處理主函數,首先查找最近發生的時間事件,調用epoll_wait阻塞等待文件事件的發生並設置超時事件;待epoll_wait返回時,處理觸發的文件事件;最後處理時間事件。步驟6中已經建立了文件事件,爲監聽socket的讀事件,事件處理函數爲acceptTcpHandler,即當客戶端發起socket鏈接請求時,服務端會執行函數acceptTcpHandler處理。acceptTcpHandler函數主要作了兩件事:1)accept客戶端的鏈接請求;2)建立客戶端對象;3)建立文件事件。步驟2與步驟3由函數createClient實現,輸入參數fd爲accept客戶端鏈接請求後生成的socket文件描述符。
client *createClient(int fd) { client *c = zmalloc(sizeof(client)); //設置socket爲非阻塞模式 anetNonBlock(NULL,fd); //設置TCP_NODELAY anetEnableTcpNoDelay(NULL,fd); //若是服務端配置了tcpkeepalive,則設置SO_KEEPALIVE if (server.tcpkeepalive) anetKeepAlive(NULL,fd,server.tcpkeepalive); if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR){ } }
爲了使用IO多路複用模式,此處一樣須要設置socket爲非阻塞模式。
TCP是基於字節流的可靠傳輸層協議,爲了提高網絡利用率,通常默認都會開啓Nagle。當應用層調用write函數發送數據時,TCP並不必定會馬上將數據發送出去,根據Nagle算法,還必須知足必定條件才行。Nagle是這樣規定的:若是數據包長度大於必定門限時,則當即發送;若是數據包中含有FIN(表示斷開TCP連接)字段,則當即發送;若是當前設置了TCP_NODELAY選項,則當即發送;若是全部條件都不知足,默認須要等待200毫秒超時後纔會發送。Redis服務器向客戶端返回命令回覆時,但願TCP能當即將該回復發送給客戶端,所以須要設置TCP_NODELAY。思考下若是不設置會怎麼樣呢?從客戶端分析,命令請求的響應時間會大大加長。
TCP是可靠的傳輸層協議,每次都須要經歷三次握手與四次揮手,爲了提高效率,能夠設置SO_KEEPALIVE,即TCP長鏈接,這樣TCP傳輸層會定時發送心跳包確認該鏈接的可靠性。應用層也再也不須要頻繁的建立於釋放TCP鏈接了。server結構體的tcpkeepalive字段表示是否啓用TCP長鏈接,用戶可經過參數tcp-keepalive配置。
接收到客戶端鏈接請求以後,服務器須要建立文件事件等待客戶端的命令請求,能夠看到文件事件的處理函數爲readQueryFromClient,當服務器接收到客戶端的命令請求時,會執行此此函數。
上一節分析了服務器的啓動過程,包括配置文件的解析,建立socket並啓動監聽,建立文件事件與時間事件並開啓事件循環。服務器啓動完成後,只須要等待客戶端鏈接併發送命令請求便可。本小節主要介紹命令的處理過程,能夠分爲三個階段,解析命令請求,命令調用和返回結果給客戶端。
TCP是一種基於字節流的傳輸層通訊協議,所以接收到的TCP數據不必定是一個完整的數據包,其有多是多個數據包的組合,也有多是某一個數據包的部分,這種現象被稱爲半包與粘包。如圖-3所示。
圖-3 TCP半包與粘包
客戶端應用層分別發送三個數據包,data三、data2和data1,可是TCP傳輸層在真正發送數據時,將data3數據包分割爲data3_1與data3_2,而且將data1與data2數據合併,此時服務器接收到的數據包就不是一個完整的數據包。
爲了區分一個完整的數據包,一般有以下三種方法:1)數據包長度固定;2)經過特定的分隔符區分,好比HTTP協議就是經過換行符區分的;3)經過在數據包頭部設置長度長度字段區分數據包長度,好比FastCGI協議。
Redis採用自定義協議格式實現不一樣命令請求的區分,例如當用戶在redis-cli客戶端鍵入下面命令:
SET redis-key value1 vlaue2 value3
客戶端會將該命令請求轉換爲如下協議格式,而後發送給服務器:
*5\r\n$3\r\n$9redis-key\r\n$6value1\r\n$6vlaue2\r\n$6value3\r\n
其中,換行符rn用於區分命令請求的若干參數,「*5」表示該命令請求有5個參數,「$3」、「$9」和「$6」等表示該參數字符串長度,多個請求參數之間用「rn」分隔開
須要注意的是,Redis還支持在telnet會話輸入命令的方式,只是此時沒有了請求協議中的「*」來聲明參數的數量,所以必須使用空格來分割各個參數,服務器在接收到數據以後,會將空格做爲參數分隔符解析命令請求。這種方式的命令請求稱爲內聯命令。
Redis服務器接收到的命令請求首先存儲在客戶端對象的querybuf輸入緩衝區,而後解析命令請求各個參數,並存儲在客戶端對象的argv(參數對象數組)和argc(參數數目)字段。參考2.2小節能夠知道解析客戶端命令請求的入口函數爲readQueryFromClient,會讀取socket數據存儲到客戶端對象的輸入緩衝區,並調用函數processInputBuffer解析命令請求。processInputBuffer函數主要邏輯如圖-4所示。
圖-4 命令解析流程圖
下面簡要分析經過redis-cli客戶端發送的命令請求的解析過程。假設客戶端命令請求爲「SET redis-key value1」,在函數processMultibulkBuffer添加斷點,GDB打印客戶端輸入緩衝區內容以下:
(gdb) p c->querybuf $3 = (sds) 0x7ffff1b45505 "*3\r\n$3\r\nSET\r\n$9\r\nredis-key\r\n$6\r\nvalue1\r\n"
解析該命令請求能夠分爲2個步驟,1)解析命令請求參數數目;2)循環解析每一個請求參數。下面詳細分析每一個步驟的源碼實現
步驟1)解析命令請求參數數目;
querybuf指向命令請求首地址,命令請求參數數目的協議格式爲「3rn」,即首字符必須是「」,而且可使用字符「r」定位到行尾位置;解析後的參數數目暫存在客戶端對象的multibulklen字段,表示等待解析的參數數目,變量pos記錄已解析命令請求的長度。
//定位到行尾 newline = strchr(c->querybuf,'\r'); //解析命令請求參數數目,並存儲在客戶端對象的multibulklen字段 serverAssertWithInfo(c,NULL,c->querybuf[0] == '*'); string2ll(c->querybuf+1,newline-(c->querybuf+1),&ll); c->multibulklen = ll; //記錄已解析位置偏移量 pos = (newline-c->querybuf)+2; //分配請求參數存儲空間 c->argv = zmalloc(sizeof(robj*)*c->multibulklen);
GDB打印主要變量內容以下:
(gdb) p c->multibulklen $9 = 3 (gdb) p pos $10 = 4
步驟2)循環解析每一個請求參數:
命令請求各參數的協議格式爲「$3\r\nSET\r\n」,即首字符必須是「$」。解析當前參數以前須要解析出參數的字符串長度,可使用字符「r」定位到行尾位置;注意到解析參數長度時,字符串開始位置爲querybuf+pos+1;字符串參數長度暫存在客戶端對象的bulklen字段,同時更新已解析字符串長度pos。
//定位到行尾 newline = strchr(c->querybuf+pos,'\r'); //解析當前參數字符串長度,字符串首字符偏移量爲pos if (c->querybuf[pos] != '$') { return C_ERR; } ok = string2ll(c->querybuf+pos+1,newline-(c->querybuf+pos+1),&ll); pos += newline-(c->querybuf+pos)+2; c->bulklen = ll;
GDB打印主要變量內容以下:
(gdb) p c->querybuf+pos $13 = 0x7ffff1b4550d "SET\r\n$9\r\nredis-key\r\n$6\r\nvalue1\r\n" (gdb) p c->bulklen $15 = 3 (gdb) p pos $16 = 8
解析出參數字符串長度以後,可直接讀取該長度的參數內容,並建立字符串對象;同時須要更新待解析參數multibulklen。
//解析參數 c->argv[c->argc++] = createStringObject(c->querybuf+pos,c->bulklen); pos += c->bulklen+2; //待解析參數數目減一 c->multibulklen--;
當multibulklen值更新尾0時,說明參數解析完成,結束循環。讀者能夠思考下,待解析參數數目,當前參數長度爲何都須要暫存在客戶端結構體,使用函數局部變量行不行?確定是不行的,緣由就在於上面提到的TCP半包與粘包現象,服務器可能只接收到部分命令請求,例如「3rn$3\r\nSET\r\n$9rnredis」。當函數processMultibulkBuffer執行完畢時,一樣只會解析部分命令請求「3rn$3\r\nSET\r\n$9rn」,此時就須要記錄該命令請求待解析的參數數目,以及待解析參數的長度;而剩餘待解析的參數「redis」會繼續緩存在客戶端的輸入緩衝區。
參考圖-4,解析完成命令請求以後,會調用函數processCommand處理該命令請求,而處理命令請求以前還有不少校驗邏輯,好比說客戶端是否已經完成認證,命令請求參數是否合法等。下面簡要列出若干校驗規則。
校驗1)若是是quit命令直接返回並關閉客戶端;
if (!strcasecmp(c->argv[0]->ptr,"quit")) { addReply(c,shared.ok); c->flags |= CLIENT_CLOSE_AFTER_REPLY; return C_ERR; }
校驗2)執行函數lookupCommand查找命令後,若是命令不存在返回錯誤;
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); if (!c->cmd) { addReplyErrorFormat(c,"unknown command '%s'",(char*)c->argv[0]->ptr); return C_OK; }
校驗3)若是命令參數數目不合法,返回錯誤。命令結構體的arity用於校驗參數數目是否合法,當arity小於0時,表示命令參數數目大於等於arity;當arity大於0時,表示命令參數數目必須爲arity;注意命令請求中命令的名稱自己也是一個參數。
if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || (c->argc < -c->cmd->arity)) { addReplyErrorFormat(c,"wrong number of arguments for '%s' command", c->cmd->name); return C_OK; }
校驗4)若是使用指令「requirepass password」設置了密碼,且客戶端沒未認證經過,只能執行auth命令,auth命令格式爲「AUTH password」。
if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand){ addReply(c,shared.noautherr); return C_OK; }
校驗5)若是使用指令「maxmemory <bytes>」設置了最大內存限制,且當前內存使用量超過了該配置門限,服務器會拒絕執行帶有「m」(CMD_DENYOOM)標識的命令,如SET命令、APPEND命令和LPUSH命令等。命令標識參見1.4小節。
if (server.maxmemory) { int retval = freeMemoryIfNeeded(); if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR) { addReply(c, shared.oomerr); return C_OK; } }
校驗6)除了上面的5種校驗,還有不少校驗規則,好比集羣相關校驗,持久化相關校驗,主從複製相關校驗,發佈訂閱相關校驗,以及事務操做等。這些校驗規則會在相關章節會做詳細介紹。
當全部校驗規則都經過後,纔會調用命令處理函數執行命令,代碼以下:
start = ustime(); c->cmd->proc(c); duration = ustime()-start; //更新統計信息:當前命令執行時間與調用次數 c->lastcmd->microseconds += duration; c->lastcmd->calls++; //記錄慢查詢日誌 slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
執行命令完成後,若是有必要,還須要更新統計信息,記錄慢查詢日誌,AOF持久化該命令請求,傳播命令請求給全部的從服務器等。持久化與主從複製會在相關章節會做詳細介紹,這裏主要介紹慢查詢日誌的實現方式。
void slowlogPushEntryIfNeeded(client *c, robj **argv, int argc, long long duration) { //執行時間超過門限,記錄該命令 if (duration >= server.slowlog_log_slower_than) listAddNodeHead(server.slowlog, slowlogCreateEntry(c,argv,argc,duration)); //慢查詢日誌最多記錄條數爲slowlog_max_len,超過需刪除 while (listLength(server.slowlog) > server.slowlog_max_len) listDelNode(server.slowlog,listLast(server.slowlog)); }
可使用指令「slowlog-log-slower-than 10000」配置執行時間超過多少毫秒纔會記錄慢查詢日誌,指令「slowlog-max-len 128」配置慢查詢日誌最大數目,超過會刪除最先的日誌記錄。能夠看到慢查詢日誌記錄在服務端結構體的slowlog字段,即存儲速度很是快,並不會影響命令執行效率。用戶可經過「SLOWLOG subcommand [argument]」命令查看服務器記錄的慢查詢日誌。
Redis服務器返回結果類型不一樣,協議格式不一樣,而客戶端能夠根據返回結果的第一個字符判斷返回類型。Redis的返回結果能夠分爲5類:
addReply(c, ok_reply ? ok_reply : shared.ok);
變量ok_reply一般爲NULL,則返回的是共享變量shared.ok,在服務器啓動時就完成了共享變量的初始化。
shared.ok = createObject(OBJ_STRING,sdsnew("+OK\r\n"));
addReplyErrorFormat(c,"unknown command '%s'",(char*)c->argv[0]->ptr);
而函數addReplyErrorFormat內部實現會拼裝錯誤回覆字符串。
addReplyString(c,"-ERR ",5); addReplyString(c,s,len); addReplyString(c,"\r\n",2);
addReply(c,shared.colon); addReply(c,new); addReply(c,shared.crlf);
其中共享變量shared.colon與shared.crlf一樣都是在服務器啓動時就完成了初始化。
shared.colon = createObject(OBJ_STRING,sdsnew(":")); shared.crlf = createObject(OBJ_STRING,sdsnew("\r\n"));
//計算返回對象obj長度,並拼接爲字符串「$5\r\n」 addReplyBulkLen(c,obj); addReply(c,obj); addReply(c,shared.crlf);
//拼接返回值數目「*3\r\n」 addReplyMultiBulkLen(c,rangelen); //循環輸出全部返回值 while(rangelen--) { //拼接當前返回值長度「$6\r\n」 addReplyLongLongWithPrefix(c,len,'$'); addReplyString(c,p,len); addReply(c,shared.crlf); }
能夠看到5種類型的返回結果都是調用相似於addReply函數返回的,那麼是這些方法將返回結果發送給客戶端的嗎?其實不是。回顧1.2小節講述的客戶端結構體client,其中有兩個關鍵字段reply和buf,分別表示輸出鏈表與輸出緩衝區,而函數addReply會直接或者間接的調用如下兩個函數將返回結果暫時緩存在reply或者buf字段。
//添加字符串都輸出緩衝區 int _addReplyToBuffer(client *c, const char *s, size_t len) //添加各類類型的對象到輸出鏈表 void _addReplyObjectToList(client *c, robj *o) void _addReplySdsToList(client *c, sds s) void _addReplyStringToList(client *c, const char *s, size_t len)
須要特別注意的是,reply和buf字段不可能同時緩存待返回給客戶端的數據。從客戶端結構體的sentlen字段就能看出,當輸出數據緩存在reply字段時,sentlen表示已返回給客戶端的對象數目;當輸出數據緩存在buf字段時,sentlen表示已返回給客戶端的字節數目。那麼當reply和buf字段同時緩存有輸出數據呢?只有sentlen字段顯然是不夠的。從_addReplyToBuffer函數實現一樣能夠看出該結論。
int _addReplyToBuffer(client *c, const char *s, size_t len) { if (listLength(c->reply) > 0) return C_ERR; }
調用函數_addReplyToBuffer緩存數據到輸出緩衝區時,若是檢測到reply字段有待返回給客戶端的數據,函數返回錯誤。而一般緩存數據時都會先嚐試緩存到buf輸出緩衝區,若是失敗會再次嘗試緩存到reply輸出鏈表。
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK) _addReplyObjectToList(c,obj);
而函數addReply在將待返回給客戶端的數據暫時緩存在輸出緩衝區或者輸出鏈表的同時,會將當前客戶端添加到服務端結構體的clients_pending_write鏈表,以便後續能快速查找出哪些客戶端有數據須要發送。
listAddNodeHead(server.clients_pending_write,c);
看到這裏讀者可能會有疑問,函數addReply只是將待返回給客戶端的數據暫時緩存在輸出緩衝區或者輸出鏈表,那麼何時將這些數據發送給客戶端呢?讀者是否還記得在介紹開啓事件循環時,提到函數beforesleep在每次事件循環阻塞等待文件事件以前執行,主要執行一些不是很費時的操做,好比過時鍵刪除操做,向客戶端返回命令回覆等。
函數beforesleep會遍歷clients_pending_write鏈表中每個客戶端節點,併發送輸出緩衝區或者輸出鏈表中的數據。
//遍歷clients_pending_write鏈表 listRewind(server.clients_pending_write,&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); listDelNode(server.clients_pending_write,ln); //向客戶端發送數據 if (writeToClient(c->fd,c,0) == C_ERR) continue; }
看到這裏我想大部分讀者可能都會認爲返回結果已經發送給客戶端,命令請求也已經處理完成了。其實否則,讀者能夠思考這麼一個問題,當返回結果數據量很是大時,是沒法一次性將全部數據都發送給客戶端的,即函數writeToClient執行以後,客戶端輸出緩衝區或者輸出鏈表中可能還有部分數據未發送給客戶端。這時候怎麼辦呢?很簡單,只須要添加文件事件,監聽當前客戶端socket文件描述符的可寫事件便可。
if (aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, sendReplyToClient, c) == AE_ERR){ }
能夠看到該文件事件的事件處理函數爲sendReplyToClient,即當客戶端可寫時,函數sendReplyToClient會發送剩餘部分的數據給客戶端。
至此,命令請求才算是真正處理完成了。
爲了更好的理解服務器與客戶端的交互,本文首先介紹了一些基礎結構體,如對象結構體robj,客戶端結構體client,服務端結構體redisServer以及命令結構體redisCommand。Redis服務器是典型的事件驅動程序,將事件處理分爲兩大類:文件事件與時間事件。文件事件即socket的可讀可寫事件,時間事件即須要週期性執行的一些定時任務。Redis採用比較成熟的IO多路複用模型(select/epoll等)處理文件事件,並對這些IO多路複用模型作了簡單封裝。Redis服務器只維護了一個時間事件節點,該時間事件處理函數爲serverCron,執行了全部須要週期性執行的一些定時任務。事件是理解Redis的基石,但願讀者能認真學習。最後本文介紹了服務器處理客戶端命令請求的整個流程,包括服務器啓動監聽,接收命令請求並解析,執行命令請求,返回命令回覆等。