Elasticsearch Document Index API詳解、原理與示例

本節將開始介紹Document API,本節將重點介紹ElasticSearch Doucment Index API(新增索引)。
從上節可知,ElasticSearch Index Rest Hign level Index Api聲明以下:java

  • public final IndexResponse index(IndexRequest indexRequest, RequestOptions options) throws IOException
  • public final void indexAsync(IndexRequest indexRequest, RequestOptions options,ActionListener<IndexResponse> listener)
    上述兩個API,一個同步調用,一個異步調用,同步調用方法直接組裝IndexResponse並返回,而異步方法經過回調ActionListener,並將執行結果(IndexResponse)傳入回調方法。從中能夠看出,Index API的核心是IndexRequest與RequestOptions。接下來咱們重點關注這兩個對象。
    一、IndexRequest 詳解
    IndexRequest的完整類繼承層次以下:
    Elasticsearch Document Index API詳解、原理與示例
    咱們目前重點關注與索引相關的IndexRequest、ReplicatedWriteRequest、ReplicationRequest便可。
    Elasticsearch Document Index API詳解、原理與示例
    接下來對各個屬性作個簡單介紹(相關的內部實現機制將會在源碼分析篇重點深刻分析,目前點到爲止)。
  • protected ShardId shardId:目標執行的分片信息。(指定該名字哪些分片上執行)
  • protected ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT,設置處理索引操做以前必須處於激活狀態到的副本數量。
  • private long routedBasedOnClusterVersion = 0 請求基於的集羣路由版本號,若是該版本後大於當前 集羣版本,則拒絕該請求。
  • private RefreshPolicy refreshPolicy = RefreshPolicy.NONE 索引刷新機制,該部分將在下文詳細分析。
  • protected String index:索引庫,相似關係型數據庫的database。
  • private String type:類型名,相似於關係數據庫的table(表)。
  • private String id :文檔ID,所謂的文檔,相似於關係數據庫的行,id,相似於關係數據庫的主鍵ID。
  • private String routing:分片值,默認爲id的值,elasticsearch的分片路由算法爲( hashcode(routing) % primary_sharding_count(主分片個數) )。
  • private String parent:暫未知(與父子任務相關)
  • private BytesReference source:source,document的原始數據(被索引的原始數據,有效載荷)。
  • private OpType opType = OpType.INDEX:操做類型,例如INDEX、CREATE、UPDATE、DELETE。
    private long version = Versions.MATCH_ANY:數據版本。
    private VersionType versionType = VersionType.INTERNAL:版本類型,分爲內部版本、外部版本,默認爲內部版本。
  • private XContentType contentType:source的數據contentType,主要包含(XContentType.JSON、XContentType.SMILE、XContentType.YAML、XContentType.CBOR),默認爲XContentType.JSON。
  • private String pipeline:管道,暫未知。
  • private long autoGeneratedTimestamp = UNSET_AUTO_GENERATED_TIMESTAMP 利用基於事件戳建立自增ID。
  • private boolean isRetry = false 是否重試,默認爲false。
    二、RequestOption詳解
    Elasticsearch Document Index API詳解、原理與示例
    RequestOptions,其實就是與Http相關的請求參數,http request header,由於Rest Hign Level Client其本質是Http請求。

三、Index BytesReference source構造詳解
下面是4中構建JSON document的4種形式:算法

  • java的json字符串的byte[]或json字符串
  • java.util.Map
  • 使用第三方JSON類庫構建json字符串或其byte[]。
  • 使用Elasticsearch自身提供的XContentFactory.jsonBuilder()類庫。
    3.1 json字符串
String json = "{" +
                    "\"user\":\"kimchy\"," +
                    "\"postDate\":\"2013-01-30\"," +
                    "\"message\":\"trying out Elasticsearch\"" +
            "}";
IndexResponse response = client.prepareIndex("twitter", "tweet")
    .setSource(json, XContentType.JSON)
    .get();

3.2 Map數據庫

Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate",new Date());
json.put("message","trying out Elasticsearch");
IndexResponse response = client.prepareIndex("twitter", "tweet")
        .setSource(json)
        .get();

3.3 使用第三方類庫json

import com.fasterxml.jackson.databind.*;
// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse
// generate json
byte[] json = mapper.writeValueAsBytes(yourbeaninstance);
IndexResponse response = client.prepareIndex("twitter", "tweet")
        .setSource(json, XContentType.JSON)
        .get();

3.四、使用ElasticSearch自帶類庫服務器

import static org.elasticsearch.common.xcontent.XContentFactory.*;
IndexResponse response = client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        .get();

四、Index API使用Demorestful

Demo在單機版本的ElasticSearch服務器中運行。併發

package persistent.prestige.elasticsearchdemo;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
public class IndexApiDemo {
    public static void main(String[] args) {
        RestHighLevelClient client = EsClient.getClient();
        try {
            IndexRequest request = new IndexRequest();
            request.index("twitter");
            request.type("_doc");
            request.id("1");
            Map<String, String> source = new HashMap<>();
            source.put("user", "dingw");
            source.put("post_date", "2009-11-16T14:12:12");
            source.put("message", "trying out Elasticsearch");
            request.source(source);
            try {
                IndexResponse result = client.index(request, RequestOptions.DEFAULT);
                System.out.println(result);
            } catch (IOException e) {
                e.printStackTrace();
            }
        } finally {
            EsClient.close(client);
        }
    }
}

運行結果以下:app

{
    "_shards" : {
        "total" : 2,
        "failed" : 0,
        "successful" : 1
    },
    "_index" : "twitter",
    "_type" : "_doc",
    "_id" : "1",
    "_version" : 1,
    "_seq_no" : 0,
    "_primary_term" : 1,
    "result" : "created"
}

接下來的內容將詳細介紹Index API返回結果相關的擴展知識,讓你們更加全面的瞭解Index API內部運行的機制。異步

五、Index API 內部實現機制
5.1 _shards 返回字段概述
_shards 結構體將反饋索引在副本級的複製信息。elasticsearch

  • total:表示本次請求應該在多少個分片上執行(包含主分片 + 副本)。
  • successful:表示本次請求成功執行的分片個數。
  • failed:表示本次請求爲成功執行請求的分片個數。

注:索引操做成功的標誌是successful大於0。當索引操做成功返回時,複製分片(副本)可能不會所有啓動(默認狀況下,只有主服務器是必需的,可是這種行爲能夠被更改)。
當前單機環境,total爲2表示,一個分片存在1主一從,但同一個複製組內的分片不會分佈在同一個機器上,故只啓動了主分片,複製分片未啓動;successful爲1表示在主分片上已成功執行,failed爲0表示沒有執行失敗的分片。

5.2 自動建立索引
使用Index API,若是索引不存在,則會自動建立對應的索引(類型映射類型爲動態映射機制,具體關於字段映射,將會在Mapping章節中詳細介紹)。Elasticsearch數據的組織形式爲(index/type/document)。索引的管理(增刪改查等API在後續文中會描述)。
自動索引建立能夠經過配置來禁用。經過在全部節點的配置文件中添加action.auto_create_index=false來禁用。經過配置index.mapper.dynamic=false能夠禁用索引的映射自動建立。配置是否禁用自動建立索引可基於模式的白名單/黑名單列表模式,例如action.auto_create_index=aaa,-bbb,+ccc,- 分別表明 aaa開頭的索引自動建立,bbb開頭的索引禁止自動建立,禁用索引自動建立。

5.3 版本工做機制
每一個索引文檔都有一個版本號。關聯的版本號做爲對索引API請求的響應的一部分返回。索引請求若是指定了版本號這個參數(IndexRequest#version)時,索引API可選擇性地容許樂觀併發控制機制,所謂樂觀併發控制就是若是待操做的索引文檔的版本號若是與IndexRequest#version版本不相同,則本次操做失敗。版本控制徹底是實時的,若是未提供版本,則無需驗證版本信息而當即執行。
默認狀況下使用內部版本控制,從1開始,每次更新自增1,(包含刪除)。可選地,版本號能夠用外部值來補充(例如,若是在數據庫中維護)。爲了啓用這個功能,IndexRequest#versionType應該被設置爲外部(VersionType.EXTERNAL或VersionType.EXTERNAL_GTE)。外部版本號的取值範圍爲[0,9.2 e+18)。若是使用外部版本號,系統會檢查傳遞給索引請求的版本號是否大於當前存儲文檔的版本號,而不是檢查匹配的版本號。若是所提供的值小於或等於存儲文檔的版本號,則會出現版本衝突,索引操做將失敗。
警告:外部版本控制支持0做爲有效版本號。這容許版本與外部版本控制系統同步,其中版本號從0開始,而不是1。它有一個反作用,即版本號爲零的文檔不能使用更新的查詢API進行更新,也不能使用查詢API的Delete來刪除,只要它們的版本號等於零。
外部版本號一個最佳實踐,使用源數據庫中數據的版本號,就不須要維護對源數據庫的更改所執行的異步索引操做的嚴格排序。即便使用來自數據庫的數據來更新Elasticsearch索引的簡單狀況,若是使用外部版本控制,也會簡化,由於若是索引操做出於某種緣由而不正常,則只使用最新的版本便可。

5.4 版本類型
ElasticSearch支持以下版本類型:

  • internal
    內部版本號,只有當請求版本號與數據版本號想等時才能夠執行對應的動做。
  • external or external_gt
    默認外部版本號,當請求版本號大於數據存儲版本號時才能夠執行對應動做,若是數據不存在,則使用指定版本號。
  • external_gte
    外部版本號,當請求版本號大於等於數據存儲版本號時能夠執行對應動做,若是數據不存在,則使用指定版本號。
    注意,外部版本號一般基於數據庫,其思想更是基於樂觀鎖,對於版本號相等的更新動做須要特別謹慎,故外部版本號一般建議external( external_gt)。外部版本號的取值範圍[0,9.2 e+18)。
    5.5 操做類型(IndexRequest#opType)
    索引API也接受一個opType,它能夠用來強制建立動做。當使用create時,若是該id中的文檔已經存在於索引中,索引操做將會敗。
    OpType以下可選值:
  • OpType.INDEX
    索引,若是文檔已存在,覆蓋,內部版本號+1。
  • OpType.CREATE
    建立,若是文檔已存在,返回錯誤。
  • OpType.UPDATE
    更新操做。
  • OpType.DELETE
    刪除操做。

5.6 自動ID生成
索引動做能夠不指定文檔ID,ElasticSearch會自動建立ID,此時的opType屬性會自動設置爲OpType.CREATE。其Restfull請求又原先的PUT變動爲POST,固然咱們在使用Rest Hign Level API時無需關注restfull請求類型,都是經過index方法發生調用,內部會自動封裝相應的http請求。

5.7 路由
默認狀況下,路由字段是經過使用文檔的id值的散列來控制的,其路由算法(hash(路由字段) % (primary count))來定位所在的主分片(複製組)。ElasticSearch提供了顯示指定路由字段的方法,經過routing來指定路由值,索引API經過IndexRequest#routing()方法來指定路由值。
當設置顯式映射(Mapping)時,能夠選擇使用路由字段來指導索引操做從文檔自己提取路由值。若是路由映射被定義並設置爲required,那麼若是沒有提供或提取路由值,則索引操做將失敗。

5.8 分佈式
索引操做首先根據路由規則將請求轉發到主分片,並在包含此分片的的實際節點上執行。在主分片完成操做以後,若是須要,更新將被分發到對應複製組中的副本所在的節點上執行。其執行邏輯已在上篇《Elasticsearch Document API之文檔讀寫概要設計》中寫模型一節中詳細介紹,在此不重複介紹。

5.9 等待活動的分片數(Wait For Active Shards )
爲了提升對系統寫操做的彈性,引入了(wait for active shards)機制,就是在進行索引操做以前,先校驗當前活躍的分片(副本數量),若是當前的激活分片數量不足,則先等待更多的分片啓動直到有新的分片加入或等待超時。默認狀況下,寫操做只需等待主分片處於激活狀態便可(即wait_for_active_shards=一、請求參數waitForActiveShards=1)。這個默認值能夠經過設置索引的配置(Setting)index.write.wait_for_active_shards來動態改變。一樣也能夠經過請求參數IndexRequest#waitForActiveShards(waitForActiveShards)來動態改變。
wait_for_active_shards的數據類型爲正整數,取值範圍爲[1,number_of_replicas+1]。
例如,假設咱們有一個由三個節點組成的集羣,A、B和C,咱們建立一個索引,其中的副本數量(number_of_replicas)設置爲3(3個副本+1個主分片,比節點數量多一個)。若是咱們嘗試索引操做,默認狀況下,只要主節點處於激活,則索引操做會在主節點上執行,而後轉發到其餘複製組。這意味着,即便B和C宕機(主分片在A節點上),索引操做仍然會在主分片上執行。若是wait_for_active_shards設置爲3(而且全部3個節點都正常),索引操做能繼續執行而無需等待,由於集羣中有3個活動節點,每一個節點都持有分片的副本。可是,若是咱們將wait_for_active_shards設置爲all或4,索引操做將不會繼續,陷入等待,由於咱們目前集羣中沒有4個副本。除非集羣中出現一個新的節點來承載第4個副本,不然該操做將超時。
須要注意的是,這種設置大大減小了沒必要要的寫操做(能避免無謂的寫處理,若是分片數量不足,則不執行索引動做),可是它並無徹底消除這種可能性,由於這種檢查發生在寫操做開始以前。一旦寫操做開始,複製在任意數量的碎片副本上仍然可能失敗,可是在主服務器上仍然成功。寫操做響應的分片部分(5.1節所示)揭示了複製成功/失敗的分片副本的數量,數據在主分片、副本之間數據的最終一致性處理在《Elasticsearch Document API之文檔讀寫概要設計》寫模型異常處理部分有相應的處理機制。
ActiveShardCount取值常量說明:

  • public static final ActiveShardCount DEFAULT = new ActiveShardCount(-2): 默認,只需主分片激活便可。
  • public static final ActiveShardCount ALL = new ActiveShardCount(-1):所有,number_of_replicas+1 個副本都在線。
  • public static final ActiveShardCount NONE = new ActiveShardCount(0) :不校驗。
  • public static final ActiveShardCount ONE = new ActiveShardCount(1):一個。
  • int 正整數,取值範圍爲[1,number_of_replicas+1]。

5.10 刷新機制
Index API 、Update API、Delete API、Bulk API都支持RefreshPolicy設置刷新策略(private RefreshPolicy refreshPolicy),以便控制上述API所產生的變化對查詢API的可見性策略。其可選值以下:

  • 空字符串或true(RefreshPolicy.IMMEDIATE)
    在操做(index,update,delete)發生以後,當即刷新相關的主分片與複製分片(不是刷新整個索引,只是刷新發生變化的文檔)。目前從索引與查詢的角度來看,他不會致使性能低下。

  • false(RefreshPolicy.NONE)
    在操做(index,update,delete)執行完畢後,直接返回,而不執行刷新,而是依靠Elasticsearch的刷新機制。

  • wait_for(RefreshPolicy.WAIT_UNTIL)
    操做發生後,並不當即強制刷新,而是等待刷新的發生,此時會阻塞等待直到超時或刷新事件到達。Elasticsearch會以一個固定的頻率刷新那些發生了變化的索引分片,刷新週期默認爲1s,經過參數index.refresh_interval配置。
    按照官方的建議,該參數不建議對其進行修改,保持默認值便可。一般不建議使用wait_for,目前沒有明顯的證據顯示RefreshPolicy.IMMEDIATE會帶來明顯的性能損耗。

5.11 超時
當執行索引操做時,主分片所在的節點可能不可用。形成這種狀況的一些緣由多是,主分片目前正在從網關中恢復或正在進行從新安置。默認狀況下,索引操做將在主上等待最多1分鐘,而後失敗並以錯誤響應。超時參數能夠用來顯式地指定它等待多長時間,可經過IndexRequest#timeout(timeout)方法設置,或經過?timeout=5m設置。

總結:本文首先羅列了Elasticsearch Index API, 而後詳細介紹了其API兩個核心的對象(IndexRequest與RequestOptions),接着經過示例演示了RestHighLevelClient index API的使用,最後深刻分析了Index API的一些內在處理機制。後續會更深一步從源碼角度深度剖析其實現細節。


關注《中間件興趣圈》公衆號,查看更多關於中間件相關的文章:
Elasticsearch Document Index API詳解、原理與示例

相關文章
相關標籤/搜索