「Elasticsearch」ES重建索引怎麼才能作到數據無縫遷移呢?

背景

衆所周知,Elasticsearch是⼀個實時的分佈式搜索引擎,爲⽤戶提供搜索服務。當咱們決定存儲某種數據,在建立索引的時候就須要將數據結構,即Mapping肯定下來,於此同時索引的設定和不少固定配置將不能改變。
<!-- more -->
那若是後續業務發生變化,須要改變數據結構或者更換ES更換分詞器怎麼辦呢?爲此,Elastic團隊提供了不少經過輔助⼯具來幫助開發⼈員進⾏重建索引的方案。
若是對 reindex API 不熟悉,那麼在遇到重構的時候,必然事倍功半,效率低下。反之,就能夠方便地進行索引重構,省時省力。node

步驟

假設以前咱們已經存在一個blog索引,由於更換分詞器須要對該索引中的數據進行重建索引,以便支持業務使用新的分詞規則搜索數據,而且儘量使這個變化對外服務沒有感知,大概分爲如下幾個步驟:​程序員

  • 新增⼀個索引blog_lastest,Mapping數據結構與blog索引一致
  • blog數據同步至blog_lastest
  • 刪除blog索引
  • 數據同步後給blog_lastest添加別名blog

新建索引

在這裏推薦一個ES管理工具 Kibana,主要針對數據的探索、可視化和分析。

官網

put /blog_lastest/
{
    "mappings":{
        "properties":{
            "title":{
                "type":"text",
                "analyzer":"ik_max_word"
            },
            "author":{
                "type":"keyword",
                "fields":{
                    "seg":{
                        "type":"text",
                        "analyzer":"ik_max_word"
                    }
                }
            }
        }
    }
}

將舊索引數據copy到新索引

同步等待

接⼝將會在 reindex 結束後返回微信

POST /_reindex
{
    "source": {
        "index": "blog"
    },
    "dest": {
        "index": "blog_lastest"
    }
}

kibana 中的使用以下所示
-w706數據結構

固然高版本(7.1.1)中,ES都有提供對應的Java REST Client,好比app

ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices("blog").setSource.setDestIndex("blog_lastest");
TaskSubmissionResponse taskSubmissionResponse = client.submitReindexTask(reindexRequest, RequestOptions.DEFAULT);

爲了防止贅述,接下來舉例所有以kibana中請求介紹,若是有須要用Java REST Client,能夠自行去ES官網查看。異步

異步執⾏

若是 reindex 時間過⻓,建議加上 wait_for_completion=false 的參數條件,這樣 reindex 將直接返回 taskId分佈式

POST /_reindex?wait_for_completion=false
{
    "source": {
        "index": "blog"
    },
    "dest": {
        "index": "blog_lastest"
    }
}

返回:工具

{
  "task" : "dpBihNSMQfSlboMGlTgCBA:4728038"
}

op_type 參數

op_type 參數控制着寫入數據的衝突處理方式,若是把 op_type 設置爲 create【默認值】,在 _reindex API 中,表示寫入時只在 dest index中添加不存在的 doucment,若是相同的 document 已經存在,則會報 version confilct 的錯誤,那麼索引操做就會失敗。【這種方式與使用 _create API 時效果一致】oop

POST _reindex
{
  "source": {
    "index": "blog"
  },
  "dest": {
    "index": "blog_lastest",
    "op_type": "create"
  }
}

若是這樣設置了,也就不存在更新數據的場景了【衝突數據沒法寫入】,咱們也能夠把 op_type 設置爲 index,表示全部的數據所有從新索引建立。測試

conflicts 配置

默認狀況下,當發生 version conflict 的時候,_reindex 會被 abort,任務終止【此時數據尚未 reindex 完成】,在返回體中的 failures 指標中會包含衝突的數據【有時候數據會很是多】,除非把 conflicts 設置爲 proceed

關於 abort 的說明,若是產生了 abort,已經執行的數據【例如更新寫入的】仍然存在於目標索引,此時任務終止,還會有數據沒有被執行,也就是漏數了。換句話說,該執行過程不會回滾,只會終止。若是設置了 proceed,任務在檢測到數據衝突的狀況下,不會終止,會跳過沖突數據繼續執行,直到全部數據執行完成,此時不會漏掉正常的數據,只會漏掉有衝突的數據。

POST _reindex
{
  "source": {
    "index": "blog"
  },
  "dest": {
    "index": "blog_lastest",
    "op_type": "create"
  },
  "conflicts": "proceed"
}

咱們能夠故意把 op_type 設置爲 create,人爲製造數據衝突的場景,測試時更容易觀察到衝突現象。

若是把 conflicts 設置爲 proceed,在返回體結果中不會再出現 failures 的信息,可是經過 version_conflicts 指標能夠看到具體的數量。

批次大小配置

當你發現reindex的速度有些慢的時候,能夠在 query 參數的同一層次【即 source 參數中】添加 size 參數,表示 scroll size 的大小【會影響批次的次數,進而影響總體的速度】,若是不顯式設置,默認是一批 1000 條數據,在一開始的簡單示例中也看到了。
以下,設置 scroll size 爲 5000:

POST /_reindex?wait_for_completion=false
{
    "source": {
        "index": "blog",
        "size":5000
    },
    "dest": {
        "index": "blog_lastest",
        "op_type": "create"
    },
    "conflicts": "proceed"
}

測試後,速度達到了 30 分鐘 500 萬左右,明顯提高了不少。

根據taskId能夠實時查看任務的執行狀態

通常來講,若是咱們的 source index 很大【好比幾百萬數據量】,則可能須要比較長的時間來完成 _reindex 的工做,可能須要幾十分鐘。而在此期間不可能一直等待結果返回,能夠去作其它事情,若是中途須要查看進度,能夠經過 _tasks API 進行查看。

GET /_tasks/{taskId}

返回:

{
  "completed" : false,
  "task" : {
    "node" : "dpBihNSMQfSlboMGlTgCBA",
    "id" : 4704218,
    "type" : "transport",
    "action" : "indices:data/write/reindex",
    ……
}

當執行完畢時,completed爲true
查看任務進度以及取消任務,除了根據taskId查看之外,咱們還能夠經過查看全部的任務中篩選本次reindex的任務。

GET _tasks?detailed=true&actions=*reindex

返回結果:

{
  "nodes" : {
    "dpBihNSMQfSlboMGlTgCBA" : {
      "name" : "node-16111-9210",
      "transport_address" : "192.168.XXX.XXX:9310",
      "host" : "192.168.XXX.XXX",
      "ip" : "192.168.16.111:9310",
      "roles" : [
        "ingest",
        "master"
      ],
      "attributes" : {
        "xpack.installed" : "true",
        "transform.node" : "false"
      },
      "tasks" : {
        "dpBihNSMQfSlboMGlTgCBA:6629305" : {
          "node" : "dpBihNSMQfSlboMGlTgCBA",
          "id" : 6629305,
          "type" : "transport",
          "action" : "indices:data/write/reindex",
          "status" : {
            "total" : 8361421,
            "updated" : 0,
            "created" : 254006,
            "deleted" : 0,
            "batches" : 743,
            "version_conflicts" : 3455994,
            "noops" : 0,
            "retries" : {
              "bulk" : 0,
              "search" : 0
            },
            "throttled_millis" : 0,
            "requests_per_second" : -1.0,
            "throttled_until_millis" : 0
          },
          "description" : "reindex from [blog] to [blog_lastest][_doc]",
          "start_time_in_millis" : 1609338953464,
          "running_time_in_nanos" : 1276738396689,
          "cancellable" : true,
          "headers" : { }
        }
      }
    }
  }
}

注意觀察裏面的幾個重要指標,例如從 description 中能夠看到任務描述,從 tasks 中能夠找到任務的 id【例如 dpBihNSMQfSlboMGlTgCBA:6629305】,從 cancellable 能夠判斷任務是否支持取消操做。
這個 API 其實就是模糊匹配,同理也能夠查詢其它類型的任務信息,例如使用 GET _tasks?detailed=true&actions=*byquery 查看查詢請求的狀態。
當集羣的任務太多時咱們就能夠根據task_id,也就是上面提到GET /_tasks/task_id 方式更加準確地查詢指定任務的狀態,避免集羣的任務過多,不方便查看。
若是遇到操做失誤的場景,想取消任務,有沒有辦法呢?
固然有啦,雖然覆水難收,經過調用
_tasks API

POST _tasks/task_id/_cancel

這裏的 task_id 就是經過上面的查詢任務接口獲取的任務id(任務要支持取消操做,即【cancellable 爲 true】時方能收效)。

刪除舊索引

當咱們經過 API 查詢發現任務完成後,就能夠進行後續操做,我這裏是要刪除舊索引,而後再給新索引發別名,用於替換舊索引,這樣才能保證對外服務沒有任何感知。

DELETE /blog

使用別名

POST /_aliases
{
    "actions":[
        {
            "add":{
                "index":"blog_lastest",
                "alias":"blog"
            }
        }
    ]
}

經過別名訪問新索引

進行過以上操做後,咱們能夠使用一個簡單的搜索驗證服務。

POST /blog/_search
{
    "query": {
        "match": {
            "author": "james"
        }
    }
}

若是搜索結果達到咱們的預期目標,至此,數據索引重建遷移完成。

本文可轉載,但需聲明原文出處。 程序員小明,一個不多加班的程序員。歡迎關注微信公衆號「程序員小明」,獲取更多優質文章。
相關文章
相關標籤/搜索