現場填坑系列:使用bulk操做提升性能,解決mongoshake 向ES同步延遲。

接到現場報告,MongoDB向ES同步數據延遲愈來愈大,有的已經超過10個小時,形成客戶新加入的用戶沒法被搜索出來。因爲在系統中ES相似於數倉,不少統計和第三方接系統都須要從ES獲取數據,因此也影響了一些其餘依賴ES數據的功能和業務。mongodb

架構簡圖

tomcat------日誌數據----->logstash-------日誌數據--->|      E  S數據庫

mongodb---業務數據--->mongoshake---業務數據 -->|      集羣vim

日誌經過logstash同步到ES,業務數據經過mongoshake本身實現的ES推送組件推送到ES。ES 5臺構成一個集羣。api

問題現象

  1. 延遲只發生在第二條通路,第一條通路雖然也有較大併發,但不發生延遲;
  2. 且延遲在業務峯期纔會出現,非高峯期延遲不嚴重或沒有延遲;
  3. 經過上面的描述能夠肯定此延遲於業務相關,且與mongoshake推送的實現方式相關;

初步分析

這個結構在兩個異地機房各有一套,中間經過同步機制同步。因爲業務數據是從本地mongodb同步到本地ES發生延遲,首先排除是雙機房間傳輸帶寬問題。tomcat

會不會是ES在業務高峯期負載太高,形成推送延遲:可能性有,可是較低,由於若是是ES查詢並不慢,且若是是由此形成,logstash 推送也應該受到影響。對ES的性能監控也能夠印證這個問題。性能優化

會不會是兩種數據業務的不一樣形成的性能差別:logstash的數據主要是日誌,插入爲主,mongoshake的數據是業務數據,涉及到對之前的數據進行修改:確實有可能,但比較起來延遲不該該有如此之大,一個秒級延遲,一個天級延遲,仍是首先考慮實現機制問題。多線程

同步機制

mongoshake向ES同步機制,是將須要在ES存放的幾張表的oplog在ES回放,此程序由咱們的開發人員擴展的mongoshake ES組件完成。架構

oplog ----- mongoshake----- oplog replay----->ES併發

聯合高峯期延遲增長的現象,能夠猜想高峯期業務數據操做形成的oplog有大量增長,因爲mongoshake自己(除非修改源碼)只能篩選表,不能篩選哪些表的具體日誌,只要是這幾張包的oplog都會同步,因此形成延遲。究竟是oplog過多須要篩選,仍是同步能力過低須要改進,咱們須要進一步查證。app

推送能力統計

現場人員查看了一段時間的同步量,對現有機制的oplog處理及回放能力進行統計。

第一次統計:130秒同步 2186

第二次統計:180秒同步 2714

可見平均每秒能力不足20條日誌,確定是太低。那麼客戶現場實際業務每秒到底要產生多少條數據?這個問題要查清楚,做爲推送性能優化的底線。

實際業務oplog狀況分析

對客戶業務能力進行統計,須要將一成天的oplog導出,oplog因爲沒有索引,雖然能夠直接經過find,並給出ts 查詢,但因爲有40G數據,查詢及其緩慢 。因此咱們選擇將數據導出到文本文件,進行分析

start=$(date +%s -d "2020-03-24 08:00:00")
end=$(date +%s -d "2020-03-24 10:00:00")
mongodump -h localhost -d local -c oplog.rs -q '{"ts":{$gt:Timestamp('$start',1),$lt:Timestamp('$end',1)}}' -o /home/backup
cd /home/backup/local
bsondump oplog.rs.bson > oplog.rs.txt

進一步對oplog進行分析,在vim中分別統計每一個小時的日誌數量,能夠獲得下表:橫軸是北京時間24個小時,縱軸是oplog數量,其中灰色是oplog中須要同步到ES的

能夠看出高峯期oplog大量增加,須要同步的日誌超過150000,平均每秒42條oplog須要同步。而處理能力不到20,因此高峯期一個小時的數據每每須要2-3個小時才能同步完成,且從8點開始,一直都下午18點,實際oplog產生都超過處理能力。

往下的優化方向,一個是減小日誌,一個是增長處理能力,高峯期每秒42條日誌已經不高,雖然能夠優化,但可優化範圍有限。增長處理能力纔是關鍵。

開發查看ES同步代碼,原有代碼使用逐條同步模式,同步一條,獲取一條,同時採用性能較低的腳本同步方式,如今使用批量處理(實現參照mongoshake 向mongodb同步的 direct writer實現。批量調用elastic Client 提供的bulk api進行操做)

bulkRequest := bw.client.Bulk()
	
for _, log := range oplogs { ...... bulkRequest.Add(elastic.NewBulkDeleteRequest().Index(index).Type(doc_type).Id(id)) } bulkResponse, err := bulkRequest.Do(context.Background()) 

對update 中的unset(刪除字段) 進行處理,由於unset執行有速度有瓶頸,因此根據實際狀況直接改成將字段置空;

for _, v := range unsets { doc[v] = nil } 

效果

在進行上述操做後,單線程狀況下數據處理的速度超過每秒1000條(未嚴格測試),新同步代碼幾分鐘就能同步一個小時的oplog,徹底達到性能要求。

討論

oplog優化

因爲ES同步性能大幅提升,因此能夠不用繼續優化oplog,可是oplog能夠反映關鍵業務對數據庫的訪問狀況,特別是寫入,在mongodb replica set中只能在primary 節點完成,即便增長節點也沒法分擔流量,因此對oplog的進一步分析依然必要。同時oplog中包含數量的大小,也對replicaset 的同步帶寬有影響,特別是出現跨機房同步的狀況時。

因此更進一步,咱們還對業務oplog進行了分析,此分析咱們會在別的文章中討論。

多線程執行

注意這裏ES同步沒有使用多線程處理,主要是考慮業務數據多線程操做的事務性。要實現此種事務,須要對mongodb自己進行必定改造。對mongodb源碼改造實現雙向同步和多線程寫入會在其餘文章中討論。

相關文章
相關標籤/搜索