MongoDB 4.2 內核解析 – Change Stream

MongoDB 從3.6版本開始支持了 Change Stream 能力(4.0、4.2 版本在能力上作了不少加強),用於訂閱 MongoDB 內部的修改操做,change stream 可用於 MongoDB 之間的增量數據遷移、同步,也能夠將 MongoDB 的增量訂閱應用到其餘的關聯繫統;好比電商場景裏,MongoDB 裏存儲新的訂單信息,業務須要根據新增的訂單信息去通知庫存管理系統發貨。mongodb

Change Stream 與 Tailing Oplog 對比

在 change stream 功能以前,若是要獲取 MongoDB 增量的修改,能夠經過不斷 tailing oplog 的方式來 拉取增量的 oplog ,而後針對拉取到的 oplog 集合,來過濾知足條件的 oplog。這種方式也能知足絕大部分場景的需求,但存在以下的不足。shell

  • 使用門檻較高,用戶須要針對 oplog 集合,打開特殊選項的的 tailable cursor (「tailable」: true, 「awaitData」 : true)。
  • 用戶須要本身管理增量續傳,當拉取應用 crash 時,用戶須要記錄上一條拉取oplog的 ts、h 等字段,在下一次先定位到指定 oplog 再繼續拉取。
  • 結果過濾必須在拉取側完成,但只須要訂閱部分 oplog 時,好比針對某個 DB、某個 Collection、或某種類型的操做,必需要把左右的 oplog 拉取到再進行過濾。
  • 對於 update 操做,oplog 只包含操做的部份內容,好比 {$set: {x: 1}} ,而應用常常須要獲取到完整的文檔內容。
  • 不支持 Sharded Cluster 的訂閱,用戶必須針對每一個 shard 進行 tailing oplog,而且這個過程當中不能有 moveChunk 操做,不然結果可能亂序。

MongoDB Change Stream 解決了 Tailing oplog 存在的不足微信

  • 簡單易用,提供統一的 Change Stream API,一次 API 調用,便可從 MongoDB Server 側獲取增量修改。
  • 統一的進度管理,經過 resume token 來標識拉取位置,只需在 API 調用時,帶上上次結果的 resume token,便可從上次的位置接着訂閱。
  • 支持對結果在 Server 端進行 pipeline 過濾,減小網絡傳輸,支持針對 DB、Collection、OperationType 等維度進行結果過濾。
  • 支持 fullDocument: 「updateLookup」 選項,對於 update,返回當時對應文檔的完整內容。
  • 支持 Sharded Cluster 的修改訂閱,相同的 API 請求發到 mongos ,便可獲取集羣維度全局有序的修改。

Change Stream 實戰

以 Mongo shell 爲例,使用 Change Stream 很是簡單,mongo shell 封裝了針對整個實例、DB、Collection 級別的訂閱操做。網絡

`db.getMongo().watch() 訂閱整個實例的修改
db.watch() 訂閱指定DB的修改
db.collection.watch() 訂閱指定Collection的修改`app

新建鏈接1發起訂閱操做oop

mytest:PRIMARY>db.coll.watch([], {maxAwaitTimeMS: 60000}) 最多阻塞等待 1分鐘性能

新建鏈接2寫入新數據ui

`mytest:PRIMARY> db.coll.insert({x: 100})
WriteResult({ "nInserted" : 1 })
mytest:PRIMARY> db.coll.insert({x: 101})
WriteResult({ "nInserted" : 1 })
mytest:PRIMARY> db.coll.insert({x: 102})
WriteResult({ "nInserted" : 1 })`編碼

鏈接1上收到 Change Stream 更新code

`mytest:PRIMARY> db.watch([], {maxAwaitTimeMS: 60000})
{ "_id" : { "_data" : "825E0D5E35000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E353BE5C36D695042C90004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934389, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e353be5c36d695042c9"), "x" : 100 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e353be5c36d695042c9") } }
{ "_id" : { "_data" : "825E0D5E37000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E373BE5C36D695042CA0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934391, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca"), "x" : 101 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca") } }
{ "_id" : { "_data" : "825E0D5E39000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E393BE5C36D695042CB0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934393, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb"), "x" : 102 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb") } }`

上述 ChangeStream 結果裏,_id 字段的內容即爲 resume token,標識着 oplog 的某個位置,若是想從某個位置繼續訂閱,在 watch 時,經過 resumeAfter 指定便可。好比每一個應用訂閱了上述3條修改,但只有第一條已經成功消費了,下次訂閱時指定第一條的 resume token 便可再次訂閱到接下來的2條。

`mytest:PRIMARY> db.coll.watch([], {maxAwaitTimeMS: 60000, resumeAfter: { "_data" : "825E0D5E35000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E353BE5C36D695042C90004" }})
{ "_id" : { "_data" : "825E0D5E37000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E373BE5C36D695042CA0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934391, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca"), "x" : 101 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca") } }
{ "_id" : { "_data" : "825E0D5E39000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E393BE5C36D695042CB0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934393, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb"), "x" : 102 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb") } }`

Change Stream 內部實現

watch() wrapper

db.watch() 其實是一個 API wrapper,實際上 Change Stream 在 MongoDB 內部其實是一個 aggregation 命令,只是加了一個特殊的 $changestream 階段,在發起 change stream 訂閱操做後,可經過 db.currentOp() 看到對應的 aggregation/getMore 操做的詳細參數。

`{

"op" : "getmore",
  "ns" : "test.coll",
  "command" : {
    "getMore" : NumberLong("233479991942333714"),
    "collection" : "coll",
    "maxTimeMS" : 50000,
    "lsid" : {
      "id" : UUID("e4fffa71-e168-4527-be61-f0918849d107")
    },
  },
  "planSummary" : "COLLSCAN",
  "cursor" : {
    "cursorId" : NumberLong("233479991942333714"),
    "createdDate" : ISODate("2019-12-31T06:35:52.479Z"),
    "lastAccessDate" : ISODate("2019-12-31T06:36:09.988Z"),
    "nDocsReturned" : NumberLong(1),
    "nBatchesReturned" : NumberLong(1),
    "noCursorTimeout" : false,
    "tailable" : true,
    "awaitData" : true,
    "originatingCommand" : {
      "aggregate" : "coll",
      "pipeline" : [
        {
          "$changeStream" : {
            "fullDocument" : "default"
          }
        }
      ],
      "cursor" : {

      },
      "lsid" : {
        "id" : UUID("e4fffa71-e168-4527-be61-f0918849d107")
      },
      "$clusterTime" : {
        "clusterTime" : Timestamp(1577774144, 1),
        "signature" : {
          "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
          "keyId" : NumberLong(0)
        }
      },
      "$db" : "test"
    },
    "operationUsingCursorId" : NumberLong(7019500)
  },
  "numYields" : 2,
  "locks" : {

  }
}

`

resume token

resume token 用來描述一個訂閱點,本質上是 oplog 信息的一個封裝,包含 clusterTime、uuid、documentKey等信息,當訂閱 API 帶上 resume token 時,MongoDB Server 會將 token 轉換爲對應的信息,並定位到 oplog 起點繼續訂閱操做。

`struct ResumeTokenData {

Timestamp clusterTime;
int version = 0;
size_t applyOpsIndex = 0;
Value documentKey;
boost::optional uuid;

};`

ResumeTokenData 結構裏包含 version 信息,在 4.0.7 之前的版本,version 均爲0; 4.0.7 引入了一種新的 resume token 格式,version 爲 1; 另外在 3.6 版本里,Resume Token 的編碼與 4.0 也有所不一樣;因此在版本升級後,有可能出現不一樣版本 token 沒法識別的問題,因此儘可能要讓 MongoDB Server 全部組件(Replica Set 各個成員,ConfigServer、Mongos)都保持相同的內核版本。

更詳細的信息,參考 https://docs.mongodb.com/manu...

updateLookup

Change Stream 支持針對 update 操做,獲取當前的文檔完整內容,而不是僅更新操做自己,好比

`mytest:PRIMARY> db.coll.find({_id: 101})
{ "_id" : 101, "name" : "jack", "age" : 18 }
mytest:PRIMARY> db.coll.update({_id: 101}, {$set: {age: 20}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })`

上面的 update 操做,默認狀況下,change stream 會收到 {_id: 101}, {$set: {age: 20} 的內容,而並不會包含這個文檔其餘未更新字段的信息;而加上 fullDocument: 「updateLookup」 選項後,Change Stream 會根據文檔 _id 去查找文檔當前的內容並返回。

須要注意的是,updateLookup 選項只能保證最終一致性,好比針對上述文檔,若是連續更新100次,update 的 change stream 並不會按順序收到中間每一次的更新,由於每次都是去查找文檔當前的內容,而當前的內容可能已經被後續的修改覆蓋。

Sharded cluster

Change Stream 支持針對 sharded cluster 進行訂閱,會保證全局有序的返回結果;爲了達到全局有序這個目標,mongos 須要從每一個 shard 都返回訂閱結果按時間戳進行排序合併返回。

在極端狀況下,若是某些 shard 寫入量不多或者沒有寫入,change stream 的返回延時會受到影響,由於須要等到全部 shard 都返回訂閱結果;默認狀況下,mongod server 每10s會產生一條 Noop 的特殊oplog,這個機制會間接驅動 sharded cluster 在寫入量不高的狀況下也能持續運轉下去。

因爲須要全局排序,在 sharded cluster 寫入量很高時,Change Stream 的性能極可能跟不上;若是對性能要求很是高,能夠考慮關閉 Balancer,在每一個 shard 上各自創建 Change Stream。

參考資料
Change Stream Manual
Change Streams Production Recommendations
Tailable Cursors/

本文由MongoDB中文社區(mongoing.com)小英 (微信ID:mongoingcom)經過博客一文多發平臺 [OpenWrite]發佈!
相關文章
相關標籤/搜索