Date: 2017-09-18
5 ways to synchronize data from MongoDB to Elasticsearch
https://www.linkedin.com/pulse/5-way-sync-data-from-mongodb-es-kai-hao
Elasticsearch (ES) is a popular engine for indexing, searching and analyzing data, and MongoDB is a well-known NoSQL database for storing and querying data.
As our web infrastructure grows, how can we export data from MongoDB to ES for search or analysis?
Here are five possible solutions to choose from.
1. Synchronizing in the web server
When Node.js is the web server, we can use the Mongoosastic module to store data on both sides. When a document needs to be stored,
Mongoosastic commits the change to both MongoDB and ES. Reference: Mongoosastic. The advantage is that data is stored in MongoDB and ES at the same time; the downside is extra overhead on create, update and delete (CUD) operations.
Data can also become inconsistent when the write to one of the stores fails. And tying the sync to the server framework makes migrating to another database less flexible.
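To make the trade-off concrete, here is a minimal, language-neutral sketch of the dual-write pattern in Python. It is not Mongoosastic's API (Mongoosastic is a Node.js module): two instances of a hypothetical in-memory `FlakyStore` stand in for MongoDB and ES, and the ES stand-in is told to fail on one id.

```python
class FlakyStore:
    """Hypothetical in-memory store standing in for MongoDB or ES."""

    def __init__(self, fail_on=None):
        self.docs = {}
        self.fail_on = fail_on or set()  # ids whose writes should fail

    def save(self, doc_id, doc):
        if doc_id in self.fail_on:
            raise IOError(f"write of {doc_id} failed")
        self.docs[doc_id] = dict(doc)


def dual_write(primary, secondary, doc_id, doc):
    """Write to the primary store first, then mirror the write to the secondary.

    Returns True only if both writes succeeded. Nothing spans the two stores
    like a transaction, so a secondary failure leaves them diverged.
    """
    primary.save(doc_id, doc)
    try:
        secondary.save(doc_id, doc)
    except IOError:
        return False
    return True


mongo = FlakyStore()
es = FlakyStore(fail_on={"b"})
print(dual_write(mongo, es, "a", {"name": "laoluo"}))       # True
print(dual_write(mongo, es, "b", {"name": "renzhengfei"}))  # False
print(sorted(mongo.docs), sorted(es.docs))                  # ['a', 'b'] ['a']
```

Every save costs two writes (the CUD overhead), and after the failed second write document "b" exists in MongoDB but not in ES. Repairing that would need something extra, such as a retry queue or a periodic reconciliation pass.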
2. Manually loading data from MongoDB to ES
The Transporter tool is a good choice when you want to export MongoDB data to another ES server in one go. Transporter can also move data from or to other kinds of data stores. Reference: Transporter.
It's important to know that Transporter synchronizes only once: when the job is done, Transporter exits.
3. Plugin for ES
There is an ES plugin named "elasticsearch-river-mongodb" that was widely used in ES 1.x, but the river mechanism is deprecated and no longer available in ES 2.x. Reference: elasticsearch-river-mongodb.
4. JDBC input plugin for Logstash
We can take advantage of Logstash's buffering, input, output and filtering abilities by pairing a MongoDB input with the ES output plugin. The JDBC input plugin is one option, but it requires a JDBC driver. As far as I know, there is no well-supported free JDBC driver for MongoDB; trial versions are available from Unity or Simba. Reference: JDBC Plugin for Logstash.
5. Mongo-ES connector
mongo-connector is a real-time sync service distributed as a Python package. It creates a pipeline from a MongoDB cluster to one or more target systems.
It requires MongoDB to run in replica-set mode: it first syncs the existing MongoDB data to the target, then tails the MongoDB oplog.
It needs a package named "elastic2_doc_manager" to write data to ES.
Reference links: GitHub or PyPI.
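The dump-then-tail mechanism can be sketched in a few lines of Python. This is not mongo-connector's code: plain lists and dicts stand in for the collection, the oplog and the ES index, and the entry shape (`ts`, `op` of `i`/`u`/`d`, `o`, `o2`) is a simplified version of MongoDB's oplog format in which updates carry a full replacement document.

```python
def dump_then_tail(collection, oplog, start_ts):
    """Copy every document, then replay oplog entries newer than start_ts.

    collection: list of documents, each with an "_id".
    oplog: list of simplified oplog entries ordered by "ts".
    start_ts: oplog position recorded *before* the dump began, so writes
              made while the dump runs are replayed afterwards.
    Returns the resulting index and the last applied ts (the checkpoint).
    """
    index = {}
    # Phase 1: initial dump of the existing data.
    for doc in collection:
        index[doc["_id"]] = dict(doc)
    # Phase 2: tail the oplog from the recorded position.
    last_ts = start_ts
    for entry in oplog:
        if entry["ts"] <= start_ts:
            continue
        op = entry["op"]
        if op == "i":    # insert: "o" is the new document
            index[entry["o"]["_id"]] = dict(entry["o"])
        elif op == "u":  # update: "o2" holds the _id, "o" the replacement
            doc_id = entry["o2"]["_id"]
            index[doc_id] = dict(entry["o"], _id=doc_id)
        elif op == "d":  # delete: "o" holds the _id
            index.pop(entry["o"]["_id"], None)
        last_ts = entry["ts"]
    return index, last_ts


collection = [{"_id": 1, "name": "laoluo"}]
oplog = [
    {"ts": 1, "op": "i", "o": {"_id": 1, "name": "laoluo"}},
    {"ts": 2, "op": "i", "o": {"_id": 2, "name": "renzhengfei"}},
    {"ts": 3, "op": "u", "o2": {"_id": 1}, "o": {"name": "luoyonghao"}},
    {"ts": 4, "op": "d", "o": {"_id": 2}},
]
index, last_ts = dump_then_tail(collection, oplog, start_ts=1)
print(index, last_ts)  # {1: {'name': 'luoyonghao', '_id': 1}} 4
```

Replaying an insert, a full replacement or a delete twice gives the same result, which is why overlapping the dump with the replayed oplog window is harmless in this simplified model.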
To recap, remember: MongoDB must run as a replica set, the ES IP and port must be reachable from the connector, and you need elastic2_doc_manager if you use ES 2.x. As far as I know, Beats does not offer official support for this yet; it may in the future. So those are the five ways to sync MongoDB to ES.
===============================================》》》》》》
MongoDB-Elasticsearch real-time data import
https://zhuanlan.zhihu.com/p/26906652
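Search is an essential part of our app, and we use the currently popular Elasticsearch for full-text search. Our data is stored mainly in MongoDB, so how do we import it into Elasticsearch and keep the two in sync? There are roughly two approaches:
- Work at the application layer: write to Elasticsearch whenever MongoDB is read or written, e.g. with mongoosastic. This requires changing existing business code.
- Stay independent of the business logic: read MongoDB's replica-set oplog and replay the operations it records on Elasticsearch, giving one-way sync.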
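To keep the cost of changing old code low, we chose the second approach and used mongo-connector for data sync. After using it for a while, however, we found several problems with mongo-connector:
- Some data needs join-style queries, but mongo-connector does not support the parent-child model (there is a fork that does, but it is one version behind the main branch and has little chance of being merged).
- mongo-connector can resume from a checkpoint, but recovery is very slow.
- mongo-connector lets you set how many documents to process per batch, but the annoying part is that it never writes until that number is reached. For example, if a MongoDB collection has only 100 documents and the batch size is 1000, those 100 documents will never reach Elasticsearch.
- mongo-connector has no rate limiting: it can write so fast that Elasticsearch falls over, yet it ignores that and keeps writing. Data dropped in the meantime cannot be recovered even if a later oplog entry updates it; mongo-connector just reports a 404 error.
- It stores a meta collection in MongoDB and a meta index in Elasticsearch holding a large number of timestamps, which doubles the total document count in Elasticsearch.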
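So we started looking for a better tool, but found nothing that worked well:
- Elasticsearch Rivers, once the official sync tool. The project was abandoned long ago.
- Transporter, a sync tool from Compose, an IBM company. It does not support parent-child relationships either, and the project moves slowly.
- elasticsearch-hadoop: import into Hadoop first, then into Elasticsearch. That is using a cannon to kill a mosquito: a long detour and not economical.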
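With no alternative, we wrote our own in TypeScript and named it mongo-es.
It is now open source on GitHub and published to npm. You are welcome to try it and to send bug reports and feature requests. mongo-es imports data in two phases:
- Scan: scan the whole MongoDB collection and insert every document into the corresponding Elasticsearch index, writing in batches with the Bulk API. The current point in time is recorded before the scan starts, for use in the second phase.
- Tail: starting from the recorded point in time, or from a specified one, replay the MongoDB oplog on Elasticsearch. RxJS's bufferWithTimeOrCount function lets us write in batches while keeping sync latency short (usually about one second).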
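The time-or-count batching that mongo-es gets from RxJS can be approximated in Python as below. This is an illustrative sketch, not RxJS or mongo-es code: `TimeOrCountBuffer` is a hypothetical name, and the clock is injected so the flush logic can be tested without sleeping. Unlike the count-only batch described in the mongo-connector problems above, a partial batch still flushes once `max_wait` seconds have passed.

```python
import time


class TimeOrCountBuffer:
    """Collect items; flush when max_count is reached or max_wait has elapsed.

    A production version would also flush from a timer while no items arrive;
    in this sketch the time check runs on each add() and on explicit poll().
    """

    def __init__(self, flush, max_count=1000, max_wait=1.0, clock=time.monotonic):
        self.flush = flush          # callback that receives a non-empty list
        self.max_count = max_count
        self.max_wait = max_wait
        self.clock = clock
        self.items = []
        self.started = None         # arrival time of the oldest pending item

    def add(self, item):
        if not self.items:
            self.started = self.clock()
        self.items.append(item)
        if len(self.items) >= self.max_count:
            self._emit()
        else:
            self.poll()

    def poll(self):
        # Flush a partial batch once it has waited at least max_wait seconds.
        if self.items and self.clock() - self.started >= self.max_wait:
            self._emit()

    def _emit(self):
        batch, self.items = self.items, []
        self.flush(batch)


now = [0.0]
batches = []
buf = TimeOrCountBuffer(batches.append, max_count=1000, max_wait=1.0, clock=lambda: now[0])
for doc_id in range(100):
    buf.add(doc_id)   # only 100 documents, far below max_count
now[0] = 1.5          # 1.5 seconds later...
buf.poll()            # ...the partial batch is flushed anyway
print(len(batches), len(batches[0]))  # 1 100
```

Because `_emit` only runs when items are pending, the flush callback never receives an empty batch.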
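mongo-es improves on mongo-connector in these ways:
- It supports parent-child relationships, so it can handle data that needs joins.
- It can Scan in reverse order, importing the newest data first, which is very useful for recovering quickly when an index has to be rebuilt after an error.
- It stores no extra metadata on either side; it only records the oplog timestamp. As long as the program has not been down so long that the oplog no longer contains that timestamp, it can resume.
- It recovers missing documents automatically. Sometimes, for reasons outside our control (such as network problems), a document that should already have been synced does not exist in Elasticsearch. If the oplog then contains an update to that document, mongo-connector cannot handle it and prints a 404 error. In this situation mongo-es goes back to MongoDB, reads the document, and writes it.
- It has rate limiting: it can cap the number of documents read per second so Elasticsearch is not overwhelmed.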
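A per-second cap like the one in the last item can be sketched as a fixed-window limiter. The article does not show how mongo-es implements it; `RateLimiter`, `max_per_second` and the injectable `clock`/`sleep` functions are assumptions made so the example can be tested without real waiting.

```python
import time


class RateLimiter:
    """Allow at most max_per_second acquisitions per one-second window."""

    def __init__(self, max_per_second, clock=time.monotonic, sleep=time.sleep):
        self.max_per_second = max_per_second
        self.clock = clock
        self.sleep = sleep
        self.window_start = clock()
        self.count = 0

    def acquire(self):
        """Block until one more document may be read."""
        now = self.clock()
        if now - self.window_start >= 1.0:
            # A new one-second window has begun: reset the counter.
            self.window_start, self.count = now, 0
        if self.count >= self.max_per_second:
            # This window's budget is used up: wait for the next window.
            self.sleep(self.window_start + 1.0 - now)
            self.window_start, self.count = self.clock(), 0
        self.count += 1


class FakeClock:
    """Test double: sleeping just advances the fake time."""

    def __init__(self):
        self.t = 0.0

    def now(self):
        return self.t

    def sleep(self, seconds):
        self.t += seconds


fake = FakeClock()
limiter = RateLimiter(max_per_second=2, clock=fake.now, sleep=fake.sleep)
for _ in range(5):
    limiter.acquire()  # e.g. called before reading each document in Scan
print(fake.t)  # 2.0
```

Five reads at two per second take two full waits, so the fake clock ends at 2.0 seconds.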
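Of course, mongo-connector is a more general program that can send documents to more destinations, while mongo-es only imports MongoDB data into Elasticsearch, so this comparison is a little unfair. But for the specific MongoDB-to-Elasticsearch use case, mongo-es is considerably easier to use.
Pitfalls we ran into during development:
- In the Scan phase we use a stream, which makes the read rate easy to control. In the Tail phase we use a cursor with the noCursorTimeout option, to avoid timeout errors when no oplog entries arrive for a long time. A stream in the Tail phase still errors out on timeout, even with noCursorTimeout set.
- An update oplog entry may contain a complete document, which can be written directly. It may also be a $set or $unset operation; in that case, fetch the old, complete document from Elasticsearch, apply the update in memory, and write it back. It is best not to read from MongoDB here, to reduce the load on MongoDB.
- When applying the update in memory, also check whether the changed fields are ones we need. If none of them are, the update can be ignored; changed fields outside the set we need should be excluded, to reduce the number of writes.
- A document with a _parent cannot be fetched by _id alone, because it is routed by _parent; the _parent value must be given. An update oplog entry only gives us the _id, not the field that holds the _parent, so in that case es.search must be used instead of es.get, querying every shard to find the document.
- In JS code a Timestamp is written with low first and high second; in the mongo shell the order is reversed.
- The body passed to the Bulk API must not be empty; skip the call when it is, otherwise it errors out.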
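The in-memory update and field filtering described in the second and third pitfalls can be sketched in Python. This is a simplified illustration, not mongo-es code: it handles only top-level field names (no dotted paths), `apply_update` and `needed_fields` are hypothetical names, and `old_doc` stands for the complete document already fetched from Elasticsearch.

```python
def apply_update(old_doc, update, needed_fields):
    """Return the new document to index, or None if no needed field changed.

    old_doc: complete current document, e.g. fetched from Elasticsearch.
    update: the "o" part of an update oplog entry, either a full replacement
            document or a modifier such as {"$set": {...}, "$unset": {...}}.
    needed_fields: set of top-level field names indexed in Elasticsearch.
    """
    if not any(key.startswith("$") for key in update):
        # Full replacement document: keep only the fields we index.
        new_doc = {k: v for k, v in update.items() if k in needed_fields}
    else:
        new_doc = dict(old_doc)  # copy so old_doc is never mutated
        for field, value in update.get("$set", {}).items():
            if field in needed_fields:
                new_doc[field] = value
        for field in update.get("$unset", {}):
            if field in needed_fields:
                new_doc.pop(field, None)
    # Nothing relevant changed: the caller can skip the write entirely.
    return None if new_doc == old_doc else new_doc


old = {"name": "laoluo", "company": "chuizi"}
needed = {"name", "company"}
print(apply_update(old, {"$set": {"name": "luoyonghao"}}, needed))
# {'name': 'luoyonghao', 'company': 'chuizi'}
print(apply_update(old, {"$set": {"views": 42}}, needed))
# None (only an unindexed field changed)
```

Collecting only non-None results into a bulk request also makes it easy to honor the last pitfall: if the list is empty, the Bulk API call is simply skipped.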
=============================================>>>>>>>>>>
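mongo-connector: real-time sync between MongoDB and Elasticsearch, explained in depth
http://blog.csdn.net/laoyang360/article/details/51842822
Introduction:
Testing shows that mongo-connector supports real-time sync of insert, delete and update operations between MongoDB and ES.
For historical data, mongo-connector did not sync it to ES. Whether this is because the tool itself does not support it (preliminary assessment) or because there is no such use case remains to be investigated (this post will be updated after further research).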
1. mongo-connector repository:
https://github.com/mongodb-labs/mongo-connector
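2. About mongo-connector
mongo-connector creates a pipeline from a MongoDB cluster to one or more target systems, such as Solr, Elasticsearch, or another MongoDB cluster.
It synchronizes data between MongoDB and the target system and tails the MongoDB oplog to keep the target in real-time sync with MongoDB's operations.
It has been tested with Python 2.6, 2.7 and 3.3+.
mongo-connector is a real-time sync service written in Python. It requires MongoDB to run in replica-set mode, and it needs elastic2_doc_manager to write data to ES.
3. About elastic2-doc-manager
This is the doc manager for Elasticsearch 2.x. For Elasticsearch 1.x, use elastic-doc-manager instead.
4. Steps to sync MongoDB to ES:
(1) Install mongo-connector.
pip install mongo-connector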
Pitfall: on a company intranet, the command above may fail with the following error:
Retrying (Retry(total=0, connect=None, read=None, redirect=None)) after connection broken by 'ConnectTimeoutError(<pip._vendor.requests.packages.urllib3.connection.HTTPConnection object at 0x03742DF0>, 'Connection to mirrors.aliyun.com timed out. (connect timeout=15)')': /pypi/simple/mongo-connector/
Could not find a version that satisfies the requirement mongo-connector[elastic5] (from versions: )
No matching distribution found for mongo-connector[elastic5]
After some investigation, the fix is to configure a proxy and a mirror for pip (the mirror also helps if installation is very slow).
Note: pip is the command available after installing Python; xxx.xxx.x.xx is the intranet proxy IP.
pip install --proxy http://xxx.xxx.x.xx:8000 --index http://mirrors.aliyun.com/pypi/simple/ mongo-connector[elastic5] --trusted-host mirrors.aliyun.com
References
mongo-connector 2.5.1: https://pypi.python.org/pypi/mongo-connector/
mongodb-labs/mongo-connector, Usage with Elasticsearch: https://github.com/mongodb-labs/mongo-connector/wiki/Usage-with-Elasticsearch#installation
mongodb-labs/elastic2-doc-manager: https://github.com/mongodb-labs/elastic2-doc-manager
Setting a proxy for Python pip: http://blog.csdn.net/dangerousroy/article/details/52924116
Python pip mirrors in China and how to use them: http://blog.csdn.net/testcs_dn/article/details/54374849
(2) Install elastic2-doc-manager.
pip install elastic2-doc-manager
Note:
If you skip step (2) and go straight to steps (3) and (4), you will get an error:
[root@5b9dbaaa148a bin]# mongo-connector -m 10.8.5.99:27017 -t 10.8.5.101:9200 -d elastic2_doc_manager
Logging to mongo-connector.log.
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib64/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
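(3) Start MongoDB
MongoDB must run as a replica set; if it already does, skip this step.
To enable a replica set, refer to the following steps:
Setting up MongoDB sharding and replica sets on Windows:
http://blog.csdn.net/liangxw1/article/details/78031293
- [Verification] Check the initialized replica set configuration: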
rs0:PRIMARY> rs.status()
{
  "set" : "rs0",
  "date" : ISODate("2016-07-05T08:50:55.272Z"),
  "myState" : 1,
  "term" : NumberLong(1),
  "heartbeatIntervalMillis" : NumberLong(2000),
  "members" : [
    {
      "_id" : 0, "name" : "b48eafd69929:27017", "health" : 1, "state" : 1, "stateStr" : "PRIMARY", "uptime" : 115,
      "optime" : { "ts" : Timestamp(1467708606, 1), "t" : NumberLong(1) },
      "optimeDate" : ISODate("2016-07-05T08:50:06Z"),
      "infoMessage" : "could not find member to sync from",
      "electionTime" : Timestamp(1467708605, 2),
      "electionDate" : ISODate("2016-07-05T08:50:05Z"),
      "configVersion" : 1,
      "self" : true
    }
  ],
  "ok" : 1
}
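If you prefer to initiate the replica set from Python rather than the mongo shell, a minimal sketch with pymongo (3.11 or later, for `directConnection`) could look like this. It assumes a single `mongod` already started with `--replSet rs0` and reachable at `localhost:27017` (both placeholders), and it sends the `replSetInitiate` and `replSetGetStatus` admin commands, which are what `rs.initiate()` and `rs.status()` run. `replica_set_config`, `primary_name` and `initiate_and_wait` are hypothetical helpers.

```python
import time


def replica_set_config(name, hosts):
    """Build a replSetInitiate config document for a list of host:port strings."""
    return {
        "_id": name,
        "members": [{"_id": i, "host": host} for i, host in enumerate(hosts)],
    }


def primary_name(status):
    """Return the name of the PRIMARY member in a replSetGetStatus result."""
    for member in status.get("members", []):
        if member.get("stateStr") == "PRIMARY":
            return member["name"]
    return None


def initiate_and_wait(host="localhost:27017", name="rs0", timeout=30.0):
    """Initiate a one-member replica set and wait until it has a PRIMARY.

    Assumes pymongo is installed and mongod was started with --replSet <name>.
    """
    from pymongo import MongoClient  # lazy import: the helpers above need no driver

    # directConnection talks to this one node before the set exists.
    client = MongoClient(host, directConnection=True)
    client.admin.command("replSetInitiate", replica_set_config(name, [host]))
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        primary = primary_name(client.admin.command("replSetGetStatus"))
        if primary:
            return primary
        time.sleep(1)  # the election can take a few seconds
    return None
```

Applied to a dict shaped like the rs.status() output above, `primary_name` returns "b48eafd69929:27017", which confirms the set is ready for mongo-connector.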
(4) Start syncing to ES
[root@5b9dbaaa148a bin]# mongo-connector -m 10.8.5.99:27017 -t 10.8.5.101:9200 -d elastic2_doc_manager
Logging to mongo-connector.log.
Parameters:
-m: MongoDB address and port; the default port is 27017.
-t: ES address and port; the default port is 9200.
-d: name of the doc manager; for ES 2.x it is elastic2_doc_manager.
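The same invocation can be assembled from a script, for example to start the connector from a deployment tool. A minimal Python sketch that uses only the `-m`, `-t` and `-d` options listed above and fills in the default ports when an address has none; `connector_command` and `run_connector` are hypothetical helpers, and running the command assumes mongo-connector and elastic2-doc-manager are installed and on the PATH.

```python
import subprocess


def with_default_port(address, port):
    """Append the default port if the address has none (IPv6 literals not handled)."""
    return address if ":" in address else f"{address}:{port}"


def connector_command(mongo, es, doc_manager="elastic2_doc_manager"):
    """Build the mongo-connector argument list for one MongoDB and one ES node."""
    return [
        "mongo-connector",
        "-m", with_default_port(mongo, 27017),  # MongoDB replica set member
        "-t", with_default_port(es, 9200),      # Elasticsearch target
        "-d", doc_manager,                      # doc manager module name
    ]


def run_connector(mongo, es):
    """Start mongo-connector in the foreground; blocks until it exits."""
    subprocess.run(connector_command(mongo, es), check=True)
```

Calling `run_connector("10.8.5.99", "10.8.5.101")` would run the same command as step (4).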
5. Verifying sync of Insert operations from MongoDB to ES
(1) Insert data on the MongoDB side:
# Create the MongoDB database (corresponds to the ES index)
rs0:PRIMARY> use zhang_index
switched to db zhang_index
# Insert data into MongoDB (col_02 corresponds to the ES type)
rs0:PRIMARY> db.col_02.insert({name:"laoluo", birth:"1964-03-21", sex:"man", company:"chuizi"});
WriteResult({ "nInserted" : 1 })
rs0:PRIMARY> db.col_02.insert({name:"renzhengfei", birth:"1954-03-21", sex:"man", company:"huawei"});
(2) Search on the ES side to verify
[root@5b9dbaaa148a test_log]# curl -XGET http://10.8.5.101:9200/zhang_index/col_02/_search?pretty
{
  "took" : 4,
  "timed_out" : false,
  "_shards" : { "total" : 8, "successful" : 8, "failed" : 0 },
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [
      { "_index" : "zhang_index", "_type" : "col_02", "_id" : "577b7d8ceb8e3dc2d1db12a9", "_score" : 1.0,
        "_source" : { "company" : "huawei", "name" : "renzhengfei", "birth" : "1954-03-21", "sex" : "man" } },
      { "_index" : "zhang_index", "_type" : "col_02", "_id" : "577b7d4aeb8e3dc2d1db12a7", "_score" : 1.0,
        "_source" : { "company" : "chuizi", "name" : "laoluo", "birth" : "1964-03-21", "sex" : "man" } }
    ]
  }
}
6. Verifying sync of Update operations from MongoDB to ES
(1) Update operation in MongoDB
rs0:PRIMARY> db.col_02.update({'name':'laoluo'}, {$set:{'name':'luoyonghao'}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })
rs0:PRIMARY>
rs0:PRIMARY> db.col_02.find().pretty()
{
  "_id" : ObjectId("577b7d4aeb8e3dc2d1db12a7"),
  "name" : "luoyonghao",
  "birth" : "1964-03-21",
  "sex" : "man",
  "company" : "chuizi"
}
{
  "_id" : ObjectId("577b7d8ceb8e3dc2d1db12a9"),
  "name" : "renzhengfei",
  "birth" : "1954-03-21",
  "sex" : "man",
  "company" : "huawei"
}
(2) Search ES for the updated result
[root@5b9dbaaa148a test_log]# curl -XGET http://10.8.5.101:9200/zhang_index/col_02/_search?pretty
{
  "took" : 1,
  "timed_out" : false,
  "_shards" : { "total" : 8, "successful" : 8, "failed" : 0 },
  "hits" : {
    "total" : 2,
    "max_score" : 1.0,
    "hits" : [
      { "_index" : "zhang_index", "_type" : "col_02", "_id" : "577b7d8ceb8e3dc2d1db12a9", "_score" : 1.0,
        "_source" : { "company" : "huawei", "name" : "renzhengfei", "birth" : "1954-03-21", "sex" : "man" } },
      { "_index" : "zhang_index", "_type" : "col_02", "_id" : "577b7d4aeb8e3dc2d1db12a7", "_score" : 1.0,
        "_source" : { "company" : "chuizi", "name" : "luoyonghao", "birth" : "1964-03-21", "sex" : "man" } }
    ]
  }
}
7. Verifying sync of Delete operations from MongoDB to ES
(1) Delete operation in MongoDB
rs0:PRIMARY> db.col_02.remove({'name':'renzhengfei'})
WriteResult({ "nRemoved" : 1 })
rs0:PRIMARY> db.col_02.find()
{ "_id" : ObjectId("577b7d4aeb8e3dc2d1db12a7"), "name" : "luoyonghao", "birth" : "1964-03-21", "sex" : "man", "company" : "chuizi" }
rs0:PRIMARY> db.col_02.find().pretty()
{
  "_id" : ObjectId("577b7d4aeb8e3dc2d1db12a7"),
  "name" : "luoyonghao",
  "birth" : "1964-03-21",
  "sex" : "man",
  "company" : "chuizi"
}
(2) Search ES for the result after deletion
The result shows that the document deleted in MongoDB has also been deleted in ES.
[root@5b9dbaaa148a test_log]# curl -XGET http://10.8.5.101:9200/zhang_index/col_02/_search?pretty
{
  "took" : 2,
  "timed_out" : false,
  "_shards" : { "total" : 8, "successful" : 8, "failed" : 0 },
  "hits" : {
    "total" : 1,
    "max_score" : 1.0,
    "hits" : [
      { "_index" : "zhang_index", "_type" : "col_02", "_id" : "577b7d4aeb8e3dc2d1db12a7", "_score" : 1.0,
        "_source" : { "company" : "chuizi", "name" : "luoyonghao", "birth" : "1964-03-21", "sex" : "man" } }
    ]
  }
}
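The three manual checks above can be automated by comparing the MongoDB documents with the `hits.hits` array of the ES `_search` response. A sketch under simple assumptions: both sides are already loaded as Python data (e.g. from `find()` and from the JSON that `curl` prints), MongoDB `_id` values are compared as strings, and `sync_diff` is a hypothetical helper. A plain `_search` returns only 10 hits by default, so larger indices would need paging or scrolling.

```python
def sync_diff(mongo_docs, es_response):
    """Compare MongoDB documents with an ES _search response.

    Returns (missing, extra, stale), each a sorted list of ids:
      missing: in MongoDB but not in ES (an insert that was not synced)
      extra:   in ES but not in MongoDB (a delete that was not synced)
      stale:   in both, but with different fields (an unsynced update)
    """
    mongo = {str(doc["_id"]): {k: v for k, v in doc.items() if k != "_id"}
             for doc in mongo_docs}
    es = {hit["_id"]: hit["_source"] for hit in es_response["hits"]["hits"]}
    missing = sorted(mongo.keys() - es.keys())
    extra = sorted(es.keys() - mongo.keys())
    stale = sorted(i for i in mongo.keys() & es.keys() if mongo[i] != es[i])
    return missing, extra, stale


# Data from step 7: one document left on each side after the delete.
mongo_docs = [{"_id": "577b7d4aeb8e3dc2d1db12a7", "name": "luoyonghao",
               "birth": "1964-03-21", "sex": "man", "company": "chuizi"}]
es_response = {"hits": {"total": 1, "hits": [
    {"_id": "577b7d4aeb8e3dc2d1db12a7",
     "_source": {"company": "chuizi", "name": "luoyonghao",
                 "birth": "1964-03-21", "sex": "man"}}]}}
print(sync_diff(mongo_docs, es_response))  # ([], [], [])
```

Three empty lists mean the insert, update and delete all reached ES; a non-empty `extra` list, for example, would point to a delete that was not replayed.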
For details, see:
https://docs.mongodb.com/manual/tutorial/deploy-replica-set/
5 ways to sync MongoDB and ES:
https://www.linkedin.com/pulse/5-way-sync-data-from-mongodb-es-kai-hao
Common issues:
How to setup a MongoDB replica set for the connector?
https://docs.mongodb.com/manual/tutorial/deploy-replica-set/
Fuzzy matching with Mongo Connector and Elasticsearch: http://www.csdn.net/article/2014-09-02/2821485-how-to-perform-fuzzy-matching-with-mongo-connector?