曹工說Redis源碼(5)-- redis server 啓動過程解析,以及EventLoop每次處理事件前的前置工做解析(下)

文章導航

Redis源碼系列的初衷,是幫助咱們更好地理解Redis,更懂Redis,而怎麼才能懂,光看是不夠的,建議跟着下面的這一篇,把環境搭建起來,後續能夠本身閱讀源碼,或者跟着我這邊一塊兒閱讀。因爲我用c也是好幾年之前了,些許錯誤在所不免,但願讀者能不吝指出。html

曹工說Redis源碼(1)-- redis debug環境搭建,使用clion,達到和調試java同樣的效果java

曹工說Redis源碼(2)-- redis server 啓動過程解析及簡單c語言基礎知識補充redis

曹工說Redis源碼(3)-- redis server 啓動過程完整解析(中)shell

曹工說Redis源碼(4)-- 經過redis server源碼來理解 listen 函數中的 backlog 參數數據庫

本講主題

本講將延續第三講的主題,將啓動過程的主體講完。爲了保證閱讀體驗,避免過於突兀,能夠先閱讀第三講。本講,主要講解餘下的部分:api

  1. 建立pid文件
  2. 加載rdb、aof,獲取數據
  3. 運行事件處理器,準備處理事件,EventLoop每次處理事件前的前置工做

建立pid文件

pid,也就是進程id,之後臺模式運行時,redis會把本身的pid,寫入到一個文件中,默認的文件路徑和名稱爲:/var/run/redis.pid緩存

配置文件可配:安全

# When running daemonized, Redis writes a pid file in /var/run/redis.pid by
# default. You can specify a custom pid file location here.
pidfile /var/run/redis.pid

這部分代碼很是簡潔:app

void createPidFile(void) {
    // 1
    FILE *fp = fopen(server.pidfile, "w");
    if (fp) {
        // 2
        fprintf(fp, "%d\n", (int) getpid());
        // 3
        fclose(fp);
    }
}
  • 1,打開文件,這裏的pidfile就是前面的文件名,/var/run/redis.pid,配置文件能夠對其修改。模式爲w,表示將對其寫入。
  • 2,調用pid,獲取當前進程的pid,寫入該文件描述符
  • 3,關閉文件。

加載rdb、aof

在啓動時,會檢查aof和rdb選項是否打開,若是打開,則會去加載數據,這裏要注意的是,redis老是先查看是否有 aof 開關是否打開;打開的話,則直接使用 aof;less

若是 aof 沒打開,則去加載 rdb 文件。

void loadDataFromDisk(void) {
    // 記錄開始時間
    long long start = ustime();

    // AOF 持久化已打開
    if (server.aof_state == REDIS_AOF_ON) {
        // 嘗試載入 AOF 文件
        if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)
            // 打印載入信息,並計算載入耗時長度
            redisLog(REDIS_NOTICE, "DB loaded from append only file: %.3f seconds",
                     (float) (ustime() - start) / 1000000);
        // AOF 持久化未打開
    } else {
        // 嘗試載入 RDB 文件
        if (rdbLoad(server.rdb_filename) == REDIS_OK) {
            // 打印載入信息,並計算載入耗時長度
            redisLog(REDIS_NOTICE, "DB loaded from disk: %.3f seconds",
                     (float) (ustime() - start) / 1000000);
        }
    }
}

加載的過程,如今來說,不太合適,好比以aof爲例,aof文件中存儲了一條條的命令,加載 aof 文件的過程,其實就會在進程內部建立一個 fake client(源碼中就是這樣命名,也就是一個假的客戶端),來一條條地發送 aof 文件中的命令進行執行。

這個命令執行的過程,如今講會有點早,因此 aof 也放後面吧,講了命令執行再回頭看這塊。

事件循環結構體講解

核心流程以下:

// 1
    aeSetBeforeSleepProc(server.el, beforeSleep);
    // 2
    aeMain(server.el);
  • 先看2處,這裏傳入server這個全局變量中的el屬性,該屬性就表明了當前事件處理器的狀態,其定義以下:

    // 事件狀態
        aeEventLoop *el;

    el,實際就是EventLoop的簡寫;結構體 aeEventLoop,裏面維護了:當前使用的多路複用庫的函數、當前註冊到多路複用庫,在發生讀寫事件時,須要被通知的socket 文件描述符、以及其餘一些東西。

    typedef struct aeEventLoop {
    
        // 目前已註冊的最大描述符
        int maxfd;   /* highest file descriptor currently registered */
    
        // 目前已追蹤的最大描述符
        int setsize; /* max number of file descriptors tracked */
    
        // 用於生成時間事件 id
        long long timeEventNextId;
    
        // 最後一次執行時間事件的時間
        time_t lastTime;     /* Used to detect system clock skew */
    
        // 1 已註冊的文件事件
        aeFileEvent *events; /* Registered events */
    
        // 2 已就緒的文件事件
        aeFiredEvent *fired; /* Fired events */
    
        // 3 時間事件
        aeTimeEvent *timeEventHead;
    
        // 事件處理器的開關
        int stop;
    
        // 4 多路複用庫的私有數據
        void *apidata; /* This is used for polling API specific data */
    
        // 5 在處理事件前要執行的函數
        aeBeforeSleepProc *beforesleep;
    
    } aeEventLoop;
    • 1處,註冊到多路複用庫,須要監聽的socket 文件描述符事件,好比,某socket的可讀事件;

    • 2處,以select或者epoll這類多路複用庫爲例,在一次 select 中,若是發現某些socket事件已經知足,則,這些ready的事件,會被存放到本屬性中。

      由於個人描述比較抽象,這裏拿一段 man select中的說明給你們看下:

      select() allow  a  program to monitor multiple file descriptors, waiting until one or more of the file descriptors become "ready" for some class of I/O operation (e.g., input possible).  A file descriptor is considered ready if it is possible to perform the corresponding I/O  operation  (e.g., read(2)) without blocking.

      直譯一下:select() 容許一個程序去監聽多個文件描述符,等待直到1個或多個文件描述符變成 ready狀態,該狀態下,能夠不阻塞地讀寫該文件描述符。

    • 3處,事件事件,主要用來週期執行,執行一些redis的後臺任務,如刪除過時key,後面細講。

    • 4處,指向當前正在使用的多路複用庫的相關數據,目前redis支持:select、epoll、kqueue、evport

    • 5處,在處理事件前,要執行的一個函數

再回頭來看前面的代碼:

// 1    
aeSetBeforeSleepProc(server.el, beforeSleep);
aeMain(server.el);

這裏的1處,就是設置前面第5點提到的,設置處理事件前,先要執行的一個函數。

事件循環處理器的主循環

void aeMain(aeEventLoop *eventLoop) {

    eventLoop->stop = 0;

    while (!eventLoop->stop) {

        // 若是有須要在事件處理前執行的函數,那麼運行它
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);

        // 開始處理事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

能夠看到,一共2個部分,首先執行eventLoop的事件處理前要執行的函數;接着再開始處理事件。

事件處理前的前置執行函數

這裏講解下面這一句:

eventLoop->beforesleep(eventLoop);

這個函數,在前面已經看到了,被賦值爲:

aeSetBeforeSleepProc(server.el, beforeSleep);

這個 beforeSleep以下:

void beforeSleep(struct aeEventLoop *eventLoop) {

    /* Run a fast expire cycle (the called function will return
     * ASAP if a fast cycle is not needed). */
    // 1 執行一次快速的主動過時檢查
    if (server.active_expire_enabled && server.masterhost == NULL)
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);

	// 2
    ...

    /* Write the AOF buffer on disk */
    // 3 將 AOF 緩衝區的內容寫入到 AOF 文件
    flushAppendOnlyFile(0);

    /* Call the Redis Cluster before sleep function. */
    // 在進入下個事件循環前,執行一些集羣收尾工做
    if (server.cluster_enabled) clusterBeforeSleep();
}
  • 1,這裏會去執行主動的過時檢查,大體流程代碼以下:

    void activeExpireCycle(int type) {
        /* This function has some global state in order to continue the work
         * incrementally across calls. */
        // 靜態變量,用來累積函數連續執行時的數據
        static unsigned int current_db = 0; /* Last DB tested. */
        ...
    
        unsigned int j, iteration = 0;
        // 默認每次處理的數據庫數量
        unsigned int dbs_per_call = REDIS_DBCRON_DBS_PER_CALL;
        // 函數開始的時間
        long long start = ustime(), timelimit;
    
        dbs_per_call = server.dbnum;
    
        timelimit = 1000000 * ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC / server.hz / 100;
        timelimit_exit = 0;
        if (timelimit <= 0) timelimit = 1;
    
        // 1 遍歷數據庫
        for (j = 0; j < dbs_per_call; j++) {
            int expired;
            // 指向要處理的數據庫
            redisDb *db = server.db + (current_db % server.dbnum);
            current_db++;
    
            do {
                unsigned long num, slots;
                long long now, ttl_sum;
                int ttl_samples;
    
                /* If there is nothing to expire try next DB ASAP. */
                // 2 獲取數據庫中帶過時時間的鍵的數量 若是該數量爲 0 ,直接跳過這個數據庫
                if ((num = dictSize(db->expires)) == 0) {
                    db->avg_ttl = 0;
                    break;
                }
                // 3 獲取數據庫中鍵值對的數量
                slots = dictSlots(db->expires);
                // 當前時間
                now = mstime();
    
                // 每次最多隻能檢查 LOOKUPS_PER_LOOP 個鍵
                if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
                    num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;
    
                // 4 開始遍歷數據庫
                while (num--) {
                    dictEntry *de;
                    long long ttl;
    
                    // 從 expires 中隨機取出一個帶過時時間的鍵
                    if ((de = dictGetRandomKey(db->expires)) == NULL) break;
                    // 計算 TTL
                    ttl = dictGetSignedIntegerVal(de) - now;
                    // 5 若是鍵已通過期,那麼刪除它,並將 expired 計數器增一
                    if (activeExpireCycleTryExpire(db, de, now)) expired++;
                }
    
                // 6 爲這個數據庫更新平均 TTL 統計數據
                ...
                    
                // 更新遍歷次數
                iteration++;
    
                // 7 每遍歷 16 次執行一次
                if ((iteration & 0xf) == 0 && /* check once every 16 iterations. */
                    (ustime() - start) > timelimit) {
                    // 若是遍歷次數正好是 16 的倍數
                    // 而且遍歷的時間超過了 timelimit
                    // 那麼斷開 timelimit_exit
                    timelimit_exit = 1;
                }
    
                // 8 已經超時了,返回
                if (timelimit_exit) return;
    
                /* We don't repeat the cycle if there are less than 25% of keys
                 * found expired in the current DB. */
                // 若是已刪除的過時鍵佔當前總數據庫帶過時時間的鍵數量的 25 %
                // 那麼再也不遍歷
            } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP / 4);
        }
    }

    這個函數,刪減了一部分,留下了主流程:

    • 1處,遍歷數據庫,通常就是遍歷16個庫
    • 2處,獲取當前庫中,過時鍵的數量,過時鍵都存儲在db->expires中,只須要算這個map的size便可;若是沒有要過時的,處理下一個庫
    • 3處,獲取過時鍵的數量
    • 4處,開始遍歷當前數據庫的過時鍵,最多遍歷20次,這裏的num,被ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP賦值,這個值定義爲20,也就是說,每次掃描一個庫中,20個過時鍵
    • 5處,若是鍵已過時,則將這個key過時掉,好比從當前數據庫刪除,發佈事件等等
    • 6處,計算一些統計數據
    • 7處,遍歷16次,檢查下是否已經執行了足夠長的時間;由於redis是單線程的,不能一直執行過時鍵清理任務,還要處理客戶端請求呢,因此,這裏每執行16次循環,就檢查下時間,看看是否已經超時,超時直接返回。
    • 8處,超時返回
  • 講完了主動過時,接着講前面的流程,2處,涉及一些主從複製相關的東西,這塊放到後面吧

  • 3處,將 aof 從緩存中,刷到磁盤

    這個方法比較長,在後面分段講解

刷新aof緩存到磁盤的執行過程

  • 判斷是否有正在進行中的任務
void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;

    // 緩衝區中沒有任何內容,直接返回
    if (sdslen(server.aof_buf) == 0) return;

    // 策略爲每秒 FSYNC 
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        //1  是否有 SYNC 正在後臺進行?
        sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;

1處,會去判斷一個全局變量,該變量是一個隊列,用於存儲後臺任務。另一個後臺線程(沒錯,redis不是單純的單線程,仍是有其餘線程的),會去該隊列取任務,取不到就阻塞;取到了則執行。而刷新 aof 到磁盤這種重io的工做,就是被封裝爲一個任務,丟到這個隊列中的。因此,這裏去判斷隊列的大小是否爲0.

/* Return the number of pending jobs of the specified type. 
 *
 * 返回等待中的 type 類型的工做的數量
 */
unsigned long long bioPendingJobsOfType(int type) {
    unsigned long long val;

    pthread_mutex_lock(&bio_mutex[type]);
  	// 1
    val = bio_pending[type];
    pthread_mutex_unlock(&bio_mutex[type]);

    return val;
}

1處這裏的val,就是存儲指定類型的任務的數量。咱們這裏傳入的type爲 REDIS_BIO_AOF_FSYNC,因此就是看看:aof 刷盤的任務數量。

  • 調用write函數執行寫入

    // 1
    	nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
        if (nwritten != (signed)sdslen(server.aof_buf)) {
          // 2
          ...
        }else{
            // 3
            /* Successful write(2). If AOF was in error state, restore the
             * OK state and log the event. */
            // 寫入成功,更新最後寫入狀態
            if (server.aof_last_write_status == REDIS_ERR) {
                redisLog(REDIS_WARNING,
                    "AOF write error looks solved, Redis can write again.");
                server.aof_last_write_status = REDIS_OK;
            }
        }
    • 1處,執行寫入,將server.aof_buf這個緩衝區的內容,寫入aof文件,寫入的字節長度爲sdslen(server.aof_buf)。也就是,將整個緩衝區寫入。

    • 2處,若是寫入的長度,不等於緩衝區的長度,表示只寫了一部分,進入異常分支

      爲何寫入的會比預期的少,咱們看看官方說明:

      write() writes up to count bytes from the buffer pointed buf to the file referred to by the file descriptor fd.
      
      The  number of bytes written may be less than count if, for example, there is insufficient space on the underlying physical medium, or the RLIMIT_FSIZE resource limit is encountered (see setrlimit(2)), or the call was interrupted by a signal handler after having written less than count bytes.  (See also pipe(7).)

      這裏的第二段就說了,多是由於底層物理介質的空間不夠;進程的資源限制;或者被中斷。

    • 3處,寫入成功;更新狀態,若是上一次aof寫入狀態爲error,此次改成ok

  • flush到磁盤

    前面write是寫入到操做系統的os cache中,可是尚未落盤。必須執行flush以後,纔會刷盤。

    // 老是執行 fsnyc
        if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
            /* aof_fsync is defined as fdatasync() for Linux in order to avoid
             * flushing metadata. */
            // 1
            aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
    
            // 更新最後一次執行 fsnyc 的時間
            server.aof_last_fsync = server.unixtime;
    
        // 策略爲每秒 fsnyc ,而且距離上次 fsync 已經超過 1 秒
        } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                    server.unixtime > server.aof_last_fsync)) {
            // 2 放到後臺執行
            if (!sync_in_progress) aof_background_fsync(server.aof_fd);
            // 更新最後一次執行 fsync 的時間
            server.aof_last_fsync = server.unixtime;
        }
    • 1處,若是aof策略爲:AOF_FSYNC_ALWAYS,則調用fsync,刷盤

    • 2處,若是策略爲每秒刷盤:AOF_FSYNC_EVERYSEC,放到後臺去刷盤。這裏的放到後臺,就是放到前面提到的任務隊列中,由其餘線程去刷。

      void aof_background_fsync(int fd) {
          bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
      }
      void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
          struct bio_job *job = zmalloc(sizeof(*job));
      
          job->time = time(NULL);
          job->arg1 = arg1;
          job->arg2 = arg2;
          job->arg3 = arg3;
      
          pthread_mutex_lock(&bio_mutex[type]);
      
          // 1 將新工做推入隊列
          listAddNodeTail(bio_jobs[type],job);
          bio_pending[type]++;
      
          pthread_cond_signal(&bio_condvar[type]);
      
          pthread_mutex_unlock(&bio_mutex[type]);
      }

      這裏的1處,能夠看到,將任務丟到了隊列中,且先後進行了加鎖。由於這個隊列,是會被其餘線程訪問的,因此爲了線程安全,進行了加鎖。

todo

void aeMain(aeEventLoop *eventLoop) {

    eventLoop->stop = 0;

    while (!eventLoop->stop) {

        // 若是有須要在事件處理前執行的函數,那麼運行它
        if (eventLoop->beforesleep != NULL)
            // 1
            eventLoop->beforesleep(eventLoop);

        // 2開始處理事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

啓動作的事實在太多了,本篇把1這裏的這個函數講了,下篇才能講2.

總結

本篇主要講了,redis啓動過程當中,主循環的大流程,以及在主循環去處理一個事件以前,要執行的任務。這個主循環如何處理事件,放到下篇繼續。

相關文章
相關標籤/搜索