MongoDB Change Stream初體驗

Change Stream是MongoDB從3.6開始支持的新特性。這個新特性有哪些奇妙之處,會給咱們帶來什麼便利?本次的文章將就這個主題進行初步討論。javascript

Change Stream是什麼?

顧名思義,Change Stream即變動流,是MongoDB嚮應用發佈數據變動的一種方式。即當數據庫中有任何數據發生變化,應用端均可以獲得通知。咱們能夠將其理解爲在應用中執行的觸發器。至於應用想獲得什麼數據,以什麼形式獲得數據,則能夠經過聚合框架加以過濾和轉換。這點將在後文中討論。java

Change Stream的原理

咱們先來回顧一下MongoDB複製集大體是如何工做的:mongodb

  1. 應用經過驅動向數據庫發起寫入請求;
  2. 在同一個事務中,MongoDB完成oplog和集合的修改;
  3. oplog被其餘從節點拉走;
  4. 從節點應用獲得的oplog,一樣在一個事務中完成對oplog和集合的修改;

至此,複製集同步完成。能夠發現,整個同步過程是依賴於oplog來進行的。也就是說oplog實際上已經包含了咱們須要的全部變動數據。若是觀測oplog的變化,是否就可以獲得全部變動的數據了呢?對,change stream正是基於這個原理實現的。但事情並無這麼簡單!咱們來看一下問題有可能出在什麼地方。shell

如何從斷點恢復

現實世界中,沒有哪一個應用是能夠不間斷運行的。不考慮bug致使的問題,正常的應用升級也會致使應用中斷運行。那麼在應用恢復的時候,從哪裏開始繼續獲取變動呢?oplog固然是能夠幫咱們作到這點的,但你必須對MongoDB足夠了解,才知道有oplogReplay這樣的參數,以及其餘一些問題。數據庫

如何有效地處理訂閱

假設在一個應用中須要訂閱10個不一樣集合的變動狀況,是否須要開10個tailable cursor去獲取oplog的變動呢?若是是100個集合呢?出於效率考慮顯然不該該這麼作。那麼整個過程就會變成一個生產者-消費者模式,由一個線程負責從oplog獲取變動,由訂閱的線程負責消費這些變動。雖然實現也不是那麼複雜,而且多半能夠找到開源實現,可是涉及多線程就已經足夠讓初學者頭疼一陣的了。
公平地說,上面這些還不算嚴重的問題,下面這些問題可能會更讓人頭疼。安全

如何管理權限

想要tail oplog,必須對local.oplog.rs有讀權限。實際上這至關於對整個數據庫都有了讀權限,由於全部的變動都會在這裏體現出來。DBA可能會阻止你這麼作,由於這實在不是一個很安全的作法。多線程

如何數據回滾

極端狀況下,若是應用處理不當,MongoDB中可能發生數據回滾rollback的問題。若是僅僅經過跟蹤oplog,則會出現已經通知出去的變動被回滾的狀況。架構

幸運的是上面這些問題如今都不是問題了,由於change stream幫咱們規避了這些複雜的細節。框架

使用方法

因爲各類驅動都會有不一樣的語法和API,從shell中嘗試使用change stream多是最簡便的方法。這並不妨礙你隨後在各類驅動中的使用,由於shell中能實現的功能在驅動中必定有對應的語法。下面就以shell爲例看看change stream應該如何使用。性能

打開一個shell,訂閱你須要關注的集合
好比:

var cursor = db.bar.watch();

爲了便於演示,咱們在這個shell中不斷遍歷這個遊標以獲取新數據:

while(true) {
    if (cursor.hasNext()) {
        print(JSON.stringify(cursor.next()));
    }
}

打開另外一個shell,向bar集合中插入一條數據:

db.bar.insert({y: 1})

此時第一個shell中會當即輸出變動數據:

{"_id":{"_data":{"$binary":"glzquiIAAAACRmRfaWQAZFzquiK0lDNo+K0DpwBaEARUMrm0ruVACoftuxjt1RtCBA==","$type":"00"}},"operationType":"insert","fullDocument":{"_id":{"$oid":"5ceaba22b4943368f8ad03a7"},"y":1},"ns":{"db":"test","coll":"bar"},"documentKey":{"_id":{"$oid":"5ceaba22b4943368f8ad03a7"}}}

這裏的一些字段的簡單介紹。更完整的介紹請查閱文檔change events

  • _id: 用於恢復斷點時使用。即知道這個值,應用斷開後下次重啓裏就能夠從這個斷點以後開始恢復得到變動;
  • operationType: 操做類型,常見的值包括:

    • insert
    • update
    • delete
  • ns: 正在操做的命名空間
  • fullDocument: 完整的文檔

從斷點恢復

var cursor = db.bar.watch([], {resumeAfter: <\_id>})

此時使用hasNext()/next()便可獲取到隨後的變動。

注意事項

{readConcern: 'majority'}

爲了不被回滾的更新被髮布出去,change stream選擇只在一個變動到達大多數節點(不可能被回滾)時,纔會將這些變動發佈到應用。使用的方式即{readConcern: "majority"}。所以如下這些狀況下change stream都是不會嚮應用通知任何變動的:

  • 禁用了readConcern
  • 從舊版本升級,但沒有更新featureCompatibilityVersion
  • PSA架構中S宕機;

斷點可恢復時間

由於change stream是依賴於oplog工做的,天然也會面臨oplog面臨的全部問題。問題之一就是oplog被覆蓋。所以想要保證斷點能夠恢復,必須保證應用在oplog window的時間內請求斷點。

刪除集合

若是在訂閱集合變動過程當中集合被刪除,則會收到一條invalid信息通知,表示集合已再也不可用:

{
    "_id" : {
        "_data" : BinData(0,"glzqxCcAAAACFFoQBFQyubSu5UAKh+27GO3VG0IE")
    },
    "operationType" : "invalidate"
}

參考資料

做者簡介

張耀星,MongoDB亞太區首席技術諮詢服務顧問。在MongoDB的開發、應用和諮詢服務上有多年實踐經驗。做爲MongoDB認證專家,曾經爲不一樣行業的各種大型客戶提供過培訓、性能調優、架構設計等各種MongoDB相關技術服務。

相關文章
相關標籤/搜索