Elasticsearch建立索引流程

前言

說明:本文章使用的ES版本是:6.7.0java

在上一篇文章搜索引擎ElasticSearch的啓動過程中,介紹了ES的啓動過程。node

由此可知,在ES啓動過程當中,建立Node對象(new Node(environment))時,初始化了RestHandler,由其名字能夠知道這是用來處理Rest請求的。git

在ES源碼中,RestHandlerAction以下圖:github

clipboard.png

其中:算法

  • adminsql

    • cluster:處理集羣相關請求
    • indices:處理索引相關請求
  • cat:平常查詢
  • document:文檔處理
  • ingest:pipeline處理。pipeline?幹嗎的
  • search:搜索

接下來咱們具體的看一下ES是如何建立索引的:org.elasticsearch.rest.action.document.RestIndexAction數據庫

數據概念和結構

一個完整的ES集羣由如下幾個基本元素組成json

名稱 概念 對應關係型數據庫概念 說明
Cluster 集羣 一個或多個節點的集合,經過啓動時指定名字做爲惟一標識,默認cluster-state
node 節點 啓動的ES的單個實例,保存數據並具備索引和搜索的能力,經過名字惟一標識,默認node-n
index 索引 Database 具備類似特色的文檔的集合,能夠對應爲關係型數據庫中的數據庫,經過名字在集羣內惟一標識
type 文檔類別 Table 索引內部的邏輯分類,能夠對應爲Mysql中的表,ES 6.x 版本中,一個索引只容許一個type,再也不支持多個type。7.x版本中,type將廢棄。
document 文檔 Row 構成索引的最小單元,屬於一個索引的某個類別,從屬關係爲: Index -> Type -> Document,經過id 在Type 內惟一標識
field 字段 Column 構成文檔的單元
mapping 索引映射(約束) Schema 用來約束文檔字段的類型,能夠理解爲索引內部結構
shard 分片 將索引分爲多個塊,每塊叫作一個分片。索引定義時須要指定分片數且不能更改,默認一個索引有5個分片,每一個分片都是一個功能完整的Index,分片帶來規模上(數據水平切分)和性能上(並行執行)的提高,是ES數據存儲的最小單位
replicas 分片的備份 每一個分片默認一個備份分片,它能夠提高節點的可用性,同時可以提高搜索時的併發性能(搜索能夠在所有分片上並行執行)

一個ES集羣的結構以下:安全

clipboard.png

每一個節點默認有5個分片,每一個分片有一個備分片。併發

6.x版本以前的索引的內部結構:

clipboard.png

說明:ES 6.x 版本中,相同索引只容許一個type,再也不支持多個type。7.x版本中,type將廢棄。

因此,6.x版本的索引結構以下:

clipboard.png

7.x版本的索引結構以下:

clipboard.png

索引一個文檔

啓動ES實例後,發送以下請求:

curl -X PUT 'localhost:9200/index_name/type_name/1' -H 'Content-Type: application/json' -d '
{
  "title": "我是文件標題,可被搜索到",
  "text":  "文本內容,ES時如何索引一個文檔的",
  "date":  "2019/01/01"
}'

其中:

  • index_name:表示索引名稱
  • type_name:類別名稱
  • 1:文檔ID

ES執行流程:

客戶端:

  1. BaseRestHandler#handleRequest:處理請求
  2. RestIndexAction#prepareRequest:封裝request,識別行爲,容許的行爲以下,默認INDEX

    enum OpType {
        // Index the source. If there an existing document with the id, it will be replaced.
        INDEX(0),
        // Creates the resource. Simply adds it to the index, if there is an existing document with the id, then it won't be removed.
        CREATE(1),
        /** Updates a document */
        UPDATE(2),
        /** Deletes a document */
        DELETE(3);
        
        ...
    }
  3. 參數檢查,查看是否有關鍵字,並獲取相關關鍵字的值

    0 = "parent"
    1 = "pretty"
    2 = "version_type"
    3 = "format"
    4 = "index"
    5 = "refresh"
    6 = "error_trace"
    7 = "type"
    8 = "timeout"
    9 = "pipeline"
    10 = "routing"
    11 = "if_seq_no"
    12 = "if_primary_term"
    13 = "wait_for_active_shards"
    14 = "id"
    15 = "op_type"
    16 = "human"
    17 = "filter_path"
  4. NodeClient#doExecute:指定執行該請求的actionName:indices:data/write/index
  5. TransportAction#execute():將請求封裝成CreateIndexRequest併發送到服務端,處理髮送前置任務

    1. IndexRequest#validate:校驗參數內容,type、source、contentType
    2. 這裏若是是更新或者刪除操做,檢查是否傳入ID字段,沒傳如則報錯

      if (opType() != OpType.INDEX && id == null) {
          addValidationError("an id is required for a " + opType() + " operation", validationException);
      }
    3. 判斷ID長度,最長不能超過512個字符

Transport層

Transport將request封裝成Task,將請求發送給服務端

服務端

  1. 服務端根據actionName獲取具體響應請求的action,此處爲執行:TransportBulkAction#doExecute()
  2. 讀取AutoCreateIndex#AUTO_CREATE_INDEX_SETTING,該值由配置文件elasticsearch.yml中的auto_create_index控制,true表示當插入的索引不存在時,自動建立該索引
    clipboard.png

    1. 若是"auto_create_index"爲true:

      1. 分析bulkRequest中的全部請求中的全部index,生成Set<String> indices,
      2. 而後遍歷indices,判斷索引名稱是否存在

        1. 索引不存在:將請求轉發給TransportCreateIndexAction#masterOperation,建立索引,且索引建立完成後,執行第2步
        2. 索引存在:啓動異步進程BulkOperation,該進程將負責建立索引
    2. 若是"auto_create_index"爲false,則索引不存在的寫入文檔的請求
TransportCreateIndexAction 建立索引過程
  1. 該類繼承TransportMasterNodeAction,它會啓動一個異步線程來執行任務,若是當前節點是master節點,則執行masterOperation,不然轉發給master節點(每一個節點在啓動時會加入集羣,同時保存完整的集羣信息,該信息又Discovery模塊維護)
  2. TransportCreateIndexAction將CreateIndexRequest轉換爲CreateIndexClusterStateUpdateRequest,將請求做爲參數,調用MetaDataCreateIndexService#createIndex
  3. 調用MetaDataCreateIndexService#onlyCreateIndex,該方法負責在clusterstate中建立新的index,而且等待指定數目(默認爲1)狀態爲active的分片副本建立完成(activeShardsObserver.waitForActiveShards方法實現),最終返回給listener。
  4. onlyCreateIndex方法,其內部執行clusterService.submitStateUpdateTask,提交集羣狀態修改任務,提交任務的執行邏輯是AckedClusterStateUpdateTask類內部的execute方法。其內部邏輯爲:

    1. 校驗index的名字和settings是否合法(好比index名不能有大寫,若是有別名,判斷是否有重名)
    2. 根據index name 查找合適的模板信息,即mapping
    3. 構建indexSettingsBuilder,能夠認爲是該索引的默認環境變量
    4. 準備工做完成,開始寫入索引IndicesService#createIndex,寫入索引的動做由IndexModule#newIndexService完成
    5. 爲indicesService服務增長index服務,mapperService服務,同時合併新老mappings
    6. 構建IndexMetaData,並生成新的ClusterState
    7. 若是index狀態open,執行allocationService.reroute將分片分配到其餘節點
    8. 最後刪除索引服務(indicesService.removeIndex)
  5. 上一步修改完成clusterstate後

    1. 若是是master節點同步集羣狀態(若是是master)
    2. 通知集羣狀態監聽器listener,其餘節點接收到集羣狀態變化,啓動indicesService服務
BulkOperation 寫入文檔過程
  1. 獲取最新的集羣狀態clusterstate
  2. 遍歷request中的文檔

    1. 獲取文檔操做類型OpType,寫入文檔
    2. 對文檔作一些加工,主要包括:解析routing(若是mapping裏有的話)、指定的timestamp(若是沒有帶timestamp會使用當前時間),若是文檔沒有指定id字段,會自動生成一個base64UUID做爲id字段
      clipboard.png
  3. 再次遍歷全部的request,獲取獲取每一個request應該發送到的shardId,獲取的過程是這樣的:若是上一步獲取到了routing則取routing,不然取文檔ID,取其hash值(哈希算法 Murmur3Hash)而後對當前索引的分片數量取模,獲得分片ID:shardId
  4. 將相同分片的請求分組,將請求封裝成BulkShardRequest,經過TransportBulkAction將請求發送到分片所在節點
  5. 請求轉發到Node節點更新主分片,TransportReplicationAction.execute(),建立一個ReroutePhase異步線程,並執行,此處文檔會寫入主分片buffer中(InternalEngine#indexIntoLucene),最後並啓動異步進程ReplicationPhase,更新副分片
  6. 至此,文檔寫入完成,但只是將數據寫入內存buffer和transLog中,以後還有異步進程將數據refresh到索引中使其可搜索,將數據flush到磁盤

文檔寫入總結

  1. 經過副本分片和Translog日誌保障數據安全和一致性
  2. 在可用性和一致性二者的取捨中,ES更看重可用性。主分片寫入後,便可搜索。所以若是請求落到副分片可能出現不一致的狀況,可是在搜索業務中,這種短期的不一致大可能是能夠接受的

系列文章

  1. 搜索引擎ElasticSearch源碼編譯和Debug環境搭建
  2. 搜索引擎ElasticSearch的啓動過程
  3. Elasticsearch建立索引流程
  4. Elasticsearch搜索過程詳解
  5. Elasticsearch搜索相關性排序算法詳解
  6. Elasticsearch中的倒排索引
相關文章
相關標籤/搜索