衆所周知,Elasticsearch是⼀個實時的分佈式搜索引擎,爲⽤戶提供搜索服務。當咱們決定存儲某種數據,在建立索引的時候就須要將數據結構,即Mapping肯定下來,於此同時索引的設定和不少固定配置將不能改變。
<!-- more -->
那若是後續業務發生變化,須要改變數據結構或者更換ES更換分詞器怎麼辦呢?爲此,Elastic團隊提供了不少經過輔助⼯具來幫助開發⼈員進⾏重建索引的方案。
若是對 reindex
API 不熟悉,那麼在遇到重構的時候,必然事倍功半,效率低下。反之,就能夠方便地進行索引重構,省時省力。node
假設以前咱們已經存在一個blog索引,由於更換分詞器須要對該索引中的數據進行重建索引,以便支持業務使用新的分詞規則搜索數據,而且儘量使這個變化對外服務沒有感知,大概分爲如下幾個步驟:程序員
blog_lastest
,Mapping數據結構與blog
索引一致blog
數據同步至blog_lastest
blog
索引blog_lastest
添加別名blog
在這裏推薦一個ES管理工具 Kibana,主要針對數據的探索、可視化和分析。
put /blog_lastest/ { "mappings":{ "properties":{ "title":{ "type":"text", "analyzer":"ik_max_word" }, "author":{ "type":"keyword", "fields":{ "seg":{ "type":"text", "analyzer":"ik_max_word" } } } } } }
接⼝將會在 reindex 結束後返回微信
POST /_reindex { "source": { "index": "blog" }, "dest": { "index": "blog_lastest" } }
在 kibana
中的使用以下所示數據結構
固然高版本(7.1.1)中,ES都有提供對應的Java REST Client
,好比app
ReindexRequest reindexRequest = new ReindexRequest(); reindexRequest.setSourceIndices("blog").setSource.setDestIndex("blog_lastest"); TaskSubmissionResponse taskSubmissionResponse = client.submitReindexTask(reindexRequest, RequestOptions.DEFAULT);
爲了防止贅述,接下來舉例所有以kibana
中請求介紹,若是有須要用Java REST Client
,能夠自行去ES官網查看。異步
若是 reindex 時間過⻓,建議加上 wait_for_completion=false
的參數條件,這樣 reindex 將直接返回 taskId
。分佈式
POST /_reindex?wait_for_completion=false { "source": { "index": "blog" }, "dest": { "index": "blog_lastest" } }
返回:工具
{ "task" : "dpBihNSMQfSlboMGlTgCBA:4728038" }
op_type
參數控制着寫入數據的衝突處理方式,若是把 op_type
設置爲 create
【默認值】,在 _reindex
API 中,表示寫入時只在 dest
index
中添加不存在的 doucment,若是相同的 document 已經存在,則會報 version confilct
的錯誤,那麼索引操做就會失敗。【這種方式與使用 _create API 時效果一致】oop
POST _reindex { "source": { "index": "blog" }, "dest": { "index": "blog_lastest", "op_type": "create" } }
若是這樣設置了,也就不存在更新數據的場景了【衝突數據沒法寫入】,咱們也能夠把 op_type
設置爲 index
,表示全部的數據所有從新索引建立。測試
默認狀況下,當發生 version conflict
的時候,_reindex
會被 abort
,任務終止【此時數據尚未 reindex
完成】,在返回體中的 failures
指標中會包含衝突的數據【有時候數據會很是多】,除非把 conflicts
設置爲 proceed
。
關於 abort
的說明,若是產生了 abort
,已經執行的數據【例如更新寫入的】仍然存在於目標索引,此時任務終止,還會有數據沒有被執行,也就是漏數了。換句話說,該執行過程不會回滾,只會終止。若是設置了 proceed
,任務在檢測到數據衝突的狀況下,不會終止,會跳過沖突數據繼續執行,直到全部數據執行完成,此時不會漏掉正常的數據,只會漏掉有衝突的數據。
POST _reindex { "source": { "index": "blog" }, "dest": { "index": "blog_lastest", "op_type": "create" }, "conflicts": "proceed" }
咱們能夠故意把 op_type
設置爲 create
,人爲製造數據衝突的場景,測試時更容易觀察到衝突現象。
若是把 conflicts
設置爲 proceed
,在返回體結果中不會再出現 failures
的信息,可是經過 version_conflicts
指標能夠看到具體的數量。
當你發現reindex
的速度有些慢的時候,能夠在 query
參數的同一層次【即 source
參數中】添加 size
參數,表示 scroll size
的大小【會影響批次的次數,進而影響總體的速度】,若是不顯式設置,默認是一批 1000 條數據,在一開始的簡單示例中也看到了。
以下,設置 scroll size
爲 5000:
POST /_reindex?wait_for_completion=false { "source": { "index": "blog", "size":5000 }, "dest": { "index": "blog_lastest", "op_type": "create" }, "conflicts": "proceed" }
測試後,速度達到了 30 分鐘 500 萬左右,明顯提高了不少。
通常來講,若是咱們的 source index
很大【好比幾百萬數據量】,則可能須要比較長的時間來完成 _reindex
的工做,可能須要幾十分鐘。而在此期間不可能一直等待結果返回,能夠去作其它事情,若是中途須要查看進度,能夠經過 _tasks
API 進行查看。
GET /_tasks/{taskId}
返回:
{ "completed" : false, "task" : { "node" : "dpBihNSMQfSlboMGlTgCBA", "id" : 4704218, "type" : "transport", "action" : "indices:data/write/reindex", …… }
當執行完畢時,completed
爲true
查看任務進度以及取消任務,除了根據taskId查看之外,咱們還能夠經過查看全部的任務中篩選本次reindex
的任務。
GET _tasks?detailed=true&actions=*reindex
返回結果:
{ "nodes" : { "dpBihNSMQfSlboMGlTgCBA" : { "name" : "node-16111-9210", "transport_address" : "192.168.XXX.XXX:9310", "host" : "192.168.XXX.XXX", "ip" : "192.168.16.111:9310", "roles" : [ "ingest", "master" ], "attributes" : { "xpack.installed" : "true", "transform.node" : "false" }, "tasks" : { "dpBihNSMQfSlboMGlTgCBA:6629305" : { "node" : "dpBihNSMQfSlboMGlTgCBA", "id" : 6629305, "type" : "transport", "action" : "indices:data/write/reindex", "status" : { "total" : 8361421, "updated" : 0, "created" : 254006, "deleted" : 0, "batches" : 743, "version_conflicts" : 3455994, "noops" : 0, "retries" : { "bulk" : 0, "search" : 0 }, "throttled_millis" : 0, "requests_per_second" : -1.0, "throttled_until_millis" : 0 }, "description" : "reindex from [blog] to [blog_lastest][_doc]", "start_time_in_millis" : 1609338953464, "running_time_in_nanos" : 1276738396689, "cancellable" : true, "headers" : { } } } } } }
注意觀察裏面的幾個重要指標,例如從 description
中能夠看到任務描述,從 tasks 中能夠找到任務的 id
【例如 dpBihNSMQfSlboMGlTgCBA:6629305
】,從 cancellable
能夠判斷任務是否支持取消操做。
這個 API 其實就是模糊匹配,同理也能夠查詢其它類型的任務信息,例如使用 GET _tasks?detailed=true&actions=*byquery
查看查詢請求的狀態。
當集羣的任務太多時咱們就能夠根據task_id
,也就是上面提到GET /_tasks/task_id
方式更加準確地查詢指定任務的狀態,避免集羣的任務過多,不方便查看。
若是遇到操做失誤的場景,想取消任務,有沒有辦法呢?
固然有啦,雖然覆水難收,經過調用_tasks API
:
POST _tasks/task_id/_cancel
這裏的 task_id
就是經過上面的查詢任務接口獲取的任務id(任務要支持取消操做,即【cancellable 爲 true】時方能收效)。
當咱們經過 API 查詢發現任務完成後,就能夠進行後續操做,我這裏是要刪除舊索引,而後再給新索引發別名,用於替換舊索引,這樣才能保證對外服務沒有任何感知。
DELETE /blog
POST /_aliases { "actions":[ { "add":{ "index":"blog_lastest", "alias":"blog" } } ] }
進行過以上操做後,咱們能夠使用一個簡單的搜索驗證服務。
POST /blog/_search { "query": { "match": { "author": "james" } } }
若是搜索結果達到咱們的預期目標,至此,數據索引重建遷移完成。
本文可轉載,但需聲明原文出處。 程序員小明,一個不多加班的程序員。歡迎關注微信公衆號「程序員小明」,獲取更多優質文章。