摘要: MongoShake是基於MongoDB的通用型平臺服務,做爲數據連通的橋樑,打通各個閉環節點的通道。經過MongoShake的訂閱消費,能夠靈活對接以適應不一樣場景,例如日誌訂閱、數據中心同步、監控審計等。其中,集羣數據同步做爲核心應用場景,可以靈活實現災備和多活的業務場景。golang
在當前的數據庫系統生態中,大部分系統都支持多個節點實例間的數據同步機制,如Mysql Master/Slave主從同步,Redis AOF主從同步等,MongoDB更是支持3節點及以上的副本集同步,上述機制很好的支撐了一個邏輯單元的數據冗餘高可用。算法
跨邏輯單元,甚至跨單元、跨數據中心的數據同步,在業務層有時候就顯得很重要,它使得同城多機房的負載均衡,多機房的互備,甚至是異地多數據中心容災和多活成爲可能。因爲目前MongoDB副本集內置的主從同步對於這種業務場景有較大的侷限性,爲此,咱們開發了MongoShake系統,能夠應用在實例間複製,機房間、跨數據中心複製,知足災備和多活需求。sql
另外,數據備份是做爲MongoShake核心但不是惟一的功能。MongoShake做爲一個平臺型服務,用戶能夠經過對接MongoShake,實現數據的訂閱消費來知足不一樣的業務場景。mongodb
MongoShake是一個以golang語言進行編寫的通用的平臺型服務,經過讀取MongoDB集羣的Oplog操做日誌,對MongoDB的數據進行復制,後續經過操做日誌實現特定需求。日誌能夠提供不少場景化的應用,爲此,咱們在設計時就考慮了把MongoShake作成通用的平臺型服務。經過操做日誌,咱們提供日誌數據訂閱消費PUB/SUB功能,可經過SDK、Kafka、MetaQ等方式靈活對接以適應不一樣場景(如日誌訂閱、數據中心同步、Cache異步淘汰等)。集羣數據同步是其中核心應用場景,經過抓取oplog後進行回放達到同步目的,實現災備和多活的業務場景。數據庫
1. MongoDB集羣間數據的異步複製,免去業務雙寫開銷。網絡
2. MongoDB集羣間數據的鏡像備份(當前1.0開源版本支持受限)架構
3. 日誌離線分析併發
4. 日誌訂閱負載均衡
5. 數據路由。根據業務需求,結合日誌訂閱和過濾機制,能夠獲取關注的數據,達到數據路由的功能。異步
6. Cache同步。日誌分析的結果,知道哪些Cache能夠被淘汰,哪些Cache能夠進行預加載,反向推進Cache的更新。
7. 基於日誌的集羣監控
MongoShake從源庫抓取oplog數據,而後發送到各個不一樣的tunnel通道。現有通道類型有:
1. Direct:直接寫入目的MongoDB
2. RPC:經過net/rpc方式鏈接
3. TCP:經過tcp方式鏈接
4. File:經過文件方式對接
5. Kafka:經過Kafka方式對接
6. Mock:用於測試,不寫入tunnel,拋棄全部數據
消費者能夠經過對接tunnel通道獲取關注的數據,例如對接Direct通道直接寫入目的MongoDB,或者對接RPC進行同步數據傳輸等。此外,用戶還能夠本身建立本身的API進行靈活接入。下面2張圖給出了基本的架構和數據流。
MongoShake對接的源數據庫支持單個mongod,replica set和sharding三種模式。目的數據庫支持mongod和mongos。若是源端數據庫爲replica set,咱們建議對接備庫以減小主庫的壓力;若是爲sharding模式,那麼每一個shard都將對接到MongoShake並進行並行抓取。對於目的庫來講,能夠對接多個mongos,不一樣的數據將會哈希後寫入不一樣的mongos。
MongoShake提供了並行複製的能力,複製的粒度選項(shard_key)能夠爲:id,collection或者auto,不一樣的文檔或表可能進入不一樣的哈希隊列併發執行。id表示按文檔進行哈希;collection表示按表哈希;auto表示自動配置,若是有表存在惟一鍵,則退化爲collection,不然則等價於id。
MongoShake按期將同步上下文進行存儲,存儲對象能夠爲第三方API(註冊中心)或者源庫。目前的上下文內容爲「已經成功同步的oplog時間戳」。在這種狀況下,當服務切換或者重啓後,經過對接該API或者數據庫,新服務可以繼續提供服務。
此外,MongoShake還提供了Hypervisor機制用於在服務掛掉的時候,將服務從新拉起。
提供黑名單和白名單機制選擇性同步db和collection。
支持oplog在發送前進行壓縮,目前支持的壓縮格式有gzip, zlib, 或deflate。
一個數據庫的數據可能會包含不一樣來源:本身產生的和從別處複製的數據。若是沒有相應的措施,可能會致使數據的環形複製,好比A的數據複製到B,又被從B複製到A,致使服務產生風暴被打掛了。或者從B回寫入A時由於惟一鍵約束寫入失敗。從而致使服務的不穩定。
在阿里雲上的MongoDB版本中,咱們提供了防止環形複製的功能。其主要原理是,經過修改MongoDB內核,在oplog中打入gid標識當前數據庫信息,並在複製過程當中經過op_command命令攜帶gid信息,那麼每條數據都有來源信息。若是隻須要當前數據庫產生的數據,那麼只抓取gid等於該數據庫id的oplog便可。因此,在環形複製的場景下,MongoShake從A數據庫抓取gid等於id_A(A的gid)的數據,從B數據庫抓取gid等於id_B(B的gid)的數據便可解決這個問題。
說明:因爲MongoDB內核gid部分的修改還沒有開源,因此開源版本下此功能受限,但在阿里雲MongoDB版本已支持。這也是爲何咱們前面提到的「MongoDB集羣間數據的鏡像備份」在目前開源版本下功能受限的緣由。
MongShake採用了ACK機制確保oplog成功回放,若是失敗將會引起重傳,傳輸重傳的過程相似於TCP的滑動窗口機制。這主要是爲了保證應用層可靠性而設計的,好比解壓縮失敗等等。爲了更好的進行說明,咱們先來定義幾個名詞:
LSN(Log Sequence Number),表示已經傳輸的最新的oplog序號。
LSN_ACK(Acked Log Sequence Number),表示已經收到ack確認的最大LSN,即寫入tunnel成功的LSN。
LSN_CKPT(Checkpoint Log Sequence Number),表示已經作了checkpoint的LSN,即已經持久化的LSN。
LSN、LSN_ACK和LSN_CKPT的值均來自於Oplog的時間戳ts字段,其中隱含約束是:LSN_CKPT<=LSN_ACK<=LSN。
如上圖所示,LSN=16表示已經傳輸了16條oplog,若是沒有重傳的話,下次將傳輸LSN=17;LSN_ACK=13表示前13條都已經收到確認,若是須要重傳,最先將從LSN=14開始;LSN_CKPT=8表示已經持久化checkpoint=8。持久化的意義在於,若是此時MongoShake掛掉重啓後,源數據庫的oplog將從LSN_CKPT位置開始讀取而不是從頭LSN=1開始讀。由於oplog DML的冪等性,同一數據屢次傳輸不會產生問題。但對於DDL,重傳可能會致使錯誤。
MongoShake對外提供Restful API,提供實時查看進程內部各隊列數據的同步狀況,便於問題排查。另外,還提供限速功能,方便用戶進行實時控制,減輕數據庫壓力。
目前MongoShake支持表級別(collection)和文檔級別(id)的併發,id級別的併發須要db沒有惟一索引約束,而表級別併發在表數量小或者有些表分佈很是不均勻的狀況下性能不佳。因此在表級別併發狀況下,須要既能均勻分佈的併發,又能解決表內惟一鍵衝突的狀況。爲此,若是tunnel類型是direct時候,咱們提供了寫入前的衝突檢測功能。
目前索引類型僅支持惟一索引,不支持前綴索引、稀疏索引、TTL索引等其餘索引。
衝突檢測功能的前提須要知足兩個前提約束條件:
1. MongoShake認爲同步的MongoDB Schema是一致的,也不會監聽Oplog的System.indexes表的改動
2. 衝突索引以Oplog中記錄的爲準,不以當前MongoDB中索引做爲參考。
另外,MongoShake在同步過程當中對索引的操做可能會引起異常狀況:
1. 正在建立索引。若是是後臺建索引,這段時間的寫請求是看不到該索引的,但內部對該索引可見,同時可能會致使內存使用率會太高。若是是前臺建索引,全部用戶請求是阻塞的,若是阻塞時間太久,將會引起重傳。
2. 若是目的庫存在的惟一索引在源庫沒有,形成數據不一致,不進行處理。
3. oplog產生後,源庫才增長或刪除了惟一索引,重傳可能致使索引的增刪存在問題,咱們也不進行處理。
爲了支持衝突檢測功能,咱們修改了MongoDB內核,使得oplog中帶入uk字段,標識涉及到的惟一索引信息,如:
{ "ts" : Timestamp(1484805725, 2), "t" : NumberLong(3), "h" : NumberLong("-6270930433887838315"), "v" : 2, "op" : "u", "ns" : "benchmark.sbtest10", "o" : { "_id" : 1, "uid" : 1111, "other.sid":"22222", "mid":8907298448, "bid":123 } "o2" : {"_id" : 1} "uk" : { "uid": "1110" "mid^bid": [8907298448, 123] "other.sid_1": "22221" } }
uk下面的key表示惟一鍵的列名,key用「^」鏈接的表示聯合索引,上面記錄中存在3個惟一索引:uid、mid和bid的聯合索引、other.sid_1。value在增刪改下具備不一樣意義:若是是增長操做,則value爲空;若是是刪除或者修改操做,則記錄刪除或修改前的值。
具體處理流程以下:將連續的k個oplog打包成一個batch,流水式分析每一個batch以內的依賴,劃分紅段。若是存在衝突,則根據依賴和時序關係,將batch切分紅多個段;若是不存在衝突,則劃分紅一個段。而後對段內進行併發寫入,段與段之間順序寫入。段內併發的意思是多個併發線程同時對段內數據執行寫操做,但同一個段內的同一個id必須保證有序;段之間保證順序執行:只有前面一個段所有執行完畢,纔會執行後續段的寫入。
若是一個batch中,存在不一樣的id的oplog同時操做同一個惟一鍵,則認爲這些oplog存在時序關係,也叫依賴關係。咱們必須將存在依賴關係的oplog拆分到2個段中。
MongoShake中處理存在依賴關係的方式有2種:
(1) 插入barrier
經過插入barrier將batch進行拆分,每一個段內進行併發。舉個例子,以下圖所示:
ID表示文檔id,op表示操做,i爲插入,u爲更新,d爲刪除,uk表示該文檔下的全部惟一鍵, uk={a:3} => uk={a:1}表示將惟一鍵的值從a=3改成a=1,a爲惟一鍵。
在開始的時候,batch中有9條oplog,經過分析uk關係對其進行拆分,好比第3條和第4條,在id不一致的狀況下操做了同一個uk={a:3},那麼第3條和第4條之間須要插入barrier(修改前或者修改後不管哪一個相同都算衝突),同理第5條和第6條,第6條和第7條。同一個id操做同一個uk是容許的在一個段內是容許的,因此第2條和第3條能夠分到同一個段中。拆分後,段內根據id進行併發,同一個id仍然保持有序:好比第一個段中的第1條和第2,3條能夠進行併發,可是第2條和第3條須要順序執行。
(2) 根據關係依賴圖進行拆分
每條oplog對應一個時間序號N,那麼每一個序號N均可能存在一個M使得:
因此這個圖就變成了一個有向無環圖,每次根據拓撲排序算法併發寫入入度爲0(沒有入邊)的點便可,對於入度非0的點等待入度變爲0後再寫入,即等待前序結點執行完畢後再執行寫入。
下圖給出了一個例子:一共有10個oplog結點,一個橫線表示文檔ID相同,右圖箭頭方向表示存在惟一鍵衝突的依賴關係。那麼,該圖一共分爲4次執行:併發處理寫入1,2,4,5,而後是3,6,8,其次是7,10,最後是9。
說明:因爲MongoDB中衝突檢測uk部分的修改還沒有開源,因此開源版本下此功能受限,但在阿里雲MongoDB版本已支持。
上圖展現了MongoShake內部架構和數據流細節。整體來講,整個MongoShake能夠大致分爲3大部分:Syncer、Worker和Replayer,其中Replayer只用於tunnel類型爲direct的狀況。
Syncer負責從源數據庫拉取數據,若是源是Mongod或者ReplicaSet,那麼Syncer只有1個,若是是Sharding模式,那麼須要有多個Syncer與Shard一一對應。在Syncer內部,首先fetcher用mgo.v2庫從源庫中抓取數據而後batch打包後放入PendingQueue隊列,deserializer線程從PendingQueue中抓取數據進行解序列化處理。Batcher將從LogsQueue中抓取的數據進行從新組織,將前往同一個Worker的數據彙集在一塊兒,而後hash發送到對應Worker隊列。
Worker主要功能就是從WorkerQueue中抓取數據,而後進行發送,因爲採用ack機制,因此會內部維持幾個隊列,分別爲未發送隊列和已發送隊列,前者存儲未發送的數據,後者存儲發送可是沒有收到ack確認的數據。發送後,未發送隊列的數據會轉移到已發送隊列;收到了對端的ack回覆,已發送隊列中seq小於ack的數據將會被刪除,從而保證了可靠性。
Worker能夠對接不一樣的Tunnel通道,知足用戶不一樣的需求。若是通道類型是direct,那麼將會對接Replayer進行直接寫入目的MongoDB操做,Worker與Replayer一一對應。首先,Replayer將收到的數據根據衝突檢測規則分發到不一樣的ExecutorQueue,而後executor從隊列中抓取進行併發寫入。爲了保證寫入的高效性,MongoShake在寫入前還會對相鄰的相同Operation和相同Namespace的Oplog進行合併。
高德地圖 App是國內數一數二的地圖及導航應用,阿里雲MongoDB數據庫服務爲該應用提供了部分功能的存儲支撐,存儲億級別數據。如今高德地圖使用國內雙中心的策略,經過地理位置等信息路由最近中心提高服務質量,業務方(高德地圖)經過用戶路由到三個城市數據中心,以下圖所示,機房數據之間無依賴計算。
這三個城市地理上從北到南橫跨了整個中國 ,這對多數據中心如何作好複製、容災提出了挑戰,若是某個地域的機房、網絡出現問題,能夠平滑的將流量切換到另外一個地方,作到用戶幾乎無感知?
目前咱們的策略是,拓撲採用機房兩兩互聯方式,每一個機房的數據都將同步到另外兩個機房。而後經過高德的路由層,將用戶請求路由到不一樣的數據中心,讀寫均發送在同一個數據中心,保證必定的事務性。而後再經過MongoShake,雙向異步複製兩個數據中心的數據,這樣保證每一個數據中心都有全量的數據(保證最終一致性) 。任意機房出現問題,另兩個機房中的一個能夠經過切換後提供讀寫服務。下圖展現了城市1和城市2機房的同步狀況。
遇到某個單元不能訪問的問題,經過MongoShake對外開放的Restful管理接口,能夠得到各個機房的同步偏移量和時間戳,經過判斷採集和寫入值便可判斷異步複製是否在某個時間點已經完成。再配合業務方的DNS切流,切走單元流量並保證原有單元的請求在新單元是能夠讀寫的,以下圖所示。
具體測試數據請參考性能測試文檔。
MongoShake將會長期維護,大版本和小版本將會進行持續迭代。歡迎提問留言以及加入一塊兒進行開源開發。