MongoDB 4.2 內核解析 - Change Stream

MongoDB 從3.6版本開始支持了 Change Stream 能力(4.0、4.2 版本在能力上作了不少加強),用於訂閱 MongoDB 內部的修改操做,change stream 可用於 MongoDB 之間的增量數據遷移、同步,也能夠將 MongoDB 的增量訂閱應用到其餘的關聯繫統;好比電商場景裏,MongoDB 裏存儲新的訂單信息,業務須要根據新增的訂單信息去通知庫存管理系統發貨。mongodb

Change Stream 與 Tailing Oplog 對比

在 change stream 功能以前,若是要獲取 MongoDB 增量的修改,能夠經過不斷 tailing oplog  的方式來 拉取增量的 oplog ,而後針對拉取到的 oplog 集合,來過濾知足條件的 oplog。這種方式也能知足絕大部分場景的需求,但存在以下的不足。shell

  1. 使用門檻較高,用戶須要針對 oplog 集合,打開特殊選項的的 tailable cursor  ("tailable": true, "awaitData" : true)。
  2. 用戶須要本身管理增量續傳,當拉取應用 crash 時,用戶須要記錄上一條拉取oplog的 ts、h 等字段,在下一次先定位到指定 oplog 再繼續拉取。
  3. 結果過濾必須在拉取側完成,但只須要訂閱部分 oplog 時,好比針對某個 DB、某個 Collection、或某種類型的操做,必需要把左右的 oplog 拉取到再進行過濾。
  4. 對於 update 操做,oplog 只包含操做的部份內容,好比 {$set: {x: 1}} ,而應用常常須要獲取到完整的文檔內容。
  5. 不支持 Sharded Cluster 的訂閱,用戶必須針對每一個 shard 進行 tailing oplog,而且這個過程當中不能有 moveChunk 操做,不然結果可能亂序。

MongoDB Change Stream 解決了 Tailing oplog 存在的不足網絡

  1. 簡單易用,提供統一的 Change Stream API,一次 API 調用,便可從 MongoDB Server 側獲取增量修改。
  2. 統一的進度管理,經過 resume token 來標識拉取位置,只需在 API 調用時,帶上上次結果的 resume token,便可從上次的位置接着訂閱。
  3. 支持對結果在 Server 端進行 pipeline 過濾,減小網絡傳輸,支持針對 DB、Collection、OperationType 等維度進行結果過濾。
  4. 支持 fullDocument: "updateLookup" 選項,對於 update,返回當時對應文檔的完整內容。
  5. 支持 Sharded Cluster 的修改訂閱,相同的 API 請求發到 mongos ,便可獲取集羣維度全局有序的修改。

Change Stream 實戰

以 Mongo shell 爲例,使用 Change Stream 很是簡單,mongo shell 封裝了針對整個實例、DB、Collection 級別的訂閱操做。app

db.getMongo().watch()    訂閱整個實例的修改
db.watch()               訂閱指定DB的修改
db.collection.watch()    訂閱指定Collection的修改

一、新建鏈接1發起訂閱操做oop

mytest:PRIMARY>db.coll.watch([], {maxAwaitTimeMS: 60000})  最多阻塞等待 1分鐘

二、新建鏈接2寫入新數據post

mytest:PRIMARY> db.coll.insert({x: 100})
WriteResult({ "nInserted" : 1 })
mytest:PRIMARY> db.coll.insert({x: 101})
WriteResult({ "nInserted" : 1 })
mytest:PRIMARY> db.coll.insert({x: 102})
WriteResult({ "nInserted" : 1 })

三、鏈接1上收到 Change Stream 更新性能

mytest:PRIMARY> db.watch([], {maxAwaitTimeMS: 60000})
{ "_id" : { "_data" : "825E0D5E35000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E353BE5C36D695042C90004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934389, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e353be5c36d695042c9"), "x" : 100 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e353be5c36d695042c9") } }
{ "_id" : { "_data" : "825E0D5E37000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E373BE5C36D695042CA0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934391, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca"), "x" : 101 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca") } }
{ "_id" : { "_data" : "825E0D5E39000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E393BE5C36D695042CB0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934393, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb"), "x" : 102 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb") } }

四、上述 ChangeStream 結果裏,_id 字段的內容即爲 resume token,標識着 oplog 的某個位置,若是想從某個位置繼續訂閱,在 watch 時,經過 resumeAfter 指定便可。好比每一個應用訂閱了上述3條修改,但只有第一條已經成功消費了,下次訂閱時指定第一條的 resume token 便可再次訂閱到接下來的2條。ui

mytest:PRIMARY> db.coll.watch([], {maxAwaitTimeMS: 60000, resumeAfter: { "_data" : "825E0D5E35000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E353BE5C36D695042C90004" }})
{ "_id" : { "_data" : "825E0D5E37000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E373BE5C36D695042CA0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934391, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca"), "x" : 101 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca") } }
{ "_id" : { "_data" : "825E0D5E39000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E393BE5C36D695042CB0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934393, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb"), "x" : 102 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb") } }

Change Stream 內部實現

watch() wrapper

db.watch() 其實是一個 API wrapper,實際上 Change Stream 在 MongoDB 內部其實是一個 aggregation 命令,只是加了一個特殊的 $changestream  階段,在發起 change stream 訂閱操做後,可經過 db.currentOp() 看到對應的 aggregation/getMore 操做的詳細參數。阿里雲

{
      "op" : "getmore",
      "ns" : "test.coll",
      "command" : {
        "getMore" : NumberLong("233479991942333714"),
        "collection" : "coll",
        "maxTimeMS" : 50000,
        "lsid" : {
          "id" : UUID("e4fffa71-e168-4527-be61-f0918849d107")
        },
      },
      "planSummary" : "COLLSCAN",
      "cursor" : {
        "cursorId" : NumberLong("233479991942333714"),
        "createdDate" : ISODate("2019-12-31T06:35:52.479Z"),
        "lastAccessDate" : ISODate("2019-12-31T06:36:09.988Z"),
        "nDocsReturned" : NumberLong(1),
        "nBatchesReturned" : NumberLong(1),
        "noCursorTimeout" : false,
        "tailable" : true,
        "awaitData" : true,
        "originatingCommand" : {
          "aggregate" : "coll",
          "pipeline" : [
            {
              "$changeStream" : {
                "fullDocument" : "default"
              }
            }
          ],
          "cursor" : {

          },
          "lsid" : {
            "id" : UUID("e4fffa71-e168-4527-be61-f0918849d107")
          },
          "$clusterTime" : {
            "clusterTime" : Timestamp(1577774144, 1),
            "signature" : {
              "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
              "keyId" : NumberLong(0)
            }
          },
          "$db" : "test"
        },
        "operationUsingCursorId" : NumberLong(7019500)
      },
      "numYields" : 2,
      "locks" : {

      }
    }

resume token

resume token 用來描述一個訂閱點,本質上是 oplog 信息的一個封裝,包含 clusterTime、uuid、documentKey等信息,當訂閱 API 帶上 resume token 時,MongoDB Server 會將 token 轉換爲對應的信息,並定位到 oplog 起點繼續訂閱操做。編碼

struct ResumeTokenData {
    Timestamp clusterTime;
    int version = 0;
    size_t applyOpsIndex = 0;
    Value documentKey;
    boost::optional<UUID> uuid;
};

ResumeTokenData 結構裏包含 version 信息,在 4.0.7 之前的版本,version 均爲0; 4.0.7 引入了一種新的 resume token 格式,version 爲 1; 另外在 3.6 版本里,Resume Token 的編碼與 4.0 也有所不一樣;因此在版本升級後,有可能出現不一樣版本 token 沒法識別的問題,因此儘可能要讓 MongoDB Server 全部組件(Replica Set 各個成員,ConfigServer、Mongos)都保持相同的內核版本。

更詳細的信息,參考 https://docs.mongodb.com/manual/reference/method/Mongo.watch/#resumability

updateLookup

Change Stream 支持針對 update 操做,獲取當前的文檔完整內容,而不是僅更新操做自己,好比

mytest:PRIMARY> db.coll.find({_id: 101})
{ "_id" : 101, "name" : "jack", "age" : 18 }
mytest:PRIMARY> db.coll.update({_id: 101}, {$set: {age: 20}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })

上面的 update 操做,默認狀況下,change stream 會收到  {_id: 101}, {$set: {age: 20}  的內容,而並不會包含這個文檔其餘未更新字段的信息;而加上 fullDocument: "updateLookup" 選項後,Change Stream 會根據文檔 _id 去查找文檔當前的內容並返回。

須要注意的是,updateLookup 選項只能保證最終一致性,好比針對上述文檔,若是連續更新100次,update 的 change stream 並不會按順序收到中間每一次的更新,由於每次都是去查找文檔當前的內容,而當前的內容可能已經被後續的修改覆蓋。

Sharded cluster

Change Stream 支持針對 sharded cluster 進行訂閱,會保證全局有序的返回結果;爲了達到全局有序這個目標,mongos 須要從每一個 shard 都返回訂閱結果按時間戳進行排序合併返回。

在極端狀況下,若是某些 shard 寫入量不多或者沒有寫入,change stream 的返回延時會受到影響,由於須要等到全部 shard 都返回訂閱結果;默認狀況下,mongod server 每10s會產生一條 Noop 的特殊oplog,這個機制會間接驅動 sharded cluster 在寫入量不高的狀況下也能持續運轉下去。

因爲須要全局排序,在 sharded cluster 寫入量很高時,Change Stream 的性能極可能跟不上;若是對性能要求很是高,能夠考慮關閉 Balancer,在每一個 shard 上各自創建 Change Stream。

參考資料


本文做者:張友東

閱讀原文

本文爲阿里雲內容,未經容許不得轉載。

相關文章
相關標籤/搜索