MongoDB Change Stream: Introduction, Trial, and Applications

  Among the new features introduced in MongoDB 3.6, change streams are undoubtedly one of the most attractive.

  Change streams allow applications to access real-time data changes without the complexity and risk of tailing the oplog.

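  Change streams let applications receive changes to MongoDB data in real time. This feature has been requested for a long time. It can be used for ETL, cross-platform data synchronization, notification services, and more. Before change streams existed, you could track changes by tailing the oplog, but that approach is a complex and risky hack.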

  Article URL: http://www.javashuo.com/article/p-fnwcvuof-u.html

Change Stream Features

The article an-introduction-to-change-streams summarizes several features of change streams:

Targeted changes

  Changes can be filtered to provide relevant and targeted changes to listening applications.

Resumability

  Resumability was top of mind when building change streams to ensure that applications can see every change in a collection. (Mechanism: the resume token.)

Total ordering

  MongoDB 3.6 has a global logical clock that enables the server to order all changes across a sharded cluster.

Durability

  Change streams only include majority-committed changes.

Security

  Change streams are secure – users are only able to create change streams on collections to which they have been granted read access.

Ease of use

  Change streams are familiar – the API syntax takes advantage of the established MongoDB drivers and query language, and are independent of the underlying oplog format.

Idempotence

  All changes are transformed into a format that’s safe to apply multiple times. Listening applications can use a resume token from any prior change stream event, not just the most recent one, because reapplying operations is safe and will reach the same consistent state.
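  To make the Idempotence point concrete, here is a minimal sketch in plain JavaScript (no driver, no server). It shows a consumer that keeps an in-memory copy of a collection up to date from change events shaped like the ones the mongo shell prints later in this article. Three choices are assumptions made for illustration:

  • the function name applyChange;
  • the Map keyed by the stringified _id;
  • the restriction to top-level fields in updatedFields. Nested changes arrive as dotted paths, which this sketch does not expand.

  Replaying events that were already applied leaves the copy in the same state.

```javascript
// Sketch only: keeps an in-memory copy of one collection from change events.
// Assumes plain-object events shaped like the shell output in this article,
// _id values compared as strings, and top-level updatedFields only.
function applyChange(store, event) {
  if (!event.documentKey) return store; // e.g. "invalidate" has no documentKey
  const key = String(event.documentKey._id);
  switch (event.operationType) {
    case "insert":
    case "replace":
      // The event carries the complete new document.
      store.set(key, { ...event.fullDocument });
      break;
    case "update": {
      // Apply the delta; setting a field to a value is safe to repeat.
      const doc = { ...(store.get(key) || { _id: event.documentKey._id }) };
      Object.assign(doc, event.updateDescription.updatedFields);
      for (const field of event.updateDescription.removedFields) delete doc[field];
      store.set(key, doc);
      break;
    }
    case "delete":
      store.delete(key); // deleting an absent key is a no-op, so also repeatable
      break;
  }
  return store;
}

// Usage: an insert followed by an update, replayed twice as if a consumer
// had resumed from an older token.
const sample = [
  {
    operationType: "insert",
    documentKey: { _id: "5b6d4a5cf8e670716bf152b2" },
    fullDocument: { _id: "5b6d4a5cf8e670716bf152b2", username: "test1", age: 18 },
  },
  {
    operationType: "update",
    documentKey: { _id: "5b6d4a5cf8e670716bf152b2" },
    updateDescription: { updatedFields: { age: 19 }, removedFields: [] },
  },
];
const copy = new Map();
for (const e of [...sample, ...sample]) applyChange(copy, e);
console.log(copy.get("5b6d4a5cf8e670716bf152b2").age); // 19
```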

 

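  Compared with tailing the oplog yourself, change streams have the following advantages:

  • If a write has been persisted on only one node, the corresponding oplog entry may later be rolled back. Change streams, by contrast, have the Durability property.
  • In a sharded cluster, a change stream spans all shards. You open it through mongos instead of tailing the oplog on each replica set separately.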
 

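  Change streams place some requirements on the MongoDB deployment:

  • They only work on replica sets or sharded clusters (in MongoDB 3.6, each shard must be a replica set). This is easy to understand, since change streams are built on the oplog. In a sharded cluster, you must connect through mongos.
  • The WiredTiger storage engine and replica set protocol version 1 are required.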
 

Trying Out Change Streams

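  The tutorial "Free Trial of the MongoDB Cloud Database (MongoDB Atlas): A Tutorial" explains how to use the cloud database service offered by MongoDB Atlas. The free cluster it provides happens to be a replica set running the WiredTiger engine, so this article uses that environment for testing. The tests cover all the supported change events, the fullDocument option, and resuming.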

  Change events include:

  • insert
  • delete
  • replace
  • update
  • invalidate

  Interestingly, compared with CRUD there is an extra replace event. The difference between update and replace is:

  A replace operation uses the update command, and consists of two stages:
  • Delete the original document with the documentKey and
  • Insert the new document using the same documentkey

 

  Test method: open two mongo shells, one to operate on the database and one to watch. The two sessions are labeled Operate and Watch below.

Preparing the Environment

  Operate

MongoDB Enterprise free-shard-0:PRIMARY> use engineering
switched to db engineering

 

  Watch

MongoDB Enterprise free-shard-0:PRIMARY> use engineering
switched to db engineering
MongoDB Enterprise free-shard-0:PRIMARY> cursor = db.users.watch()
assert: command failed: {
        "operationTime" : Timestamp(1533888296, 2),
        "ok" : 0,
        "errmsg" : "cannot open $changeStream for non-existent database: engineering",
        "code" : 26,
        "codeName" : "NamespaceNotFound",
        "$clusterTime" : {
                "clusterTime" : Timestamp(1533888296, 2),
                "signature" : {
                        "hash" : BinData(0,"fWTN4Kuv7cq9xCcC0vCF4AkTxuU="),
                        "keyId" : NumberLong("6563302068054917121")
                }
        }
} : aggregate failed

  The error shows that you can only watch a database that already exists. So first insert a document to create the corresponding database and collection.

  Operate

MongoDB Enterprise free-shard-0:PRIMARY> db.users.insert({'username': 'test1', age: 18, 'email':'test1@gmail.con'})
WriteResult({ "nInserted" : 1 })

 

  Watch
MongoDB Enterprise free-shard-0:PRIMARY> cursor = db.users.watch()
MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
2018-08-10T16:08:49.200+0800 E QUERY    [thread1] Error: error hasNext: false :
DBCommandCursor.prototype.next@src/mongo/shell/query.js:853:1
@(shell):1:1

  The cursor for listening has now been created. There are no change events yet, so cursor.next() fails with "error hasNext: false".
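  The message says the error is thrown because hasNext() is false, that is, no change event is available right now. The later sessions show the same cursor delivering events after this error, so the cursor stays usable. A consumer would therefore normally check hasNext() before calling next() and try again later.

  The sketch below shows that pattern. It runs against a hypothetical in-memory stand-in for the cursor, so no database is needed. makeMockCursor and drainAvailable are illustrative names, not shell or driver APIs.

```javascript
// Hypothetical in-memory stand-in for a change stream cursor: it exposes the
// same hasNext()/next() pair as the shell cursor, serving a fixed event list.
function makeMockCursor(events) {
  let i = 0;
  return {
    hasNext() {
      return i < events.length;
    },
    next() {
      // Mirrors the shell: calling next() with nothing available is an error.
      if (!this.hasNext()) throw new Error("error hasNext: false");
      return events[i++];
    },
  };
}

// Handle every event that is available right now, without ever calling
// next() on an empty cursor. Returns how many events were handled.
function drainAvailable(cursor, handle) {
  let handled = 0;
  while (cursor.hasNext()) {
    handle(cursor.next());
    handled += 1;
  }
  return handled;
}

// Usage: draining a cursor with no events is simply a no-op, not an error.
const emptyCursor = makeMockCursor([]);
console.log(drainAvailable(emptyCursor, (e) => console.log(e))); // 0
```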

Insert 

  Operate

MongoDB Enterprise free-shard-0:PRIMARY> db.users.insert({'username': 'test2', age: 19, 'email':'test2@gmail.con'})
WriteResult({ "nInserted" : 1 })

  Watch

MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{
        "_id" : {
                "_data" : BinData(0,"glttSC0AAAADRmRfaWQAZFttSCb45nBxa/FSsABaEAQMcjq0rdpL+LTQHXFkm7J7BA==")
        },
        "operationType" : "insert",
        "fullDocument" : {
                "_id" : ObjectId("5b6d4826f8e670716bf152b0"),
                "username" : "test2",
                "age" : 19,
                "email" : "test2@gmail.con"
        },
        "ns" : {
                "db" : "engineering",
                "coll" : "users"
        },
        "documentKey" : {
                "_id" : ObjectId("5b6d4826f8e670716bf152b0")
        }
}

 

replace

  Operate

MongoDB Enterprise free-shard-0:PRIMARY> db.users.update({username: "test1"}, {age: 19})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })

 

  Watch

MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{
        "_id" : {
                "_data" : BinData(0,"glttSSMAAAACRmRfaWQAZFttR+r45nBxa/FSrwBaEAQMcjq0rdpL+LTQHXFkm7J7BA==")
        },
        "operationType" : "replace",
        "fullDocument" : {
                "_id" : ObjectId("5b6d47eaf8e670716bf152af"),
                "age" : 19
        },
        "ns" : {
                "db" : "engineering",
                "coll" : "users"
        },
        "documentKey" : {
                "_id" : ObjectId("5b6d47eaf8e670716bf152af")
        }
}

 

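  As you can see, the operation was issued with db.collection.update, yet the change event is replace. The reason is explained in the replace-a-document-entirely section of the db.collection.update() documentation:

If the <update> document contains only field:value expressions, then:
  • The update() method replaces the matching document with the <update> document.
  • The update() method does not replace the _id value.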
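  Under the rule quoted above, whether an update() call shows up as replace or update depends only on the shape of its <update> argument. The helper below is a hypothetical sketch of that rule (expectedOperationType is not a MongoDB API). It assumes the query matches an existing document and upsert is not used. It agrees with the two sessions in this article: {age: 19} produced replace and {$set: {age: 19}} produced update.

```javascript
// Hypothetical helper (not part of any MongoDB API): predicts which
// operationType a change stream reports for update(query, updateDoc),
// assuming the query matches an existing document and upsert is off.
function expectedOperationType(updateDoc) {
  const keys = Object.keys(updateDoc);
  const operators = keys.filter((k) => k.startsWith("$"));
  if (operators.length === 0) {
    // Only field:value pairs: the matched document is replaced (keeping _id).
    return "replace";
  }
  if (operators.length === keys.length) {
    // Only update operators such as $set: the document is modified in place.
    return "update";
  }
  // The server rejects a mix of operators and plain fields.
  throw new Error("cannot mix update operators and field:value pairs");
}

console.log(expectedOperationType({ age: 19 }));           // replace
console.log(expectedOperationType({ $set: { age: 19 } })); // update
```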

delete

  Operate

MongoDB Enterprise free-shard-0:PRIMARY> db.users.remove({ "_id" : ObjectId("5b6d47eaf8e670716bf152af")})
WriteResult({ "nRemoved" : 1 })

 

  Watch

MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{
        "_id" : {
                "_data" : BinData(0,"glttSfAAAAAFRmRfaWQAZFttR+r45nBxa/FSrwBaEAQMcjq0rdpL+LTQHXFkm7J7BA==")
        },
        "operationType" : "delete",
        "ns" : {
                "db" : "engineering",
                "coll" : "users"
        },
        "documentKey" : {
                "_id" : ObjectId("5b6d47eaf8e670716bf152af")
        }
}

 

update

  Operate

MongoDB Enterprise free-shard-0:PRIMARY> db.users.insert({'username': 'test1', age: 18, 'email':'test1@gmail.con'})
WriteResult({ "nInserted" : 1 })
MongoDB Enterprise free-shard-0:PRIMARY> db.users.update({username: "test1"}, {$set: {age: 19}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })

 

  Watch

MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{
        "_id" : {
                "_data" : BinData(0,"glttSmQAAAAERmRfaWQAZFttSlz45nBxa/FSsgBaEAQMcjq0rdpL+LTQHXFkm7J7BA==")
        },
        "operationType" : "insert",
        "fullDocument" : {
                "_id" : ObjectId("5b6d4a5cf8e670716bf152b2"),
                "username" : "test1",
                "age" : 18,
                "email" : "test1@gmail.con"
        },
        "ns" : {
                "db" : "engineering",
                "coll" : "users"
        },
        "documentKey" : {
                "_id" : ObjectId("5b6d4a5cf8e670716bf152b2")
        }
}

MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{
        "_id" : {
                "_data" : BinData(0,"glttSn0AAAABRmRfaWQAZFttSlz45nBxa/FSsgBaEAQMcjq0rdpL+LTQHXFkm7J7BA==")
        },
        "operationType" : "update",
        "ns" : {
                "db" : "engineering",
                "coll" : "users"
        },
        "documentKey" : {
                "_id" : ObjectId("5b6d4a5cf8e670716bf152b2")
        },
        "updateDescription" : {
                "updatedFields" : {
                        "age" : 19
                },
                "removedFields" : [ ]
        }
}

 

update fullDocument

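  db.collection.watch accepts a fullDocument option. With fullDocument set to 'updateLookup', an update change event also returns the complete current version of the corresponding document.

  Watch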

MongoDB Enterprise free-shard-0:PRIMARY> cursor = db.users.watch([], {fullDocument:'updateLookup'} )

 

  Operate

MongoDB Enterprise free-shard-0:PRIMARY> db.users.update({username: "test1"}, {$set: {age: 29}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })

 

  Watch

MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{
        "_id" : {
                "_data" : BinData(0,"glttS88AAAAERmRfaWQAZFttSlz45nBxa/FSsgBaEAQMcjq0rdpL+LTQHXFkm7J7BA==")
        },
        "operationType" : "update",
        "fullDocument" : {
                "_id" : ObjectId("5b6d4a5cf8e670716bf152b2"),
                "username" : "test1",
                "age" : 29,
                "email" : "test1@gmail.con"
        },
        "ns" : {
                "db" : "engineering",
                "coll" : "users"
        },
        "documentKey" : {
                "_id" : ObjectId("5b6d4a5cf8e670716bf152b2")
        },
        "updateDescription" : {
                "updatedFields" : {
                        "age" : 29
                },
                "removedFields" : [ ]
        }
}
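  Without updateLookup, a consumer that needs the whole document must patch its own cached copy using updateDescription. With updateLookup, the server supplies fullDocument. The hypothetical helper documentAfterUpdate below sketches that choice; the plain-object cache and top-level-only fields are assumptions.

  One caveat from the MongoDB documentation: with updateLookup, fullDocument is the most recent majority-committed version of the document at lookup time. It can therefore already include later changes than the ones described in updateDescription.

```javascript
// Sketch: get the post-update document for a downstream consumer (e.g. a cache).
// If the stream was opened with {fullDocument: "updateLookup"}, update events
// carry fullDocument; otherwise apply updateDescription to a cached copy.
// Assumption: the cache holds plain objects and only top-level fields change.
function documentAfterUpdate(event, cachedDoc) {
  if (event.fullDocument) return event.fullDocument; // looked up by the server
  if (!cachedDoc) {
    throw new Error("no fullDocument and no cached copy to patch");
  }
  const doc = { ...cachedDoc };
  Object.assign(doc, event.updateDescription.updatedFields);
  for (const field of event.updateDescription.removedFields) delete doc[field];
  return doc;
}

// Usage: the $set {age: 29} update without updateLookup (id shortened).
const cached = { _id: "b2", username: "test1", age: 19, email: "test1@gmail.con" };
const withoutLookup = {
  operationType: "update",
  documentKey: { _id: "b2" },
  updateDescription: { updatedFields: { age: 29 }, removedFields: [] },
};
console.log(documentAfterUpdate(withoutLookup, cached).age); // 29
```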

 

resume change stream

  Operate

MongoDB Enterprise free-shard-0:PRIMARY> db.users.insert({"username": "test3", "age": 14})
WriteResult({ "nInserted" : 1 })
MongoDB Enterprise free-shard-0:PRIMARY> db.users.insert({"username": "test3", "age": 14})
WriteResult({ "nInserted" : 1 })
MongoDB Enterprise free-shard-0:PRIMARY> db.users.remove({"username": "test3"})
WriteResult({ "nRemoved" : 2 })

 

  Watch

MongoDB Enterprise free-shard-0:PRIMARY> ret = cursor.next()
{
        "_id" : {
                "_data" : BinData(0,"gltusJ4AAAABRmRfaWQAZFtusJ5f9Jy7Q0jALABaEAQMcjq0rdpL+LTQHXFkm7J7BA==")
        },
        "operationType" : "insert",
        "fullDocument" : {
                "_id" : ObjectId("5b6eb09e5ff49cbb4348c02c"),
                "username" : "test3",
                "age" : 14
        },
        "ns" : {
                "db" : "engineering",
                "coll" : "users"
        },
        "documentKey" : {
                "_id" : ObjectId("5b6eb09e5ff49cbb4348c02c")
        }
}
MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{
        "_id" : {
                "_data" : BinData(0,"gltusKAAAAABRmRfaWQAZFtusJ9f9Jy7Q0jALQBaEAQMcjq0rdpL+LTQHXFkm7J7BA==")
        },
        "operationType" : "insert",
        "fullDocument" : {
                "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d"),
                "username" : "test3",
                "age" : 14
        },
        "ns" : {
                "db" : "engineering",
                "coll" : "users"
        },
        "documentKey" : {
                "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d")
        }
}
MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{
        "_id" : {
                "_data" : BinData(0,"gltusK8AAAABRmRfaWQAZFtusJ5f9Jy7Q0jALABaEAQMcjq0rdpL+LTQHXFkm7J7BA==")
        },
        "operationType" : "delete",
        "ns" : {
                "db" : "engineering",
                "coll" : "users"
        },
        "documentKey" : {
                "_id" : ObjectId("5b6eb09e5ff49cbb4348c02c")
        }
}
MongoDB Enterprise free-shard-0:PRIMARY> cursor.next()
{
        "_id" : {
                "_data" : BinData(0,"gltusK8AAAACRmRfaWQAZFtusJ9f9Jy7Q0jALQBaEAQMcjq0rdpL+LTQHXFkm7J7BA==")
        },
        "operationType" : "delete",
        "ns" : {
                "db" : "engineering",
                "coll" : "users"
        },
        "documentKey" : {
                "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d")
        }
}

 

  Resume Watch

MongoDB Enterprise free-shard-0:PRIMARY> resume_cursor = db.users.watch([], {"resumeAfter": ret['_id']})
{ "_id" : { "_data" : BinData(0,"gltusKAAAAABRmRfaWQAZFtusJ9f9Jy7Q0jALQBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "insert", "fullDocument" : { "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d"), "username" : "test3", "age" : 14 }, "ns" : { "db" : "5b6d2180df9db10e4ba91d60_engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d") } }
{ "_id" : { "_data" : BinData(0,"gltusK8AAAABRmRfaWQAZFtusJ5f9Jy7Q0jALABaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "delete", "ns" : { "db" : "5b6d2180df9db10e4ba91d60_engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6eb09e5ff49cbb4348c02c") } }
{ "_id" : { "_data" : BinData(0,"gltusK8AAAACRmRfaWQAZFtusJ9f9Jy7Q0jALQBaEAQMcjq0rdpL+LTQHXFkm7J7BA==") }, "operationType" : "delete", "ns" : { "db" : "5b6d2180df9db10e4ba91d60_engineering", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("5b6eb09f5ff49cbb4348c02d") } }
MongoDB Enterprise free-shard-0:PRIMARY> resume_cursor.next()
2018-08-11T17:49:13.127+0800 E QUERY    [thread1] Error: error hasNext: false :
DBCommandCursor.prototype.next@src/mongo/shell/query.js:853:1
@(shell):1:1

 

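  For resume_cursor, resumeAfter is set to the _id (the resume token) of an event consumed earlier (ret). The new stream then delivers every change event that came after that token, even though the original cursor had already consumed them.

  All three events were printed as soon as the statement ran. The shell automatically iterates and prints a cursor returned by an assignment that does not use var. That printing consumed the events, so the following resume_cursor.next() had nothing left to return.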
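  This experiment suggests the usual pattern for a consumer:

  • after handling each event, persist its _id;
  • on restart, pass the saved _id as resumeAfter.

  The sketch below models this without a server. watchAfter is a hypothetical stand-in for db.users.watch([], {resumeAfter: token}) over a fixed history. The _id._data values are made-up strings rather than real BinData tokens.

  In a real deployment a token can only be resumed while the corresponding entry is still in the oplog. The sketch mirrors that by throwing when the token is unknown.

```javascript
// Hypothetical model of a resumable change stream over a fixed history.
// Each event's _id plays the role of the resume token.
function sameToken(a, b) {
  return a._data === b._data;
}

// Stand-in for watch([], {resumeAfter: token}): events strictly after token.
// With no token this toy starts at the beginning (a real stream starts "now").
function watchAfter(events, token) {
  if (token === undefined) return events.slice();
  const idx = events.findIndex((e) => sameToken(e._id, token));
  if (idx === -1) throw new Error("resume token not found (no longer in the oplog?)");
  return events.slice(idx + 1);
}

// Process events, saving the token only after an event has been handled,
// so a restart never skips an unhandled event. `limit` simulates a crash.
function consume(events, checkpoint, handle, limit = Infinity) {
  let handled = 0;
  for (const event of watchAfter(events, checkpoint.token)) {
    if (handled >= limit) break;
    handle(event);
    checkpoint.token = event._id; // in practice: write to durable storage
    handled += 1;
  }
  return handled;
}

// Usage mirroring the session above: two inserts, then two deletes.
const eventHistory = ["i1", "i2", "d1", "d2"].map((t) => ({ _id: { _data: t } }));
const checkpoint = {};
consume(eventHistory, checkpoint, () => {}, 1);          // "crash" after the first insert
console.log(consume(eventHistory, checkpoint, () => {})); // 3, like resume_cursor above
```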

Change Stream Applications

DDIA CDC

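  The book Designing Data-Intensive Applications has a section on Change Data Capture (CDC). It discusses putting the replication log of a replicated system (such as a replica set) to use; in MongoDB, the replication log is the oplog. The book points out: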

The problem with most databases’ replication logs is that they have long been considered to be an internal implementation detail of the database, not a public API.

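  In other words, applications (clients) can only use a database within the constraints the database imposes, rather than reading and parsing its replication log directly. Yet the replication log could be used directly to build search indexes, caches, and data warehouses.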

    

  change data capture (CDC), which is the process of observing all data changes written to a database and extracting them in a form in which they can be replicated to other systems.

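  CDC turns the search index and the data warehouse into derived data systems, which can also be thought of as views of the database's data. Interestingly, a system made up of the database, the replication log, and derived data systems looks a lot like a single-leader replicated system. The database is the leader (Primary), and the derived data systems (cache, data warehouse) are the followers (Secondaries).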
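  As a small illustration of a derived data system fed by change events, here is a sketch of a secondary index from username to _id. It is built only from events and never queries the source collection. UsernameIndex is a hypothetical name, and the sketch assumes update events carry fullDocument (the stream was opened with fullDocument: 'updateLookup').

  Delete events carry only documentKey, as in the delete test above. The index therefore also keeps a reverse map from _id to username, so it knows which entry to remove.

```javascript
// Sketch of a derived data system: a username -> _id secondary index,
// maintained purely from change events. Hypothetical class, plain-object events.
class UsernameIndex {
  constructor() {
    this.idsByName = new Map(); // username -> Set of _id strings
    this.nameById = new Map();  // _id string -> username (needed for deletes)
  }

  apply(event) {
    if (!event.documentKey) return; // e.g. "invalidate"
    const id = String(event.documentKey._id);
    if (event.operationType === "delete") {
      this.unlink(id); // delete events carry only documentKey
      return;
    }
    // insert/replace carry fullDocument; update only with updateLookup.
    // Without it we cannot tell the new username, so the entry is left as is.
    if (!event.fullDocument) return;
    this.unlink(id);
    const name = event.fullDocument.username;
    if (name === undefined) return; // e.g. the replace that left only {age: 19}
    if (!this.idsByName.has(name)) this.idsByName.set(name, new Set());
    this.idsByName.get(name).add(id);
    this.nameById.set(id, name);
  }

  unlink(id) {
    const old = this.nameById.get(id);
    if (old === undefined) return;
    const ids = this.idsByName.get(old);
    ids.delete(id);
    if (ids.size === 0) this.idsByName.delete(old);
    this.nameById.delete(id);
  }

  lookup(name) {
    return [...(this.idsByName.get(name) || [])].sort();
  }
}

// Usage: the two test3 inserts from the resume test, then one delete.
const index = new UsernameIndex();
index.apply({ operationType: "insert", documentKey: { _id: "c2c" }, fullDocument: { _id: "c2c", username: "test3", age: 14 } });
index.apply({ operationType: "insert", documentKey: { _id: "c2d" }, fullDocument: { _id: "c2d", username: "test3", age: 14 } });
index.apply({ operationType: "delete", documentKey: { _id: "c2c" } });
console.log(index.lookup("test3")); // [ 'c2d' ]
```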

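  Change streams have very broad potential applications. The article "Perfect Data Migration: Applications of MongoDB Stream" describes using change streams to migrate data while moving to a service-oriented architecture, and it gives a complete example. The article USING MONGODB AS A REALTIME DATABASE WITH CHANGE STREAMS also gives a simple use case built with Node.js.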

Change Stream Implementation and Issues

The official documentation has some notes on using change streams with a sharded cluster. The following points are worth attention:

(1)

To guarantee total ordering of changes, for each change notification the mongos checks with each shard to see if the shard has seen more recent changes. 

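  Whether or not data has changed, mongos has to check with every shard, which adds to change stream latency. The problem gets worse when network latency is high, for example with geographically distributed shards. If data changes very frequently, the change stream may not be able to keep up.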
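  The conceptual model below (not mongos's actual code) illustrates why the slowest shard sets the pace. Each shard contributes events sorted by cluster time, plus a watermark: the time up to which it is known to have nothing more to report. An event can only be released in global order once every shard's watermark has reached it. The names releasable, watermark, and buffer are assumptions of this sketch, and cluster times are plain numbers here.

```javascript
// Conceptual model only, not mongos's implementation. Each shard has a buffer
// of events sorted by cluster time (ts) and a watermark: a time up to which it
// is known to have no further events. An event is safe to emit in global order
// only once every shard's watermark has reached its ts.
function releasable(shards) {
  const safeTime = Math.min(...shards.map((s) => s.watermark));
  const ready = [];
  for (const shard of shards) {
    while (shard.buffer.length > 0 && shard.buffer[0].ts <= safeTime) {
      ready.push(shard.buffer.shift());
    }
  }
  return ready.sort((a, b) => a.ts - b.ts); // global order by cluster time
}

// Usage: shard B has only confirmed up to time 3, so A's event at time 4 waits.
const shardA = { watermark: 4, buffer: [{ ts: 1, id: "a1" }, { ts: 4, id: "a4" }] };
const shardB = { watermark: 3, buffer: [{ ts: 2, id: "b2" }] };
console.log(releasable([shardA, shardB]).map((e) => e.id)); // [ 'a1', 'b2' ]
shardB.watermark = 5; // B reports it has seen nothing new up to time 5
console.log(releasable([shardA, shardB]).map((e) => e.id)); // [ 'a4' ]
```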

(2)  

For sharded collections, update operations with multi : true may cause any change streams opened against that collection to send notifications for orphaned documents.

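  For update operations with multi: true, the update may also be applied to orphaned documents. This produces extra change stream notifications that the application may need to handle. By the way, orphaned documents are a notorious headache.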

   

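  In addition, MongoDB 3.6 can only watch individual collections. To follow write events on several collections or databases, you have to open a separate watch for each one. The article MongoDB 3.6 Change Streams: A Nest Temperature and Fan Control Use Case points out that this can cause performance problems: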

  It’s estimated that after 1000 streams you will start to see very measurable performance drops

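  In MongoDB 4.0, however, you can watch at the database level or even the whole-cluster level. This is much more convenient for applications and removes the need to open one stream per collection.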
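  With a database-level stream (for example db.watch() in the 4.0 shell), one stream carries events for every collection. The application then routes them itself, using the ns.coll field that the events in this article carry. Below is a minimal sketch of that routing. makeRouter and the handler names are hypothetical, and orders is a made-up second collection.

```javascript
// Sketch: dispatch events from one database-level stream by collection name,
// instead of opening one stream per collection. Handler names are hypothetical.
function makeRouter(handlers, fallback = () => {}) {
  return function route(event) {
    const coll = event.ns ? event.ns.coll : undefined;
    const handler = Object.prototype.hasOwnProperty.call(handlers, coll)
      ? handlers[coll]
      : fallback; // unknown collections and events without ns (e.g. invalidate)
    handler(event);
    return coll;
  };
}

// Usage: "orders" is a made-up second collection in the engineering database.
const dispatch = makeRouter({
  users: (e) => console.log("users:", e.operationType),
  orders: (e) => console.log("orders:", e.operationType),
});
dispatch({ operationType: "insert", ns: { db: "engineering", coll: "users" } });  // users: insert
dispatch({ operationType: "delete", ns: { db: "engineering", coll: "orders" } }); // orders: delete
```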

Summary

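  This article introduced MongoDB change streams and some issues to watch out for when applying them, and tried them out on MongoDB Atlas. Change streams are clearly a very promising feature that can solve many problems that are awkward to implement today. Before using them in production, however, extensive testing is still needed, especially of fault tolerance and performance.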

References

MongoDB Change Stream

an-introduction-to-change-streams

Free Trial of the MongoDB Cloud Database (MongoDB Atlas): A Tutorial

Designing Data-Intensive Applications

Perfect Data Migration: Applications of MongoDB Stream

USING MONGODB AS A REALTIME DATABASE WITH CHANGE STREAMS
