MongoDB -> kafka 高性能實時同步(採集)mongodb數據到kafka解決方案

寫這篇博客的目的

讓更多的人瞭解 阿里開源的MongoShake能夠很好知足mongodb到kafka高性能高可用實時同步需求(項目地址:https://github.com/alibaba/MongoShake,下載地址:https://github.com/alibaba/MongoShake/releases)。至此博客就結束了,你能夠愉快地啃這個項目了。仍是一塊兒來看一下官方的描述:linux

MongoShake is a universal data replication platform based on MongoDB's oplog. Redundant replication and active-active replication are two most important functions. 基於mongodb oplog的集羣複製工具,能夠知足遷移和同步的需求,進一步實現災備和多活功能。git

沒有標題的標題

哈哈,有興趣聽我囉嗦的能夠往下。最近,有個實時增量採集mongodb數據(數據量在天天10億條左右)的需求,須要先調研一下解決方案。我分別百度、google了mongodb kafka sync 同步 採集 實時等 關鍵詞,寫這篇博客的時候排在最前面的當屬kafka-connect(官方有實現https://github.com/mongodb/mongo-kafka,其實也有非官方的實現)那一套方案,我對kafka-connect相對熟悉一點(不熟悉的話估計編譯部署都要花好一段時間),沒測以前就感受可能不知足個人採集性能需求,測下來果真也是不知足需求。後來,也看到了https://github.com/rwynn/route81,編譯部署也較爲麻煩,一樣不知足採集性能需求。我搜索東西的時候通常狀況下不會往下翻太多,沒找到所需的,大多會嘗試換關鍵詞(包括中英文)搜搜,此次可能也提醒我下次要多往下找找,說不定有些好東西未必排在最前面幾個github

以後在github上搜in:readme mongodb kafka sync,讓我眼前一亮。mongodb

github上搜索mongodb、kafka、sync關鍵詞結果

點進去快速讀了一下readme,正是我想要的(後面本身實際測下來確實高性能、高可用,知足個人需求),官方也提供了MongoShake的性能測試報告json

這篇博客不講(也很大多是筆者技術太渣,沒法參透領會(●´ω`●))MongoShake的架構、原理、實現,如何高性能的,如何高可用的等等。就一個目的,但願其餘朋友在搜索實時同步mongodb數據時候,MongoShake的解決方案能夠排在最前面(實力所歸,誰用誰知道,獨樂樂不如衆樂樂,故做此博客),避免走彎路、繞路。api

初次使用MongoShake值得注意的地方

數據處理流程

v2.2.1以前的MongoShake版本處理數據的流程:架構

MongoDB(數據源端,待同步的數據)
-->MongoShake(對應的是collector.linux進程,做用是採集)
-->Kafka(raw格式,未解析的帶有header+body的數據)
-->receiver(對應的是receiver.linux進程,做用是解析,這樣下游組件就能拿到好比解析好的一條一條的json格式的數據)
-->下游組件(拿到mongodb中的數據用於本身的業務處理)工具

v2.2.1以前MongoShake的版本解析入kafka,須要分別啓collector.linux和receiver.linux進程,並且receiver.linux須要本身根據你的業務邏輯填充完整,而後編譯出來,默認只是把解析出來的數據打個log而已性能

src/mongoshake/receiver/replayer.go中的代碼如圖:測試

須要本身填充receiver邏輯的地方

詳情見:https://github.com/alibaba/MongoShake/wiki/FAQ#q-how-to-connect-to-different-tunnel-except-direct

v2.2.1版本MongoShake的collector.conf有一個配置項tunnel.message

# the message format in the tunnel, used when tunnel is kafka.
# "raw": batched raw data format which has good performance but encoded so that users
# should parse it by receiver.
# "json": single oplog format by json.
# "bson": single oplog format by bson.
# 通道數據的類型,只用於kafka和file通道類型。
# raw是默認的類型,其採用聚合的模式進行寫入和
# 讀取,可是因爲攜帶了一些控制信息,因此須要專門用receiver進行解析。
# json以json的格式寫入kafka,便於用戶直接讀取。
# bson以bson二進制的格式寫入kafka。
tunnel.message = json
  • 若是選擇的raw格式,那麼數據處理流程和上面以前的一致(MongoDB->MongoShake->Kafka->receiver->下游組件)
  • 若是選擇的是jsonbson,處理流程爲MongoDB->MongoShake->Kafka->下游組件

v2.2.1版本設置爲json處理的優勢就是把之前須要由receiver對接的格式,改成直接對接,從而少了一個receiver,也不須要用戶額外開發,下降開源用戶的使用成本。

簡單總結一下就是:
raw格式可以最大程度的提升性能,可是須要用戶有額外部署receiver的成本。json和bson格式可以下降用戶部署成本,直接對接kafka便可消費,相對於raw來講,帶來的性能損耗對於大部分用戶是可以接受的。

高可用部署方案

我用的是v2.2.1版本,高可用部署很是簡單。collector.conf開啓master的選舉便可:

# high availability option.
# enable master election if set true. only one mongoshake can become master
# and do sync, the others will wait and at most one of them become master once
# previous master die. The master information stores in the `mongoshake` db in the source
# database by default.
# 若是開啓主備mongoshake拉取同一個源端,此參數須要開啓。
master_quorum = true

# checkpoint存儲的地址,database表示存儲到MongoDB中,api表示提供http的接口寫入checkpoint。
context.storage = database

同時我checkpoint的存儲地址默認用的是database,會默認存儲在mongoshake這個db中。咱們能夠查詢到checkpoint記錄的一些信息。

rs0:PRIMARY> use mongoshake
switched to db mongoshake
rs0:PRIMARY> show collections;
ckpt_default
ckpt_default_oplog
election
rs0:PRIMARY> db.election.find()
{ "_id" : ObjectId("5204af979955496907000001"), "pid" : 6545, "host" : "192.168.31.175", "heartbeat" : NumberLong(1582045562) }

我在192.168.31.174,192.168.31.175,192.168.31.176上總共啓了3個MongoShake實例,能夠看到如今工做的是192.168.31.175機器上進程。自測過程,高速往mongodb寫入數據,手動kill掉192.168.31.175上的collector進程,等192.168.31.174成爲master以後,我又手動kill掉它,最終只保留192.168.31.176上的進程工做,最後統計數據發現,有重採數據現象,猜想有實例還沒來得及checkpoint就被kill掉了。

相關文章
相關標籤/搜索