高性能服務開發之定時器

        在開發高性能服務器中,定時器老是不可或缺的。 常見的定時器實現三種,分別是:排序鏈表,最小堆,時間輪。 以前用的定時器是基於最小堆的,在定時器數量很少時可使用, 目前公司用的框架中的定時器是基於簡單時間輪的,可是爲了支持大範圍的時間,每一個齒輪的所維護的鏈表爲有序鏈表,每次插入時先mod出spoke,再從頭遍歷鏈表以便將定時器插入到合適位置, 因此本質上仍是基於有序鏈表的。時間複雜度並未減小。node

 

應用場景分析: 下面就一個實際例子來講定時器的使用。linux

場景: 客戶端發起的網絡請求,須要對每一個請求作超時檢查。c++

方案1:一個定時器,一個mulimap<endtime, request>保存請求超時列表, 每次超時時檢查mulimap。這樣,請求的插入時間複雜度爲O(lgn), 遍歷和刪除爲O(1)。且須要額外的編碼。git

方案2:一個請求一個定時器,如此便無需額外的開銷來保存請求。已無需額外的編碼,等待超時處理便可(請求的信息做爲參數給定時器node保存)。時間複雜度爲0。github

若是程序中的定時器數量比較少,基於最小堆的定時器通常能夠知足需求,且實現簡單。而對於方案2中的應用場景,對定時器的要求邊比較高了。算法

 

三種定時器算法複雜度分析:服務器

實現方式網絡

StartTimer數據結構

StopTimer框架

PerTickBookkeeping

基於排序鏈表

O(n)

O(1)

O(1)

基於最小堆

O(lgn)

O(1)

O(1)

基於時間輪

O(1)

O(1)

O(1)

 

實現分析:

排序鏈表:實現比較簡單,很少講。

最小堆: 用c++現成的multimap保存即可以。實現亦比較簡單。

時間輪: 時間輪的實現 有 簡單時間輪(一個時間輪),分級時間輪。

     簡單時間輪: 一個齒輪,每一個齒輪保存一個超時的node鏈表。一個齒輪表示一個時間刻度,好比鐘錶裏面一小格表明一秒,鐘錶的秒針每次跳一格。假設一個刻度表明10ms,則2^32 個格子可表示1.36年,2^16個格子可表示10.9分鐘。當要表示的時間範圍較大時,空間複雜度會大幅增長。

 

 

 

   分級時間輪: 相似於水錶,當小輪子裏的指針轉動滿一圈後,上一級輪子的指針進一格。  採用五個輪子每一個輪子爲一個簡單時間輪,大小分別爲 2^8, 2^6, 2^6, 2^6, 2^6,所需空間:2^8 + 2^6 + 2^6 + 2^6 + 2^6 = 512, 可表示的範圍爲 0  --  2^8 * 2^6 * 2^6* 2^6* 2^6 = 2^32 。

Linux底層的定時器實現即是基於此。下圖爲引用的linux內核定時器的實現。

 

 

 

基於分級時間輪的C++實現: 已通過測試

     所需數據結構:

     每一個spoke維護的node鏈表爲一個環,如此能夠簡化插入刪除的操做。spoke->next爲node鏈表中第一個節點,prev爲node鏈接的最後一個節點。

#define GRANULARITY 10 //10ms
#define WHEEL_BITS1 8
#define WHEEL_BITS2 6
#define WHEEL_SIZE1 (1 << WHEEL_BITS1) //256
#define WHEEL_SIZE2 (1 << WHEEL_BITS2) //64
#define WHEEL_MASK1 (WHEEL_SIZE1 - 1)
#define WHEEL_MASK2 (WHEEL_SIZE2 - 1)
#define WHEEL_NUM 5


typedef struct stNodeLink {
    stNodeLink *prev;
    stNodeLink *next;
    stNodeLink() {prev = next = this;} //circle
}SNodeLink;
typedef struct stTimerNode {
    SNodeLink link;
    uint64_t dead_time;
    CThreadTimer *timer;
    stTimerNode(CThreadTimer *t, uint64_t dt) :  dead_time(dt), timer(t) {}
}STimerNode;
typedef struct stWheel {
    SNodeLink *spokes;
    uint32_t size;
    uint32_t spokeindex;
    stWheel(uint32_t n) : size(n), spokeindex(0){ 
        spokes = new SNodeLink[n];
    }
    ~stWheel() { 
        /** clean **/
    }
}SWheel;

SWheel *wheels_[WHEEL_NUM];

 

     插入定時器:根據超時範圍選擇輪子,再經過mod/n求出要插入的spoke位置。

void CTimerManager::AddTimerNode(uint32_t milseconds, STimerNode *node) {
    SNodeLink *spoke = NULL;
    uint32_t interval = milseconds / GRANULARITY;
    uint32_t threshold1 = WHEEL_SIZE1;
    uint32_t threshold2 = 1 << (WHEEL_BITS1 + WHEEL_BITS2);
    uint32_t threshold3 = 1 << (WHEEL_BITS1 + 2 * WHEEL_BITS2);
    uint32_t threshold4 = 1 << (WHEEL_BITS1 + 3 * WHEEL_BITS2);
    
    if (interval < threshold1) {
        uint32_t index = (interval + wheels_[0]->spokeindex) & WHEEL_MASK1;
        spoke = wheels_[0]->spokes + index;
    } else if (interval < threshold2) {
        uint32_t index = ((interval - threshold1 + wheels_[1]->spokeindex * threshold1) >> WHEEL_BITS1) & WHEEL_MASK2;
        spoke = wheels_[1]->spokes + index;
    } else if (interval < threshold3) {
        uint32_t index = ((interval - threshold2 + wheels_[2]->spokeindex * threshold2) >> (WHEEL_BITS1 + WHEEL_BITS2)) & WHEEL_MASK2;
        spoke = wheels_[2]->spokes + index;
    } else if (interval < threshold4) {
        uint32_t index = ((interval - threshold3 + wheels_[3]->spokeindex * threshold3) >> (WHEEL_BITS1 + 2 * WHEEL_BITS2)) & WHEEL_MASK2;
        spoke = wheels_[3]->spokes + index;
    } else {
        uint32_t index = ((interval - threshold4 + wheels_[4]->spokeindex * threshold4) >> (WHEEL_BITS1 + 3 * WHEEL_BITS2)) & WHEEL_MASK2;
        spoke = wheels_[4]->spokes + index;
    }
    SNodeLink *nodelink = &(node->link);
    nodelink->prev = spoke->prev;
    spoke->prev->next = nodelink;
    nodelink->next = spoke;
    spoke->prev = nodelink;
}

 

     刪除定時器:其實是刪除一個雙向鏈表的元素。只需修改其先後節點的prev next指針指向而已。

void CTimerManager::RemoveTimer(STimerNode* node) {
    SNodeLink *nodelink = &(node->link);
    if (nodelink->prev) {
        nodelink->prev->next = nodelink->next;
    }
    if (nodelink->next) {
        nodelink->next->prev = nodelink->prev;
    }
    nodelink->prev = nodelink->next = NULL;
    
    delete node;
}

 

     定時間超時檢查:

void CTimerManager::DetectTimerList() {
    uint64_t now = GetCurrentMillisec();
    uint32_t loopnum = now > checktime_ ? (now - checktime_) / GRANULARITY : 0;
    
    SWheel *wheel =  wheels_[0];
    for (uint32_t i = 0; i < loopnum; ++i) {
        SNodeLink *spoke = wheel->spokes + wheel->spokeindex;
        SNodeLink *link = spoke->next;
        while (link != spoke) {
            STimerNode *node = (STimerNode *)link;
            link->prev->next = link->next;
            link->next->prev = link->prev;
            link = node->link.next;
            AddToReadyNode(node);
        }
        if (++(wheel->spokeindex) >= wheel->size) {
            wheel->spokeindex = 0;
            Cascade(1);
        }
        checktime_ += GRANULARITY;
    }
    DoTimeOutCallBack();
}

 

     降級:

uint32_t CTimerManager::Cascade(uint32_t wheelindex) {
    if (wheelindex < 1 || wheelindex >= WHEEL_NUM) {
        return 0;
    }
    SWheel *wheel =  wheels_[wheelindex];
    int casnum = 0;
    uint64_t now = GetCurrentMillisec();
    SNodeLink *spoke = wheel->spokes + (wheel->spokeindex++);
    SNodeLink *link = spoke->next;
    spoke->next = spoke->prev = spoke;
    while (link != spoke) {
        STimerNode *node = (STimerNode *)link;
        link = node->link.next;
        if (node->dead_time <= now) {
            AddToReadyNode(node);
        } else {
            uint32_t milseconds = node->dead_time - now;
            AddTimerNode(milseconds, node);
            ++casnum;
        }
        
    }
    
    if (wheel->spokeindex >= wheel->size) {
        wheel->spokeindex = 0;
        casnum += Cascade(++wheelindex);
    }
    return casnum;
}

 

 

項目開源地址:  https://github.com/ape2010/ape_cpp_server/tree/master/frame/common/

相關文章
相關標籤/搜索