我們這裏先研究redis-server端的網絡通訊模塊。除去Redis本身的業務功能之外,Redis的網絡通訊模塊實現思路和細節非常有代表性。因爲網絡通訊模塊的設計也是Linux C++後臺開發中一個很重要的模塊,雖然網絡上有不少現成的網絡庫,但是簡單易學且可以作爲典範的並不多,而redis-server就是這方面值得借鑑學習的材料之一。
*將偵聽socket綁定到需要的IP地址和端口上(調用Socket API bind函數);
*啓動偵聽(調用Socket API listen函數);
*無限等待客戶端連接到來,調用Socket API accept函數接受客戶端連接,並產生一個與該客戶端對應的客戶端socket;
static int (char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) { if (bind(s,sa,len) == -1) { anetSetError(err, "bind: %s", strerror(errno)); close(s); return ANET_ERR; } if (listen(s, backlog) == -1) { anetSetError(err, "listen: %s", strerror(errno)); close(s); return ANET_ERR; } return ANET_OK; }
(gdb) b anetListen Breakpoint 1 at 0x555555588620: file anet.c, line 440. (gdb) info breakpoints Num Type Disp Enb Address What 1 breakpoint keep y 0x0000555555588620 in anetListen at anet.c:440 (gdb) r The program being debugged has been started already. Start it from the beginning? (y or n) y Starting program: /home/wzq/Desktop/redis-5.0.3/src/redis-server [Thread debugging using libthread_db enabled] Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1". 11580:C 14 Jan 2019 11:27:06.118 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo 11580:C 14 Jan 2019 11:27:06.118 # Redis version=5.0.3, bits=64, commit=00000000, modified=0, pid=11580, just started 11580:C 14 Jan 2019 11:27:06.118 # Warning: no config file specified, using the default config. In order to specify a config file use /home/wzq/Desktop/redis-5.0.3/src/redis-server /path/to/redis.conf 11580:M 14 Jan 2019 11:27:06.119 * Increased maximum number of open files to 10032 (it was originally set to 1024). Breakpoint 1, anetListen (err=0x5555559161e0 <server+576> "", s=6, sa=0x555555b2b240, len=28, backlog=511) at anet.c:440 warning: Source file is more recent than executable. 440 static int (char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) { (gdb)
(gdb) bt #0 anetListen (err=0x5555559161e0 <server+576> "", s=6, sa=0x555555b2b240, len=28, backlog=511) at anet.c:440 #1 0x00005555555887a4 in _anetTcpServer (err=0x5555559161e0 <server+576> "", port=<optimized out>, bindaddr=<optimized out>, af=10, backlog=511) at anet.c:487 #2 0x000055555558cf07 in listenToPort (port=6379, fds=0x55555591610c <server+364>, count=0x55555591614c <server+428>) at server.c:1924 #3 0x0000555555591ed0 in initServer () at server.c:2055 #4 0x0000555555585103 in main (argc=<optimized out>, argv=0x7fffffffccc8) at server.c:4160
/* Create a listening TCP socket for `port`, optionally restricted to
 * `bindaddr`, using address family `af`.
 *
 * The candidate addresses come from getaddrinfo(); the first one for which
 * socket() succeeds is configured and handed to anetListen(). Returns the
 * listening descriptor on success, or ANET_ERR with the reason recorded
 * through anetSetError(). */
static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog) {
    int s = -1;
    int gai_rc;
    char portstr[6];  /* strlen("65535") */
    struct addrinfo hints;
    struct addrinfo *list;
    struct addrinfo *cur;

    snprintf(portstr, 6, "%d", port);

    memset(&hints, 0, sizeof(hints));
    hints.ai_family = af;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE;    /* No effect if bindaddr != NULL */

    gai_rc = getaddrinfo(bindaddr, portstr, &hints, &list);
    if (gai_rc != 0) {
        anetSetError(err, "%s", gai_strerror(gai_rc));
        return ANET_ERR;
    }

    cur = list;
    while (cur != NULL) {
        s = socket(cur->ai_family, cur->ai_socktype, cur->ai_protocol);
        if (s == -1) {
            /* This candidate is unusable, try the next one. */
            cur = cur->ai_next;
            continue;
        }

        if (af == AF_INET6 && anetV6Only(err, s) == ANET_ERR) goto error;
        if (anetSetReuseAddr(err, s) == ANET_ERR) goto error;

        /* anetListen() closes the socket itself when it fails. */
        if (anetListen(err, s, cur->ai_addr, cur->ai_addrlen, backlog) == ANET_ERR) {
            s = ANET_ERR;
        }
        goto end;
    }

    if (cur == NULL) {
        anetSetError(err, "unable to bind socket, errno: %d", errno);
    }
    /* Fall through into the error path. */

error:
    if (s != -1) {
        close(s);
    }
    s = ANET_ERR;

end:
    freeaddrinfo(list);
    return s;
}
將堆棧切換至#1,然後輸入info args查看傳給這個函數的參數。
(gdb) f 1 #1 0x00005555555887a4 in _anetTcpServer (err=0x5555559161e0 <server+576> "", port=<optimized out>, bindaddr=<optimized out>, af=10, backlog=511) at anet.c:487 487 if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) s = ANET_ERR; (gdb) info args err = 0x5555559161e0 <server+576> "" port = <optimized out> bindaddr = <optimized out> af = 10 backlog = 511 (gdb)
使用系統API getaddrinfo 來解析得到當前主機的IP地址和端口信息。這裏沒有選擇使用gethostbyname這個API是因爲gethostbyname僅用於解析ipv4相關的主機信息,而getaddrinfo既可以用於ipv4也可以用於ipv6,這個函數的簽名如下:
int getaddrinfo(const char *node,const char *service,const struct addrinfo *hints,struct addrinfo **res);
這個函數的具體用法可以在Linux man手冊上查看。通常服務器端在調用getaddrinfo之前,將hints參數的ai_flags設置爲AI_PASSIVE,用於bind;主機名node通常會設置爲NULL,返回通配地址(IPv6爲[::],IPv4爲0.0.0.0)。當然,客戶端調用getaddrinfo時,hints參數的ai_flags一般不設置AI_PASSIVE,但是主機名node和服務名service(更願意稱之爲端口)則應該不爲空。
同樣的道理,要研究redis-server如何接受客戶端連接,只要搜索socket API accept函數即可。
/* Accept one connection on listening socket `s`, filling `sa`/`len` with the
 * peer address. The accept() call is retried for as long as it fails with
 * EINTR. Returns the new descriptor, or ANET_ERR after recording the reason
 * through anetSetError(). */
static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
    int fd;

    do {
        fd = accept(s, sa, len);
    } while (fd == -1 && errno == EINTR);

    if (fd == -1) {
        anetSetError(err, "accept: %s", strerror(errno));
        return ANET_ERR;
    }
    return fd;
}
Thread 1 "redis-server" hit Breakpoint 2, anetGenericAccept (err=err@entry=0x5555559161e0 <server+576> "", s=s@entry=7, sa=sa@entry=0x7fffffffc9f0, len=len@entry=0x7fffffffc9ec) at anet.c:531 531 static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) { (gdb) bt #0 anetGenericAccept (err=err@entry=0x5555559161e0 <server+576> "", s=s@entry=7, sa=sa@entry=0x7fffffffc9f0, len=len@entry=0x7fffffffc9ec) at anet.c:531 #1 0x00005555555893e2 in anetTcpAccept (err=err@entry=0x5555559161e0 <server+576> "", s=s@entry=7, ip=ip@entry=0x7fffffffcab0 "", ip_len=ip_len@entry=46, port=port@entry=0x7fffffffcaac) at anet.c:552 #2 0x000055555559aad2 in acceptTcpHandler (el=<optimized out>, fd=7, privdata=<optimized out>, mask=<optimized out>) at networking.c:728 #3 0x000055555558806c in aeProcessEvents (eventLoop=eventLoop@entry=0x7ffff6c320a0, flags=flags@entry=11) at ae.c:443 #4 0x000055555558841b in aeMain (eventLoop=0x7ffff6c320a0) at ae.c:501 #5 0x00005555555851d4 in main (argc=<optimized out>, argv=0x7fffffffccc8) at server.c:4197
/* Run the event loop: clear the stop flag, then keep processing events until
 * something sets eventLoop->stop to a non-zero value. The optional
 * beforesleep hook is invoked before each processing round. */
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;

    for (;;) {
        if (eventLoop->stop) {
            break;
        }
        if (eventLoop->beforesleep != NULL) {
            eventLoop->beforesleep(eventLoop);
        }
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}
循環的退出條件是eventLoop->stop爲非零值(如1)。事件處理的代碼如下:
/* Process pending events on `eventLoop` as selected by `flags`.
 *
 * Returns 0 immediately when neither AE_TIME_EVENTS nor AE_FILE_EVENTS is set
 * in `flags`. Otherwise it polls for file events through aeApiPoll(), using a
 * timeout derived from the nearest timer (or a zero timeout with
 * AE_DONT_WAIT, or no timeout at all), invokes the read/write handlers of
 * every fired descriptor, and finally runs the time events when
 * AE_TIME_EVENTS is set.
 *
 * The return value is the number of fired descriptors handled plus whatever
 * processTimeEvents() reports. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want to call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;

            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                /* The timer is already due: poll without blocking. */
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            /* Normally we execute the readable event first, and the writable
             * event later. This is useful as sometimes we may be able
             * to serve the reply of a query immediately after processing the
             * query.
             *
             * However if AE_BARRIER is set in the mask, our application is
             * asking us to do the reverse: never fire the writable event
             * after the readable. In such a case, we invert the calls.
             * This is useful when, for instance, we want to do things
             * in the beforeSleep() hook, like fsynching a file to disk,
             * before replying to a client. */
            int invert = fe->mask & AE_BARRIER;

            /* Note the "fe->mask & mask & ..." code: maybe an already
             * processed event removed an element that fired and that we
             * have not processed yet, so we check if the event is still
             * valid.
             *
             * Fire the readable event if the call sequence is not
             * inverted. */
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }

            /* Fire the writable event. It is skipped when a handler already
             * ran for this fd and the read and write handlers are the same
             * function. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}
/* Return the time event that is due first, or NULL when the loop has no
 * time events at all.
 *
 * The caller uses the result to decide how long the polling call may sleep
 * without delaying any timer. The list is unsorted, so this is a linear scan
 * over every registered time event. Keeping the list ordered, or storing the
 * timers in a skiplist, would avoid the scan; the original authors note that
 * Redis has not needed that so far. */
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop) {
    aeTimeEvent *best = NULL;
    aeTimeEvent *cur;

    for (cur = eventLoop->timeEventHead; cur != NULL; cur = cur->next) {
        if (best == NULL) {
            best = cur;
            continue;
        }
        if (cur->when_sec < best->when_sec) {
            best = cur;
            continue;
        }
        /* Same second: the millisecond part breaks the tie. */
        if (cur->when_sec == best->when_sec && cur->when_ms < best->when_ms) {
            best = cur;
        }
    }
    return best;
}
接着獲取當前系統時間(aeGetTime(&now_sec,&now_ms);),將最早到期的定時器事件的時間減去當前系統時間獲得一個間隔。這個時間間隔作爲numevents = aeApiPoll(eventLoop,tvp);調用的參數。aeApiPoll()在Linux平臺上使用epoll技術;Redis在這個IO複用技術上,在不同的操作系統平臺上使用不同的系統函數,在Windows系統上使用select,在Mac系統上使用kqueue。這裏重點看下Linux平臺下的實現:
/* epoll-based polling backend.
 *
 * Waits on the epoll instance stored in eventLoop->apidata. `tvp` is turned
 * into a millisecond timeout; a NULL `tvp` means -1, i.e. block until an
 * event arrives. Every reported epoll event is translated into an
 * AE_READABLE / AE_WRITABLE mask and stored in eventLoop->fired[].
 *
 * Returns the number of entries written to eventLoop->fired[], which is 0
 * when epoll_wait() reports nothing or fails. */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int timeout_ms = -1;
    int ready;
    int i;

    if (tvp != NULL) {
        timeout_ms = tvp->tv_sec*1000 + tvp->tv_usec/1000;
    }

    ready = epoll_wait(state->epfd, state->events, eventLoop->setsize,
                       timeout_ms);
    if (ready <= 0) {
        return 0;
    }

    for (i = 0; i < ready; i++) {
        struct epoll_event *ev = &state->events[i];
        int mask = 0;

        if (ev->events & EPOLLIN) {
            mask |= AE_READABLE;
        }
        /* Errors and hang-ups are reported to the caller as "writable". */
        if (ev->events & (EPOLLOUT|EPOLLERR|EPOLLHUP)) {
            mask |= AE_WRITABLE;
        }

        eventLoop->fired[i].fd = ev->data.fd;
        eventLoop->fired[i].mask = mask;
    }
    return ready;
}
int epoll_wait(int epfd,struct epoll_event *events,int maxevents,int timeout);
/* Excerpt: for every descriptor reported in eventLoop->fired[], invoke its
 * read and/or write handler and count it in `processed`. */
for (j = 0; j < numevents; j++) {
    aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
    int mask = eventLoop->fired[j].mask;
    int fd = eventLoop->fired[j].fd;
    int fired = 0; /* Number of events fired for current fd. */

    /* Normally we execute the readable event first, and the writable
     * event later. This is useful as sometimes we may be able
     * to serve the reply of a query immediately after processing the
     * query.
     *
     * However if AE_BARRIER is set in the mask, our application is
     * asking us to do the reverse: never fire the writable event
     * after the readable. In such a case, we invert the calls.
     * This is useful when, for instance, we want to do things
     * in the beforeSleep() hook, like fsynching a file to disk,
     * before replying to a client. */
    int invert = fe->mask & AE_BARRIER;

    /* Note the "fe->mask & mask & ..." code: maybe an already
     * processed event removed an element that fired and that we
     * have not processed yet, so we check if the event is still valid.
     *
     * Fire the readable event if the call sequence is not
     * inverted. */
    if (!invert && fe->mask & mask & AE_READABLE) {
        fe->rfileProc(eventLoop,fd,fe->clientData,mask);
        fired++;
    }

    /* Fire the writable event. It is skipped when a handler already ran
     * for this fd and the read and write handlers are the same function. */
    if (fe->mask & mask & AE_WRITABLE) {
        if (!fired || fe->wfileProc != fe->rfileProc) {
            fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            fired++;
        }
    }

    /* If we have to invert the call, fire the readable event now
     * after the writable one. */
    if (invert && fe->mask & mask & AE_READABLE) {
        if (!fired || fe->wfileProc != fe->rfileProc) {
            fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            fired++;
        }
    }

    processed++;
}
/* Signature of a file-event handler: it receives the event loop, the
 * descriptor, the clientData pointer stored with the event, and an event
 * mask. */
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);

/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER); tested bitwise by the event loop */
    aeFileProc *rfileProc;  /* handler invoked for AE_READABLE */
    aeFileProc *wfileProc;  /* handler invoked for AE_WRITABLE */
    void *clientData;       /* opaque pointer passed back to the handlers */
} aeFileEvent;
/* Set up the epoll backend for `eventLoop`: allocate the backend state and
 * an epoll_event array with eventLoop->setsize entries, then create the
 * epoll instance. On success the state is stored in eventLoop->apidata and
 * 0 is returned; on any failure everything allocated so far is released and
 * -1 is returned. */
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *st = zmalloc(sizeof(aeApiState));

    if (st == NULL) {
        return -1;
    }

    st->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (st->events == NULL) {
        goto free_state;
    }

    st->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (st->epfd == -1) {
        goto free_events;
    }

    eventLoop->apidata = st;
    return 0;

free_events:
    zfree(st->events);
free_state:
    zfree(st);
    return -1;
}
(gdb) bt #0 aeCreateEventLoop (setsize=10128) at ae.c:79 #1 0x0000555555591aa0 in initServer () at server.c:2044 #2 0x0000555555585103 in main (argc=<optimized out>, argv=0x7fffffffccc8) at server.c:4160
struct redisServer server; /* Server global state */

/* Abridged excerpt of the server state structure: fields are elided where
 * marked below, so the section headings do not all sit next to the fields
 * they originally introduced. */
struct redisServer {
    /* General */
    int lua_repl;         /* Script replication flags for redis.set_repl(). */
    int lua_timedout;     /* True if we reached the time limit for script
                             execution. */
    int lua_kill;         /* Kill the script if true. */
    int lua_always_replicate_commands; /* Default replication type. */
    /* Lazy free */
    int lazyfree_lazy_eviction;
    int lazyfree_lazy_expire;
    int lazyfree_lazy_server_del;
    /* Latency monitor */
    long long latency_monitor_threshold;
    dict *latency_events;
    /* (portion omitted) */
    /* Mutexes used to protect atomic variables when atomic builtins are
     * not available. */
    pthread_mutex_t lruclock_mutex;
    pthread_mutex_t next_client_id_mutex;
    pthread_mutex_t unixtime_mutex;
};