Among the new features introduced in MongoDB 3.6, change streams are undoubtedly one of the most attractive.
Change streams allow applications to access real-time data changes without the complexity and risk of tailing the oplog.
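Change streams let applications receive changes to MongoDB data in real time. This is a long-requested feature that can be used for ETL, cross-platform data synchronization, notification services, and more. Before change streams existed, you could track changes by tailing the oplog, but that is a complex and risky unofficial hack.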
Article URL: http://www.javashuo.com/article/p-fnwcvuof-u.html
Targeted changes
Changes can be filtered to provide relevant and targeted changes to listening applications.
Resumability
Resumability was top of mind when building change streams to ensure that applications can see every change in a collection. (This is what the resume token is for.)
Total ordering
MongoDB 3.6 has a global logical clock that enables the server to order all changes across a sharded cluster.
Durability
Change streams only include majority-committed changes.
Security
Change streams are secure – users are only able to create change streams on collections to which they have been granted read access.
Ease of use
Change streams are familiar – the API syntax takes advantage of the established MongoDB drivers and query language, and are independent of the underlying oplog format.
Idempotence
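"Targeted changes" means the filtering happens in the aggregation pipeline passed as the first argument of watch(). The sketch below builds such a pipeline with a $match on operationType and, for testing a consumer offline, a local predicate that mirrors it. The chosen operation types are an arbitrary example, and buildPipeline/matchesPipeline are hypothetical helpers, not MongoDB APIs.

```javascript
// Build a change stream pipeline that only passes the given operation types.
function buildPipeline(opTypes) {
  return [{ $match: { operationType: { $in: opTypes } } }];
}

// Local stand-in for the $match stage above, for unit-testing consumers without a server.
function matchesPipeline(event, opTypes) {
  return opTypes.includes(event.operationType);
}

const wanted = ["insert", "update"];
const pipeline = buildPipeline(wanted);
// In the mongo shell (not executed here): cursor = db.users.watch(pipeline)

const sample = [
  { operationType: "insert", documentKey: { _id: 1 } },
  { operationType: "delete", documentKey: { _id: 1 } },
  { operationType: "update", documentKey: { _id: 2 } },
];
const delivered = sample.filter((e) => matchesPipeline(e, wanted));
console.log(delivered.map((e) => e.operationType)); // [ 'insert', 'update' ]
```

Filtering on the server keeps unwanted events off the network, which is the point of passing the pipeline to watch() rather than filtering in the client.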
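Compared with tailing the oplog yourself, change streams have the following advantages:
Change streams place some requirements on the MongoDB deployment:
Change events include:
Interestingly, compared with plain CRUD there is an extra replace event. The difference between update and replace shows up in the tests below.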
Test method: start two mongo shells, one that operates on the database and one that watches it. To tell them apart, each session below is labeled Operate or Watch.
Operate
MongoDB Enterprise free-shard-0:PRIMARY> use engineering
switched to db engineering
Watch
MongoDB Enterprise free-shard-0:PRIMARY> use engineering
switched to db engineering
MongoDB Enterprise free-shard-0:PRIMARY> cursor = db.users.watch()
assert: command failed: { "operationTime" : Timestamp(1533888296, 2), "ok" : 0, "errmsg" : "cannot open $changeStream for non-existent database: engineering", "code" : 26, "codeName" : "NamespaceNotFound", "$clusterTime" : { "clusterTime" : Timestamp(1533888296, 2), "signature" : { "hash" : BinData(0,"fWTN4Kuv7cq9xCcC0vCF4AkTxuU="), "keyId" : NumberLong("6563302068054917121") } } } : aggregate failed
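The error shows that a change stream can only be opened on a database that already exists, so first insert a document to create the corresponding database and collection.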
Operate
MongoDB Enterprise free-shard-0:PRIMARY> db.users.insert({'username': 'test1', age: 18, 'email':'test1@gmail.con'})
WriteResult({ "nInserted" : 1 })
Watch
MongoDB Enterprise free-shard-0:PRIMARY> cursor = db.users.watch()
MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
2018-08-10T16:08:49.200+0800 E QUERY [thread1] Error: error hasNext: false :
DBCommandCursor.prototype.next@src/mongo/shell/query.js:853:1
@(shell):1:1
The watching cursor is now open, but there are no change events yet, so cursor.next() reports hasNext: false.
Operate
MongoDB Enterprise free-shard-0:PRIMARY> db.users.insert({'username': 'test2', age: 19, 'email':'test2@gmail.con'})
WriteResult({ "nInserted" : 1 })
Watch
MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{ "_id" : { "_data" : BinData(0,"glttSC0AAAADRmRfaWQAZFttSCb45nBxa/FSsABaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "insert", "fullDocument" : { "_id" : ObjectId("5b6d4826f8e670716bf152b0"), "username" : "test2", "age" : 19, "email" : "test2@gmail.con" }, "ns" : { "db" : "engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6d4826f8e670716bf152b0") } }
Operate
MongoDB Enterprise free-shard-0:PRIMARY> db.users.update({username: "test1"}, {age: 19})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })
Watch
MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{ "_id" : { "_data" : BinData(0,"glttSSMAAAACRmRfaWQAZFttR+r45nBxa/FSrwBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "replace", "fullDocument" : { "_id" : ObjectId("5b6d47eaf8e670716bf152af"), "age" : 19 }, "ns" : { "db" : "engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6d47eaf8e670716bf152af") } }
As you can see, the operation used db.collection.update, yet the change event is replace. The reason is explained in the "Replace a Document Entirely" section of the update() documentation:
If the <update> document contains only field:value expressions, then:
- The update() method replaces the matching document with the <update> document. The update() method does not replace the _id value. For an example, see Replace All Fields.
- update() cannot update multiple documents.
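The rule quoted above can be written as a small predicate that predicts which change event an update() call produces. This sketch covers only the quoted rule (operator updates vs. plain field:value documents), and expectedOperationType is a hypothetical name, not part of any MongoDB API.

```javascript
// Predict the change event type for db.collection.update(query, updateDoc).
// Only field:value pairs -> the document is replaced -> "replace" event.
// Only $-operators (e.g. $set) -> fields are modified in place -> "update" event.
function expectedOperationType(updateDoc) {
  const keys = Object.keys(updateDoc);
  const operatorKeys = keys.filter((k) => k.startsWith("$"));
  if (operatorKeys.length === 0) return "replace";
  if (operatorKeys.length === keys.length) return "update";
  // The server rejects update documents that mix operators and plain fields.
  throw new Error("update document mixes $-operators and plain fields");
}

console.log(expectedOperationType({ age: 19 }));           // replace
console.log(expectedOperationType({ $set: { age: 19 } })); // update
```

The two calls mirror the two experiments in this section: {age: 19} produced a replace event, while {$set: {age: 19}} produces an update event.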
Operate
MongoDB Enterprise free-shard-0:PRIMARY> db.users.remove({ "_id" : ObjectId("5b6d47eaf8e670716bf152af")})
WriteResult({ "nRemoved" : 1 })
Watch
MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{ "_id" : { "_data" : BinData(0,"glttSfAAAAAFRmRfaWQAZFttR+r45nBxa/FSrwBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "delete", "ns" : { "db" : "engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6d47eaf8e670716bf152af") } }
Operate
MongoDB Enterprise free-shard-0:PRIMARY> db.users.insert({'username': 'test1', age: 18, 'email':'test1@gmail.con'})
WriteResult({ "nInserted" : 1 })
MongoDB Enterprise free-shard-0:PRIMARY> db.users.update({username: "test1"}, {$set: {age: 19}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })
Watch
MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{ "_id" : { "_data" : BinData(0,"glttSmQAAAAERmRfaWQAZFttSlz45nBxa/FSsgBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "insert", "fullDocument" : { "_id" : ObjectId("5b6d4a5cf8e670716bf152b2"), "username" : "test1", "age" : 18, "email" : "test1@gmail.con" }, "ns" : { "db" : "engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6d4a5cf8e670716bf152b2") } }
MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{ "_id" : { "_data" : BinData(0,"glttSn0AAAABRmRfaWQAZFttSlz45nBxa/FSsgBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "update", "ns" : { "db" : "engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6d4a5cf8e670716bf152b2") }, "updateDescription" : { "updatedFields" : { "age" : 19 }, "removedFields" : [ ] } }
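Without fullDocument, an update event carries only the delta in updateDescription, so a consumer that keeps its own copy of the document has to apply it. A minimal sketch, assuming plain JSON documents (ObjectIds shown as strings) and that keys in updatedFields/removedFields may be dotted paths for nested fields; setPath, unsetPath and applyUpdateDescription are hypothetical helpers.

```javascript
// Set a possibly dotted path such as "address.city", creating objects on the way.
function setPath(doc, path, value) {
  const parts = path.split(".");
  let node = doc;
  for (const p of parts.slice(0, -1)) {
    if (typeof node[p] !== "object" || node[p] === null) node[p] = {};
    node = node[p];
  }
  node[parts[parts.length - 1]] = value;
}

// Remove a possibly dotted path; missing intermediate objects are ignored.
function unsetPath(doc, path) {
  const parts = path.split(".");
  let node = doc;
  for (const p of parts.slice(0, -1)) {
    if (typeof node[p] !== "object" || node[p] === null) return;
    node = node[p];
  }
  delete node[parts[parts.length - 1]];
}

// Return a new document with the event's updateDescription applied.
function applyUpdateDescription(doc, { updatedFields, removedFields }) {
  const copy = JSON.parse(JSON.stringify(doc)); // plain-data deep copy; caller's object untouched
  for (const [path, value] of Object.entries(updatedFields)) setPath(copy, path, value);
  for (const path of removedFields) unsetPath(copy, path);
  return copy;
}

const before = { _id: "5b6d4a5cf8e670716bf152b2", username: "test1", age: 18, email: "test1@gmail.con" };
const after = applyUpdateDescription(before, { updatedFields: { age: 19 }, removedFields: [] });
console.log(after.age); // 19
```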
db.collection.watch accepts a fullDocument option. With fullDocument: 'updateLookup', update change events also include the complete current version of the corresponding document.
Watch
MongoDB Enterprise free-shard-0:PRIMARY> cursor = db.users.watch([], {fullDocument:'updateLookup'} )
Operate
MongoDB Enterprise free-shard-0:PRIMARY> db.users.update({username: "test1"}, {$set: {age: 29}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })
Watch
MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{ "_id" : { "_data" : BinData(0,"glttS88AAAAERmRfaWQAZFttSlz45nBxa/FSsgBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "update", "fullDocument" : { "_id" : ObjectId("5b6d4a5cf8e670716bf152b2"), "username" : "test1", "age" : 29, "email" : "test1@gmail.con" }, "ns" : { "db" : "engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6d4a5cf8e670716bf152b2") }, "updateDescription" : { "updatedFields" : { "age" : 29 }, "removedFields" : [ ] } }
Operate
MongoDB Enterprise free-shard-0:PRIMARY> db.users.insert({"username": "test3", "age": 14})
WriteResult({ "nInserted" : 1 })
MongoDB Enterprise free-shard-0:PRIMARY> db.users.insert({"username": "test3", "age": 14})
WriteResult({ "nInserted" : 1 })
MongoDB Enterprise free-shard-0:PRIMARY> db.users.remove({"username": "test3"})
WriteResult({ "nRemoved" : 2 })
Watch
MongoDB Enterprise free-shard-0:PRIMARY> ret = cursor.next()
{ "_id" : { "_data" : BinData(0,"gltusJ4AAAABRmRfaWQAZFtusJ5f9Jy7Q0jALABaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "insert", "fullDocument" : { "_id" : ObjectId("5b6eb09e5ff49cbb4348c02c"), "username" : "test3", "age" : 14 }, "ns" : { "db" : "engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6eb09e5ff49cbb4348c02c") } }
MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{ "_id" : { "_data" : BinData(0,"gltusKAAAAABRmRfaWQAZFtusJ9f9Jy7Q0jALQBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "insert", "fullDocument" : { "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d"), "username" : "test3", "age" : 14 }, "ns" : { "db" : "engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d") } }
MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{ "_id" : { "_data" : BinData(0,"gltusK8AAAABRmRfaWQAZFtusJ5f9Jy7Q0jALABaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "delete", "ns" : { "db" : "engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6eb09e5ff49cbb4348c02c") } }
MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{ "_id" : { "_data" : BinData(0,"gltusK8AAAACRmRfaWQAZFtusJ9f9Jy7Q0jALQBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "delete", "ns" : { "db" : "engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d") } }
Resume Watch
MongoDB Enterprise free-shard-0:PRIMARY> resume_cursor = db.users.watch([], {"resumeAfter": ret['_id']})
{ "_id" : { "_data" : BinData(0,"gltusKAAAAABRmRfaWQAZFtusJ9f9Jy7Q0jALQBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "insert", "fullDocument" : { "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d"), "username" : "test3", "age" : 14 }, "ns" : { "db" : "5b6d2180df9db10e4ba91d60_engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d") } }
{ "_id" : { "_data" : BinData(0,"gltusK8AAAABRmRfaWQAZFtusJ5f9Jy7Q0jALABaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "delete", "ns" : { "db" : "5b6d2180df9db10e4ba91d60_engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6eb09e5ff49cbb4348c02c") } }
{ "_id" : { "_data" : BinData(0,"gltusK8AAAACRmRfaWQAZFtusJ9f9Jy7Q0jALQBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "delete", "ns" : { "db" : "5b6d2180df9db10e4ba91d60_engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d") } }
MongoDB Enterprise free-shard-0:PRIMARY> resume_cursor.next()
2018-08-11T17:49:13.127+0800 E QUERY [thread1] Error: error hasNext: false :
DBCommandCursor.prototype.next@src/mongo/shell/query.js:853:1
@(shell):1:1
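For resume_cursor, resumeAfter is set to the _id (the resume token) of an earlier change event, ret. When the stream is opened it immediately returns every change event that came after that token, even though the first cursor had already consumed them. Once those are drained, resume_cursor.next() again reports hasNext: false.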
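The resume behavior can be modeled as an ordered log indexed by resume token, where resuming returns everything strictly after the given token. This is a simplified model, not how the server stores events: tokens are shortened to readable strings (real ones are opaque BinData values), and resumeAfter here is a hypothetical function named after the watch() option. It reproduces the session above, where resuming after ret replays the second insert and both deletes. In practice a consumer would persist the _id of the last event it finished processing and pass it as resumeAfter after a restart.

```javascript
// Return every event strictly after `token`, like watch([], { resumeAfter: token }).
function resumeAfter(log, token) {
  const idx = log.findIndex((e) => e._id === token);
  // In a real deployment this can happen once the token's oplog entry has rolled off.
  if (idx === -1) throw new Error("resume token not found in the log");
  return log.slice(idx + 1);
}

const log = [
  { _id: "t1", operationType: "insert", documentKey: { _id: "c02c" } },
  { _id: "t2", operationType: "insert", documentKey: { _id: "c02d" } },
  { _id: "t3", operationType: "delete", documentKey: { _id: "c02c" } },
  { _id: "t4", operationType: "delete", documentKey: { _id: "c02d" } },
];

// `ret` in the session was the first event (t1), so three events are replayed.
const replayed = resumeAfter(log, "t1");
console.log(replayed.map((e) => e.operationType)); // [ 'insert', 'delete', 'delete' ]
```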
The book Designing Data-Intensive Applications has a section on Change Data Capture (CDC), which discusses using the replication log of a replicated database; for MongoDB, that replication log is the oplog. The book notes:
The problem with most databases’ replication logs is that they have long been considered to be an internal implementation detail of the database, not a public API.
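In other words, applications (clients) were expected to use the database only through its official interfaces, not by reading and parsing the replication log directly. Yet the replication log can be used directly to build search indexes, caches, and data warehouses, as shown in the figure below: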
change data capture (CDC), which is the process of observing all data changes written to a database and extracting them in a form in which they can be replicated to other systems.
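CDC turns the search index and the data warehouse into derived data systems, which can also be thought of as views of the database's data. Interestingly, the system in the figure above (database, replication log, derived data systems) looks a lot like single-leader replication: the database is the leader (Primary), and the derived data systems (cache, data warehouse) are the followers (Secondaries).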
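The follower side of that analogy can be sketched as an in-memory cache driven by change events. This assumes the stream was opened with { fullDocument: 'updateLookup' } as in the earlier session, so update events carry the whole document; DerivedCache is a hypothetical class, and keying by the JSON form of documentKey is a simplification.

```javascript
// Hypothetical derived data system: a cache that follows a change stream.
class DerivedCache {
  constructor() {
    this.docs = new Map(); // JSON(documentKey) -> latest known document
    this.lastToken = null; // _id of the last applied event; persist it to use as resumeAfter
  }

  apply(event) {
    const key = JSON.stringify(event.documentKey);
    switch (event.operationType) {
      case "insert":
      case "replace":
      case "update":
        // With updateLookup, fullDocument can be null if the document was deleted
        // before the lookup ran; drop it here, the delete event follows later.
        if (event.fullDocument == null) this.docs.delete(key);
        else this.docs.set(key, event.fullDocument);
        break;
      case "delete":
        this.docs.delete(key);
        break;
      default:
        // e.g. "invalidate" after the collection is dropped; the consumer must decide what to do
        throw new Error(`unhandled operationType: ${event.operationType}`);
    }
    this.lastToken = event._id;
  }
}

const cache = new DerivedCache();
cache.apply({ _id: "t1", operationType: "insert", documentKey: { _id: 1 }, fullDocument: { _id: 1, username: "test1", age: 18 } });
cache.apply({ _id: "t2", operationType: "update", documentKey: { _id: 1 }, fullDocument: { _id: 1, username: "test1", age: 29 } });
console.log(cache.docs.get(JSON.stringify({ _id: 1 })).age); // 29
cache.apply({ _id: "t3", operationType: "delete", documentKey: { _id: 1 } });
console.log(cache.docs.size, cache.lastToken); // 0 t3
```

Recording lastToken only after an event is applied is what makes a restart with resumeAfter pick up exactly where the cache left off.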
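Change streams have a wide range of potential applications. The article 完美數據遷移-MongoDB Stream的應用 ("Perfect data migration: applying MongoDB streams") describes using change streams to migrate data during a service-oriented refactoring and gives a complete example. The article USING MONGODB AS A REALTIME DATABASE WITH CHANGE STREAMS also presents a simple use case with Node.js.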
The official documentation has some notes on using change streams with a sharded cluster (see the docs); the following points deserve attention:
(1)
To guarantee total ordering of changes, for each change notification the mongos checks with each shard to see if the shard has seen more recent changes.
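Whether or not there are data changes, mongos has to check with every shard, which adds latency to the change stream. The problem is more pronounced when network latency is high, for example with geographically distributed shards. If data changes very frequently, the change stream may not be able to keep up.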
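Why mongos must consult every shard can be illustrated with a simplified watermark model: an event with timestamp t may only be emitted once every shard has confirmed it has nothing earlier still unseen, so one idle or distant shard holds back events from all the others. The numeric timestamps and mergeByWatermark are assumptions for illustration, not the actual mongos algorithm.

```javascript
// shardEvents: { shard: [{ ts, ... }] }; shardWatermarks: { shard: latest ts it has confirmed }.
// Every shard must appear in shardWatermarks, including shards with no events.
function mergeByWatermark(shardEvents, shardWatermarks) {
  const safeUpTo = Math.min(...Object.values(shardWatermarks));
  const all = Object.values(shardEvents).flat().sort((a, b) => a.ts - b.ts);
  return {
    ready: all.filter((e) => e.ts <= safeUpTo), // can be emitted in total order now
    held: all.filter((e) => e.ts > safeUpTo),   // must wait for slower shards
  };
}

const events = {
  shardA: [{ ts: 5, op: "insert" }, { ts: 9, op: "update" }],
  shardB: [{ ts: 7, op: "delete" }],
};
// shardC has no changes and has only confirmed up to ts 6.
const { ready, held } = mergeByWatermark(events, { shardA: 9, shardB: 7, shardC: 6 });
console.log(ready.map((e) => e.ts), held.map((e) => e.ts)); // [ 5 ] [ 7, 9 ]
```

The held events are released only once shardC reports a later time, which is the latency cost described above.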
(2)
For sharded collections, update operations with multi : true may cause any change streams opened against that collection to send notifications for orphaned documents.
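In other words, an update with multi: true may also be applied to orphaned documents, which produces extra change notifications; applications may need to handle this case. Incidentally, orphaned documents are a notoriously troublesome problem.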
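One defensive measure, assuming the duplicate notification carries the same field values, is to make the consumer idempotent: applying the same $set-style change twice leaves the derived document unchanged, while anything that counts deliveries (the notifications counter below) is inflated. This is a sketch, not an official recipe; an orphan with stale contents could report different values, which idempotence alone does not fix. applySetFields is a hypothetical helper.

```javascript
// $set-style semantics on top-level fields only: applying the same fields twice is a no-op.
function applySetFields(doc, updatedFields) {
  return { ...doc, ...updatedFields };
}

const event = {
  operationType: "update",
  documentKey: { _id: 1 },
  updateDescription: { updatedFields: { status: "archived" }, removedFields: [] },
};

let doc = { _id: 1, status: "active" };
let notifications = 0; // not idempotent: counts deliveries, not logical changes
for (const e of [event, event]) { // the same change delivered twice
  doc = applySetFields(doc, e.updateDescription.updatedFields);
  notifications += 1;
}
console.log(doc.status, notifications); // archived 2
```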
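In addition, MongoDB 3.6 can only watch a single collection, so to follow write events on multiple collections or databases you have to open a separate change stream for each one. The article MongoDB 3.6 Change Streams: A Nest Temperature and Fan Control Use Case points out that this can cause performance problems: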
It’s estimated that after 1000 streams you will start to see very measurable performance drops
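On MongoDB 3.6, following several collections therefore means one stream per collection, fanned in by the application (MongoDB 4.0 later added db.watch() and Mongo.watch() for database- and deployment-wide streams). The sketch below merges several async iterables into a single loop. It assumes the change stream objects your driver returns are async iterable, which is true of recent Node.js drivers but worth checking for your version; mergeStreams is a hypothetical helper and the fake streams stand in for real ones. It only simplifies client code and does not reduce the server-side cost of many streams.

```javascript
// Fan-in: yield events from several async iterables as soon as any of them produces one.
// Note: it does not close the remaining streams if the consumer stops early.
async function* mergeStreams(streams) {
  const iterators = streams.map((s) => s[Symbol.asyncIterator]());
  const pending = new Map(); // index -> promise resolving to { i, result }
  const pull = (i) => {
    pending.set(i, iterators[i].next().then((result) => ({ i, result })));
  };
  iterators.forEach((_, i) => pull(i));
  while (pending.size > 0) {
    const { i, result } = await Promise.race(pending.values());
    if (result.done) {
      pending.delete(i); // that stream is exhausted
    } else {
      pull(i); // request the next event from the same stream before yielding
      yield result.value;
    }
  }
}

// Fake streams standing in for db.users.watch() and db.orders.watch().
async function* fakeStream(coll, n) {
  for (let k = 0; k < n; k++) yield { ns: { db: "engineering", coll }, k };
}

(async () => {
  for await (const event of mergeStreams([fakeStream("users", 2), fakeStream("orders", 1)])) {
    console.log(event.ns.coll, event.k);
  }
})();
```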
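This article introduced MongoDB's change stream feature and some issues to watch out for in real applications, and tried it out briefly on MongoDB Atlas. Change streams are clearly a very promising feature that solves many problems that are currently awkward to implement. Before using them for production workloads, however, extensive testing is still needed, especially of fault tolerance and performance.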
References:
- an-introduction-to-change-streams
- Tutorial: free trial of MongoDB's cloud database (MongoDB Atlas)