天池中間件大賽——單機百萬消息隊列存儲分享

此次天池中間件性能大賽初賽和複賽的成績都正好是第五名,本次整理了複賽《單機百萬消息隊列的存儲設計》的思路方案分享給你們,實現方案上也是決賽隊伍中相對比較特別的。linux

賽題回顧

  • 實現一個進程內的隊列引擎,單機可支持100萬隊列以上
  • 實現消息put、get接口。
  • 在規定時間內完成數據發送、索引校檢、數據消費三個階段評測。

評測邏輯

  • 各個階段線程數在20~30左右。
  • 發送階段:消息大小在50字節左右,消息條數在20億條左右,也即發送總數據在100G左右。
  • 索引校驗階段:會對全部隊列的索引進行隨機校驗;平均每一個隊列會校驗1~2次。
  • 順序消費階段:挑選20%的隊列進行所有讀取和校驗;
  • 發送階段最大耗時不能超過1800s;索引校驗階段和順序消費階段加在一塊兒,最大耗時也不能超過1800s;超時會被判斷爲評測失敗。

評測環境

  • 測試環境爲4c8g的ECS虛擬機。
  • 帶一塊300G左右大小的SSD磁盤。SSD性能大體以下:iops 1w左右;塊讀寫能力(一次讀寫4K以上)在200MB/s左右。

賽題分析

對於單機幾百的大隊列來講業務已有成熟的方案,Kafka和RocketMQ。算法

方案 幾百個大隊列
Kafka 每一個隊列一個文件(獨立存儲)
RocketMQ 全部隊列共用一個文件(混合存儲)

若直接採用現有的方案,在百萬量級的小隊列場景都有極大的弊端。數組

方案 百萬隊列場景弊端
Kafka獨立存儲 單個小隊列數據量少,批量化程度徹底取決於內存大小,落盤時間長,寫數據容易觸發IOPS瓶頸
RocketMQ混合存儲 隨機讀嚴重,一個塊中連續數據很低,讀速度很慢,消費速度徹底受限於IOPS

爲了兼顧讀寫速度,咱們最終採用了折中的設計方案:多個隊列merge,共享一個塊存儲。緩存

設計核心思想

  • 設計上要支持邊寫邊讀
  • 多個隊列須要合併處理
  • 單個隊列的數據存儲部分連續
  • 索引稀疏,儘量常駐內存

架構設計

架構圖中Bucket Manager和Group Manager分別對百萬隊列進行分桶以及合併管理,而後左右兩邊是分別是寫模塊和讀模塊,數據寫入包括隊列merge處理,消息塊落盤。讀模塊包括索引管理和讀緩存。(見左圖)bash

bucket、group、queue的關係:對消息隊列進行bucket處理,每一個bucket包含多個group,group是咱們進行隊列merge的最小單元,每一個group管理固定數量的隊列。(見右圖)數據結構

存儲設計

  • 對百萬隊列進行分桶處理。
  • 每一個Bucket中分爲多個Group,每一個Group爲一個讀寫單位,對隊列進行merge,同時更新索引和數據文件。
  • 單個Group裏對M個隊列進行合併,超過16k或者壓縮超過16K(可配置)進行索引更新和落盤。
  • 索引部分針對每一個Block塊創建一個L2二級索引,而後每16個L2創建一個L1一級索引。
  • 數據文件採用混合存儲,對Block塊順序存儲。

接下來對整個存儲每一個階段的細節進行展開分析,包括隊列合併、索引管理和數據落盤。架構

MQ Merge

1. 百萬隊列數據Bucket Hash分桶

2. Bucket視角

  • 每一個Bucket分配多個Group
  • Group是管理多個隊列的最小單位

3. Group分配過程

  • 每一個bucket持有一把鎖,順序爲隊列分配group,這裏咱們假設merge的數量爲4個隊列。
  • 數據的達到是隨機的,根據隊列的前後順序加入當前Group。
  • 當Group達到M個後便造成一個固定分組。相同隊列會在Group內進行合併,新的隊列數據將繼續分配Group接收。

4. Group視角的數據寫入

  • 每一個Group會分配Memtable的Block塊用於實時寫入。
  • 當Block達到16k(可配置)時以隊列爲單位進行數據排序,保證單個隊列數據連續。
  • 字節對齊,Memtable變爲不可變的Immemtable準備落盤。
  • 開闢新的Block接收數據寫入。

索引管理

1. L2二級索引

L2二級索引與數據存儲的位置息息相關,見下圖。爲每一個排序後的Block塊創建一個L2索引,L2索引的結構分爲文件偏移(file offset),數據壓縮大小(size),原始大小(raw size),由於咱們是多個隊列merge,而後接下來是每一個隊列相對於起始位置的delta offset以及消息數量。併發

2. L1一級索引

爲了加快查詢速度,在L2基礎上創建L1一級索引,每16個L2創建一個L1,L1按照時間前後順序存放。L1和L2的組織關係以下:異步

L1索引的結構很是簡單,file id對應消息存儲的文件id,以及16個Block塊中每一個隊列消息的起始序列號seq num。例如MQ1從序列號1000開始,MQ2從序列號2000開始等等。高併發

3. Index Query

如何根據索引定位須要查找的數據?

對L1先進行二分查找,定位到上下界範圍,而後對範圍內的全部L2進行順序遍歷。

Data Flush

1. 同步Flush

當blcok超過指定大小後,根據桶的hashcode再進行一次mask操做將group中的隊列數據同步寫入到m個文件中。

同步刷盤主要嘗試了兩種方案:Nio和Dio。Dio大約性能比Nio提高約5%。CPP使用DIO是很是方便的,然而做爲Java Coder你也許是第一次據說DIO,在Java中並無提供直接使用DIO的接口,能夠經過JNA的方式調用。

DIO(DIRECT IO,直接IO),出於對系統cache和調度策略的不滿,用戶本身在應用層定製本身的文件讀寫。DIO最大的優勢就是可以減小OS內核緩衝區和應用程序地址空間的數據拷貝次數,下降文件讀寫時的CPU開銷以及內存的佔用。然而DIO的缺陷也很明顯,DIO在數據讀取時會形成磁盤大量的IO,它並無緩衝IO從PageCache獲取數據的優點。

這裏就遇到一個問題,一樣配置的阿里雲機器測試隨機數據同步寫入性能是很是高的,可是線上的評測數據都是58字節,數據過於規整致使同一時間落盤的機率很大,出現了大量的鎖競爭。因此這裏作了一個小的改進:按機率隨機4K、8K、16K進行落盤,寫性能雖有必定提高,可是效果也是不太理想,因而採用了第二種思路異步刷盤。

2. 異步Flush

採用RingBuffer接收block塊,使用AIO對多個block塊進行Batch刷盤,減小IO Copy的次數。異步刷盤寫性能有了顯著的提高。

如下是異步Flush的核心代碼:

while (gWriterThread) {
    if (taskQueue->pop(task)) {
        writer->mWriting.store(true);
        do {
            // 使用異步IO
            aiocb *pAiocb = aiocb_list[aio_size++];
            memset(pAiocb, 0, sizeof(aiocb));
            pAiocb->aio_lio_opcode = LIO_WRITE;
            pAiocb->aio_buf = task.mWriteCache.mCache;
            pAiocb->aio_nbytes = task.mWriteCache.mSize;
            pAiocb->aio_offset = task.mWriteCache.mStartOffset;
            pAiocb->aio_fildes = task.mBlockFile->mFd;
            pAiocb->aio_sigevent.sigev_value.sival_ptr = task.mBlockFile;
            task.mBlockFile->mWriting = true;
            
            if (aio_size >= MAX_AIO_TASK_COUNT) {
                break;
            }
        } while (taskQueue->pop(task));
        
        if (aio_size > 0) {
            if (0 != lio_listio(LIO_WAIT, aiocb_list, aio_size, NULL)) {
                aos_fatal_log("aio error %d %s.", errno, strerror(errno));
            }
            
            for (int i = 0; i < aio_size; ++i) {
                ((BlockFile *) aiocb_list[i]->aio_sigevent.sigev_value.sival_ptr)->mWriting = false;
                free((void *) aiocb_list[i]->aio_buf);
            }
            aio_size = 0;
        }
    } else {
        ++waitCount;
        sched_yield();
        if (waitCount > 100000) {
            usleep(10000);
        }
    }
}
複製代碼

讀緩存設計

數據讀取流程

  • 根據隊列Hash定位Bucket桶。
  • 二分查找定位L1索引和L2索引。
  • 在必定時機會執行預讀取操做。
  • 數據先從緩存中作查找,緩存命中直接返回,失效則回源到SSD。

整個流程主要有兩個優化點:預讀取和讀緩存。

預讀取優化

1. 記錄上一次讀取(消費)的offset

主要有兩個做用:

  • 加快查詢數據的速度。
  • 用於判斷預讀取時機。

2. 預讀取時機

順序消費且已經消費到當前block尾,則進行預讀取操做。如何判斷順序消費?判斷上次消費的結束位置是否與此次消費的起始位置相等。

if (msgCount >= destCount) {
    if (mLastGetSequeneNum == offsetCount &&
        beginIndex + 1 < mL2IndexCount &&
        beginOffsetCount + blockIndex.mMsgDeltaIndexCount <= offsetCount + msgCount + msgCount) {
        MessageBlockIndex &nextIndex = mL2IndexArray[beginIndex + 1];
        // 預讀取
#ifdef __linux__
        readahead(pManager->GetFd(hash), nextIndex.mFileOffset, PER_BLOCK_SIZE);
#endif
    }
    mLastGetSequeneNum = offsetCount + msgCount;
    return msgCount;
}
複製代碼

Read Cache

關於read cache作了一些精巧的小設計,保證足夠簡單高效。

  • 分桶(部分隔離),必定程度緩解緩存餓死現象。
  • 數組 + 自旋鎖 + 原子變量實現了一個循環分配緩存塊的方案。
  • 雙向指針綁定高效定位緩存節點。

1. Read Cache全貌

Read Cache一共分爲N=64(可配)個Bucket,每一個Bucket中包含M=3200(可配)個緩存塊,大概總計20w左右的緩存塊,每一個是4k,大約佔用800M的內存空間。

2. 核心數據結構

關於緩存的核心數據結構,咱們並無從隊列的角度出發,而是針對L2索引和緩存塊進行了綁定,這裏設計了一個雙向指針。判斷緩存是否有效的核心思路:check雙向指針是否相等。

CacheItem cachedItem = (CacheItem *) index->mCache;
cachedItem->mIndexPtr == (void *) index;
複製代碼

3. 算法實現

3.1 Bucket分桶

  • 獲取L2 Index。
  • 根據Manager Hash % N,找到對應的緩存Bucket。
  • L2尚未對應緩存塊,須要進行緩存塊分配。

3.2 Alloc Cache Block

  • 原子變量進行自加操做,同時對M=3200塊取模, count.fetch_add(1) % M = index
  • 分配下標爲index的Cache Block。
  • 而後將對應的緩存塊和咱們的隊列的L2索引進行雙向指針綁定,同時對緩存塊數據進行數據填充。

3.3 Cache Hit

  • index->mCache == index->mCache->index,雙向指針相等,緩存命中,而後作數據讀取。

3.4 Cache Page Replace

  • MQ1綁定的緩存塊已經被MQ2替換。
  • index->mCache != index->mCache->index,雙向指針已經不相等,緩存失效。須要爲MQ1分配新的緩存塊。

  • 原子變量進行自加操做,同時對M=3200塊取模, 例如:count.fetch_add(1) % M = M-1,找到新的緩存塊進行從新綁定。說明:整個分配的邏輯是一個循環使用的過程,當全部的緩存桶都被使用,那麼會從數組首地址開始從新分配、替換。

4. Read Cache & LRU & PageCache 對比

開始咱們嘗試了兩種讀緩存方案:最簡單的LRU緩存和直接使用PageCache讀取。PageCache所實現的實際上是高級版的LRU緩存。在順序讀的場景下,咱們本身實現的讀緩存(Cycle Cache Allocate,暫簡稱爲CCA)與LRU、PageCache的優劣分析對好比下:

  • LRU針對每次操做進行調整,CCA針對緩存塊須要分配時進行替換。
  • LRU從隊列角度創建映射表,CCA針對索引和緩存塊雙向指針綁定。
  • CCA中自旋鎖是針對每一個緩存塊加鎖,鎖粒度更小。LRU須要對整個鏈表加鎖。
  • 達到同等命中率的狀況下,CCA比Page Cache節省至少1~2倍的內存。

總結

創新點

  • 針對百萬小隊列,實際硬件資源,兼顧讀寫性能,提出多隊列Merge,保證隊列局部連續的存儲方式。
  • 針對隊列相對無關性及MQ連續讀取的場景,設計實現了O(1)的Read Cache,只須要約800M內存便可支持20W隊列的高效率的讀取,命中率高達85%。
  • 支持多隊列Merge的索引存儲方案,資源利用率低,約300~400MB索引便可支撐百萬隊列、100GB數據量高併發讀寫。

工程價值:

  • 通用性、隨機性、健壯性較好,支持對任意隊列進行merge。
  • 較少出現單隊列消息太少而致使block塊未刷盤的狀況,塊的填充會比較均勻。
  • 沒必要等待單個隊列滿而進行批量刷盤,減小內存佔用,更多的內存可支持更多的隊列。
  • 能夠讀寫同步進行,常駐內存的索引結構也適合落盤,應對機器重啓、持久化等場景。

思考

  • 爲何沒有使用mmap?爲何mmap寫入會出現卡頓?

轉載請註明出處,歡迎關注個人公衆號:亞普的技術輪子

亞普的技術輪子
相關文章
相關標籤/搜索