Lesson 08: [Hands-on] Redis Network Communication Module Source Code Analysis (1)

  Let's start with the network communication module on the redis-server side. Setting Redis's own business logic aside, the design and implementation details of its network communication module are quite representative. Networking is also a key module in Linux C++ back-end development; although plenty of ready-made network libraries exist, few of them are both easy to learn and worth treating as a model, and redis-server is one of the best materials to study in this respect.

8.1 Initializing the listening socket

  From the earlier lessons, we know that at the application layer the general flow of network communication is as follows:

  * The server creates a listening socket;

  * binds the listening socket to the desired IP address and port (by calling the socket API bind() function);

  * starts listening (by calling the socket API listen() function);

  * waits indefinitely for client connections; each incoming connection is accepted with the socket API accept() function, which produces a client socket corresponding to that client;

  * handles sending and receiving network data on the client socket, and closes it when necessary (see the sketch just after this list).
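
  To make this flow concrete, here is a minimal, self-contained sketch of the same sequence using the raw socket API. This is an illustration only, not Redis code, and error handling is kept to a bare minimum:

#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

int minimal_tcp_server(void) {
    /* 1. Create the listening socket. */
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd == -1) return -1;

    /* 2. Bind it to an IP address and port (here: all local addresses, port 6379). */
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(6379);
    if (bind(listenfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) return -1;

    /* 3. Start listening. */
    if (listen(listenfd, 511) == -1) return -1;

    /* 4. Loop forever, accepting client connections. */
    for (;;) {
        int clientfd = accept(listenfd, NULL, NULL);
        if (clientfd == -1) continue;
        /* 5. Send/receive data on clientfd here, then close it when done. */
        close(clientfd);
    }
}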

A global search of the Redis source for calls to bind(), after some filtering, leads us to the anetListen() function in anet.c.

  

static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) {
    if (bind(s,sa,len) == -1) {
        anetSetError(err, "bind: %s", strerror(errno));
        close(s);
        return ANET_ERR;
    }

    if (listen(s, backlog) == -1) {
        anetSetError(err, "listen: %s", strerror(errno));
        close(s);
        return ANET_ERR;
    }
    return ANET_OK;
}

  Set a breakpoint on this function with GDB's b command, then restart redis-server:

(gdb) b anetListen
Breakpoint 1 at 0x555555588620: file anet.c, line 440.
(gdb) info breakpoints 
Num     Type           Disp Enb Address            What
1       breakpoint     keep y   0x0000555555588620 in anetListen at anet.c:440
(gdb) r
The program being debugged has been started already.
Start it from the beginning? (y or n) y
Starting program: /home/wzq/Desktop/redis-5.0.3/src/redis-server 
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
11580:C 14 Jan 2019 11:27:06.118 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
11580:C 14 Jan 2019 11:27:06.118 # Redis version=5.0.3, bits=64, commit=00000000, modified=0, pid=11580, just started
11580:C 14 Jan 2019 11:27:06.118 # Warning: no config file specified, using the default config. In order to specify a config file use /home/wzq/Desktop/redis-5.0.3/src/redis-server /path/to/redis.conf
11580:M 14 Jan 2019 11:27:06.119 * Increased maximum number of open files to 10032 (it was originally set to 1024).

Breakpoint 1, anetListen (err=0x5555559161e0 <server+576> "", s=6, sa=0x555555b2b240, len=28, backlog=511) at anet.c:440
warning: Source file is more recent than executable.
440	static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) {
(gdb) 

  When GDB breaks in this function, use the bt command to inspect the call stack at that point:

(gdb) bt
#0  anetListen (err=0x5555559161e0 <server+576> "", s=6, sa=0x555555b2b240, len=28, backlog=511) at anet.c:440
#1  0x00005555555887a4 in _anetTcpServer (err=0x5555559161e0 <server+576> "", port=<optimized out>, bindaddr=<optimized out>, af=10, backlog=511)
    at anet.c:487
#2  0x000055555558cf07 in listenToPort (port=6379, fds=0x55555591610c <server+364>, count=0x55555591614c <server+428>) at server.c:1924
#3  0x0000555555591ed0 in initServer () at server.c:2055
#4  0x0000555555585103 in main (argc=<optimized out>, argv=0x7fffffffccc8) at server.c:4160

  From this stack, together with the port number 6379 visible in frame #2, we can confirm that this is the logic we were looking for, and that it runs in the main thread (the outermost frame is the main() function).

Let's look at the code at frame #1:

static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog)
{
    int s = -1, rv;
    char _port[6];  /* strlen("65535") */
    struct addrinfo hints, *servinfo, *p;

    snprintf(_port,6,"%d",port);
    memset(&hints,0,sizeof(hints));
    hints.ai_family = af;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE;    /* No effect if bindaddr != NULL */

    if ((rv = getaddrinfo(bindaddr,_port,&hints,&servinfo)) != 0) {
        anetSetError(err, "%s", gai_strerror(rv));
        return ANET_ERR;
    }
    for (p = servinfo; p != NULL; p = p->ai_next) {
        if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)
            continue;

        if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;
        if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
        if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) s = ANET_ERR;
        goto end;
    }
    if (p == NULL) {
        anetSetError(err, "unable to bind socket, errno: %d", errno);
        goto error;
    }

error:
    if (s != -1) close(s);
    s = ANET_ERR;
end:
    freeaddrinfo(servinfo);
    return s;
}

  Switch to frame #1 and enter info args to inspect the arguments passed to this function.

(gdb) f 1
#1  0x00005555555887a4 in _anetTcpServer (err=0x5555559161e0 <server+576> "", port=<optimized out>, bindaddr=<optimized out>, af=10, backlog=511)
    at anet.c:487
487	        if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) s = ANET_ERR;
(gdb) info args 
err = 0x5555559161e0 <server+576> ""
port = <optimized out>
bindaddr = <optimized out>
af = 10
backlog = 511
(gdb) 

  The system API getaddrinfo is used to resolve the address and port information to bind to. gethostbyname is not used here because it only resolves IPv4 host information, whereas getaddrinfo works with both IPv4 and IPv6. Its signature is as follows:

int getaddrinfo(const char *node, const char *service, const struct addrinfo *hints, struct addrinfo **res);

  The details of this function can be found in the Linux man pages. On the server side, before calling getaddrinfo, the ai_flags field of the hints parameter is typically set to AI_PASSIVE so that the result can be used for bind(); the host name node is usually set to NULL, which yields the wildcard address (e.g. [::]). On the client side, by contrast, ai_flags generally does not include AI_PASSIVE, while the host name node and the service name service (which one might prefer to think of as the port) should both be non-NULL.
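
  As a quick illustration of the server-side convention described above (a sketch, not Redis code):

#include <netdb.h>
#include <string.h>
#include <sys/socket.h>

static int resolve_listen_address(struct addrinfo **res) {
    struct addrinfo hints;
    memset(&hints, 0, sizeof(hints));
    hints.ai_family   = AF_UNSPEC;   /* accept either IPv4 or IPv6 results */
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags    = AI_PASSIVE;  /* server side: we want an address suitable for bind() */

    /* node == NULL together with AI_PASSIVE yields the wildcard address
     * (0.0.0.0 for IPv4, [::] for IPv6); "6379" is the service, i.e. the port. */
    return getaddrinfo(NULL, "6379", &hints, res);
}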

  Once the address information has been resolved, it is used to create the listening socket and the SO_REUSEADDR option is enabled on it. Then anetListen is called, which performs bind() followed by listen(). From this point on, redis-server can accept client connections on port 6379.
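
  For reference, enabling the address-reuse option boils down to a single setsockopt() call; anetSetReuseAddr in anet.c is essentially a wrapper around this (sketch only):

#include <sys/socket.h>

/* Allow the server to be restarted quickly: the listening address can be
 * reused even while old connections linger in TIME_WAIT. */
static int set_reuse_addr(int fd) {
    int yes = 1;
    return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
}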

8.2 Accepting client connections

  By the same token, to study how redis-server accepts client connections, we just need to search for the socket API accept() function.

  After locating it, we end up at the anetGenericAccept function in the anet.c file:

static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
    int fd;
    while(1) {
        fd = accept(s,sa,len);
        if (fd == -1) {
            if (errno == EINTR)
                continue;
            else {
                anetSetError(err, "accept: %s", strerror(errno));
                return ANET_ERR;
            }
        }
        break;
    }
    return fd;
}

  Set a breakpoint on this function with the b command and restart redis-server. The breakpoint is not hit while the program starts up; now open a new redis-cli to simulate a fresh client connecting to redis-server. The breakpoint fires, and we can inspect the call stack.

Thread 1 "redis-server" hit Breakpoint 2, anetGenericAccept (err=err@entry=0x5555559161e0 <server+576> "", s=s@entry=7, sa=sa@entry=0x7fffffffc9f0, 
    len=len@entry=0x7fffffffc9ec) at anet.c:531
531	static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
(gdb) bt
#0  anetGenericAccept (err=err@entry=0x5555559161e0 <server+576> "", s=s@entry=7, sa=sa@entry=0x7fffffffc9f0, len=len@entry=0x7fffffffc9ec)
    at anet.c:531
#1  0x00005555555893e2 in anetTcpAccept (err=err@entry=0x5555559161e0 <server+576> "", s=s@entry=7, ip=ip@entry=0x7fffffffcab0 "", 
    ip_len=ip_len@entry=46, port=port@entry=0x7fffffffcaac) at anet.c:552
#2  0x000055555559aad2 in acceptTcpHandler (el=<optimized out>, fd=7, privdata=<optimized out>, mask=<optimized out>) at networking.c:728
#3  0x000055555558806c in aeProcessEvents (eventLoop=eventLoop@entry=0x7ffff6c320a0, flags=flags@entry=11) at ae.c:443
#4  0x000055555558841b in aeMain (eventLoop=0x7ffff6c320a0) at ae.c:501
#5  0x00005555555851d4 in main (argc=<optimized out>, argv=0x7fffffffccc8) at server.c:4197

  Analyzing this call stack, we can piece the call flow together: initServer (called from main) creates the listening socket, binds the address and starts listening; main then calls aeMain to start a loop that keeps processing "events".

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}
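
  In main(), this loop is started roughly as follows (simplified from server.c; the beforesleep/aftersleep hooks are registered first, then aeMain blocks until the server shuts down):

/* Simplified sketch of the tail of main() in server.c (Redis 5.0.x). */
aeSetBeforeSleepProc(server.el, beforeSleep);
aeSetAfterSleepProc(server.el, afterSleep);
aeMain(server.el);               /* runs until eventLoop->stop is set to 1 */
aeDeleteEventLoop(server.el);
return 0;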

  The loop exits when eventLoop->stop becomes 1. The event-processing code is as follows:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;

            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            /* Normally we execute the readable event first, and the writable
             * event laster. This is useful as sometimes we may be able
             * to serve the reply of a query immediately after processing the
             * query.
             *
             * However if AE_BARRIER is set in the mask, our application is
             * asking us to do the reverse: never fire the writable event
             * after the readable. In such a case, we invert the calls.
             * This is useful when, for instance, we want to do things
             * in the beforeSleep() hook, like fsynching a file to disk,
             * before replying to a client. */
            int invert = fe->mask & AE_BARRIER;

            /* Note the "fe->mask & mask & ..." code: maybe an already
             * processed event removed an element that fired and we still
             * didn't processed, so we check if the event is still valid.
             *
             * Fire the readable event if the call sequence is not
             * inverted. */
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }

            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

  This code first uses the flags parameter to check whether there is anything to process. If timer events are requested (the AE_TIME_EVENTS flag), it searches for the timer that will expire soonest.

/* Search the first timer to fire.
 * This operation is useful to know how many time the select can be
 * put in sleep without to delay any event.
 * If there are no timers NULL is returned.
 *
 * Note that's O(N) since time events are unsorted.
 * Possible optimizations (not needed by Redis so far, but...):
 * 1) Insert the event in order, so that the nearest is just the head.
 *    Much better but still insertion or deletion of timers is O(N).
 * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
 */
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
{
    aeTimeEvent *te = eventLoop->timeEventHead;
    aeTimeEvent *nearest = NULL;

    while(te) {
        if (!nearest || te->when_sec < nearest->when_sec ||
                (te->when_sec == nearest->when_sec &&
                 te->when_ms < nearest->when_ms))
            nearest = te;
        te = te->next;
    }
    return nearest;
}

  This code is well commented and easy to follow. The comment tells us that because the timer collection is unsorted, the whole linked list has to be traversed, which is O(N). It also hints at possible future optimizations: keep the list sorted by expiry time so that the head is always the nearest timer, making the lookup O(1) while insertion remains O(N); or use a skiplist, which keeps the lookup O(1) with O(log(N)) insertion.
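
  To illustrate the first optimization hinted at in the comment, a hypothetical sorted insert could keep the nearest timer at the head of the list, so that finding it becomes a single head read (this is a sketch, not Redis code; the prev pointers of the real doubly-linked list are ignored for brevity):

/* Hypothetical helper: insert 'te' so the list stays ordered by expiry time.
 * Finding the nearest timer is then O(1) (just read the head), while
 * insertion remains O(N). */
static void insertTimerSorted(aeEventLoop *eventLoop, aeTimeEvent *te) {
    aeTimeEvent **pp = &eventLoop->timeEventHead;
    while (*pp && ((*pp)->when_sec < te->when_sec ||
                  ((*pp)->when_sec == te->when_sec &&
                   (*pp)->when_ms <= te->when_ms)))
        pp = &(*pp)->next;
    te->next = *pp;
    *pp = te;
}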

  Next, the current system time is fetched (aeGetTime(&now_sec, &now_ms);) and subtracted from the expiry time of the earliest timer event to obtain an interval. This interval is passed to the numevents = aeApiPoll(eventLoop, tvp); call. For this I/O multiplexing layer, Redis uses different system facilities on different platforms: epoll on Linux, kqueue on macOS/BSD, evport on Solaris, and select as the fallback elsewhere. Here we focus on the Linux implementation:

  

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

  The signature of epoll_wait is as follows:

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

  The last parameter, timeout, deserves attention. If the tvp passed in is NULL, then according to the analysis above there is no pending timer event, so the wait time is set to -1, which makes epoll_wait block indefinitely until an event wakes it up; the benefit of blocking is that no CPU time is wasted. Otherwise, timeout is set to the interval until the nearest timer event, so epoll_wait wakes up just in time and the program flow can handle that expiring timer event promptly (discussed below).

  For the epoll_wait system call, all fd information (for network communication, these fds are sockets), including the listening fd and the ordinary client fds, is recorded in the apidata field of the event loop object aeEventLoop. When an event fires on some fd, that fd is found via apidata and recorded, together with the event type (the mask field), in the fired field of aeEventLoop. Let's finish walking through this flow first, and then look at when and where the epfd used by epoll_wait is created, and how the listening fd and client fds are attached to it.

  Once the fds with pending events are known, the next step is to handle those events. In the main loop, aeProcessEvents takes the fds recorded in the previous step from the fired array of the aeEventLoop object and handles them according to the event type (read events and write events).

for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            /* Normally we execute the readable event first, and the writable
             * event laster. This is useful as sometimes we may be able
             * to serve the reply of a query immediately after processing the
             * query.
             *
             * However if AE_BARRIER is set in the mask, our application is
             * asking us to do the reverse: never fire the writable event
             * after the readable. In such a case, we invert the calls.
             * This is useful when, for instance, we want to do things
             * in the beforeSleep() hook, like fsynching a file to disk,
             * before replying to a client. */
            int invert = fe->mask & AE_BARRIER;

            /* Note the "fe->mask & mask & ..." code: maybe an already
             * processed event removed an element that fired and we still
             * didn't processed, so we check if the event is still valid.
             *
             * Fire the readable event if the call sequence is not
             * inverted. */
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }

            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }

  The read-event field rfileProc and the write-event field wfileProc are both function pointers. They are set up early during startup, so here they can simply be invoked.

typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);


/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;
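
  For example, the accept handler whose breakpoint we hit earlier is installed on every TCP listening fd during initServer(), roughly like this (simplified from server.c), so that aeProcessEvents later invokes it through fe->rfileProc when a new connection arrives:

/* Simplified from initServer() in server.c: register acceptTcpHandler as the
 * read callback of each TCP listening fd. */
for (j = 0; j < server.ipfd_count; j++) {
    if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
                          acceptTcpHandler, NULL) == AE_ERR) {
        serverPanic("Unrecoverable error creating server.ipfd file event.");
    }
}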

  Searching for the keyword epoll_create leads us to aeApiCreate, the function in ae_epoll.c that creates the epfd.

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state;
    return 0;
}

  Set a breakpoint on this function with GDB's b command, rerun redis-server with the run command and, once the breakpoint fires, inspect the call stack with bt. It turns out the epfd is also created from the initServer function introduced above (aeApiCreate is called from aeCreateEventLoop, which is what appears at the top of the stack).

(gdb) bt
#0  aeCreateEventLoop (setsize=10128) at ae.c:79
#1  0x0000555555591aa0 in initServer () at server.c:2044
#2  0x0000555555585103 in main (argc=<optimized out>, argv=0x7fffffffccc8) at server.c:4160

  aeCreateEventLoop not only creates the epfd but also creates the aeEventLoop object needed by the whole event loop, and stores this object in the el field of a Redis global variable. That global variable is named server, and it is a struct defined as follows:

struct redisServer server; /* Server global state */

struct redisServer {
    /* General */
 
    int lua_repl;         /* Script replication flags for redis.set_repl(). */
    int lua_timedout;     /* True if we reached the time limit for script
                             execution. */
    int lua_kill;         /* Kill the script if true. */
    int lua_always_replicate_commands; /* Default replication type. */
    /* Lazy free */
    int lazyfree_lazy_eviction;
    int lazyfree_lazy_expire;
    int lazyfree_lazy_server_del;
    /* Latency monitor */
    long long latency_monitor_threshold;
    dict *latency_events;
    /* ... many fields omitted ... */

    /* Mutexes used to protect atomic variables when atomic builtins are
     * not available. */
    pthread_mutex_t lruclock_mutex;
    pthread_mutex_t next_client_id_mutex;
    pthread_mutex_t unixtime_mutex;
};
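
  Finally, as for how the listening fd and the client fds get attached to this epfd: aeCreateFileEvent stores the callbacks in eventLoop->events[fd] and then calls into the platform layer, which on Linux registers the fd with epoll_ctl. The logic in ae_epoll.c is essentially the following (a paraphrased sketch, not verbatim source):

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0};
    /* ADD the fd if it is not monitored yet, otherwise MOD the existing
     * registration, merging the old mask with the new one. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
             EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    mask |= eventLoop->events[fd].mask; /* merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd, op, fd, &ee) == -1) return -1;
    return 0;
}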