Elasticsearch由淺入深(六)批量操做:mget批量查詢、bulk批量增刪改、路由原理、增刪改內部原理、document查詢內部原理、bulk api的奇特json格式

mget批量查詢

  1. 批量查詢的好處
    就是一條一條的查詢,好比說要查詢100條數據,那麼就要發送100次網絡請求,這個開銷仍是很大的
    若是進行批量查詢的話,查詢100條數據,就只要發送1次網絡請求,網絡請求的性能開銷縮減100倍
  2. mget批量查詢的語法
    GET _mget
    {
      "docs":[
        {
          "_index":"test_index",
          "_type":"test_type",
          "_id":1
        },
        {
          "_index":"test_index",
          "_type":"test_type",
          "_id":2
        }
        ]
    }
    {
      "took": 2,
      "timed_out": false,
      "_shards": {
        "total": 5,
        "successful": 5,
        "failed": 0
      },
      "hits": {
        "total": 9,
        "max_score": 1,
        "hits": [
          {
            "_index": "test_index",
            "_type": "test_type",
            "_id": "AWypxxLYFCl_S-ox4wvd",
            "_score": 1,
            "_source": {
              "test_content": "my test"
            }
          },
          {
            "_index": "test_index",
            "_type": "test_type",
            "_id": "8",
            "_score": 1,
            "_source": {
              "test_field": "test client 2"
            }
          },
          {
            "_index": "test_index",
            "_type": "test_type",
            "_id": "10",
            "_score": 1,
            "_source": {
              "test_field1": "test1",
              "test_field2": "updated test2"
            }
          },
          {
            "_index": "test_index",
            "_type": "test_type",
            "_id": "2",
            "_score": 1,
            "_source": {
              "test_content": "my test"
            }
          },
          {
            "_index": "test_index",
            "_type": "test_type",
            "_id": "4",
            "_score": 1,
            "_source": {
              "test_field": "test test"
            }
          },
          {
            "_index": "test_index",
            "_type": "test_type",
            "_id": "6",
            "_score": 1,
            "_source": {
              "test_field": "test test"
            }
          },
          {
            "_index": "test_index",
            "_type": "test_type",
            "_id": "1",
            "_score": 1,
            "_source": {
              "test_field1": "test field1",
              "test_field2": "test field2"
            }
          },
          {
            "_index": "test_index",
            "_type": "test_type",
            "_id": "7",
            "_score": 1,
            "_source": {
              "test_field": "test client 1"
            }
          },
          {
            "_index": "test_index",
            "_type": "test_type",
            "_id": "11",
            "_score": 1,
            "_source": {
              "num": 0,
              "tags": []
            }
          }
        ]
      }
    }
    View Code
  3. 若是查詢的document是一個index下的不一樣type的話java

    GET /test_index/_mget
    {
       "docs" : [
          {
             "_type" :  "test_type",
             "_id" :    1
          },
          {
             "_type" :  "test_type",
             "_id" :    2
          }
       ]
    }
  4. 若是查詢的數據都在同一個index下的同一個type下,最簡單了
    GET /test_index/test_type/_mget
    {
       "ids": [1, 2]
    }

    能夠說mget是很重要的,通常來講,在進行查詢的時候,若是一次性要查詢多條數據的話,那麼必定要用batch批量操做的api
    儘量減小網絡開銷次數,可能能夠將性能提高數倍,甚至數十倍,很是很是之重要node

bulk批量增刪改

bulk語法

POST /_bulk
{ "delete": { "_index": "test_index", "_type": "test_type", "_id": "3" }} 
{ "create": { "_index": "test_index", "_type": "test_type", "_id": "12" }}
{ "test_field":    "test12" }
{ "index":  { "_index": "test_index", "_type": "test_type", "_id": "2" }}
{ "test_field":    "replaced test2" }
{ "update": { "_index": "test_index", "_type": "test_type", "_id": "1", "_retry_on_conflict" : 3} }
{ "doc" : {"test_field2" : "bulk test1"} }

有哪些類型的操做能夠執行呢?算法

  1. delete:刪除一個文檔,只要1個json串就能夠了
  2. create:PUT /index/type/id/_create,強制建立
  3. index:普通的put操做,能夠是建立文檔,也能夠是全量替換文檔
  4. update:執行的partial update操做

bulk api對json的語法,有嚴格的要求,每一個json串不能換行,只能放一行,同時一個json串和一個json串之間,必須有一個換行json

示例:api

POST _bulk
{"delete":{"_index":"test_index","_type":"test_type","_id":10}}
{"create":{"_index":"test_index","_type":"test_type","_id":3}}
{"test_field":"test3"}
{"create":{"_index":"test_index","_type":"test_type","_id":2}}
{"test_field":"test2"}
{"index":{"_index":"test_index","_type":"test_type","_id":4}}
{"test_field":"test4"}
{"index":{"_index":"test_index","_type":"test_type","_id":2}}
{"test_field":"replaces test2"}
{"update":{"_index":"test_index","_type":"test_type","_id":1}}
{"doc":{"test_field2":"partial updated test1"}}
{
  "took": 17,
  "errors": true,
  "items": [
    {
      "delete": {
        "found": true,
        "_index": "test_index",
        "_type": "test_type",
        "_id": "10",
        "_version": 3,
        "result": "deleted",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "status": 200
      }
    },
    {
      "create": {
        "_index": "test_index",
        "_type": "test_type",
        "_id": "3",
        "_version": 1,
        "result": "created",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "created": true,
        "status": 201
      }
    },
    {
      "create": {
        "_index": "test_index",
        "_type": "test_type",
        "_id": "2",
        "status": 409,
        "error": {
          "type": "version_conflict_engine_exception",
          "reason": "[test_type][2]: version conflict, document already exists (current version [17])",
          "index_uuid": "d7GOSxVnTNKYuI8x7cZfkA",
          "shard": "2",
          "index": "test_index"
        }
      }
    },
    {
      "index": {
        "_index": "test_index",
        "_type": "test_type",
        "_id": "4",
        "_version": 2,
        "result": "updated",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "created": false,
        "status": 200
      }
    },
    {
      "index": {
        "_index": "test_index",
        "_type": "test_type",
        "_id": "2",
        "_version": 18,
        "result": "updated",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "created": false,
        "status": 200
      }
    },
    {
      "update": {
        "_index": "test_index",
        "_type": "test_type",
        "_id": "1",
        "_version": 3,
        "result": "updated",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "status": 200
      }
    }
  ]
}
View Code

bulk操做中,任意一個操做失敗,是不會影響其餘的操做的,可是在返回結果裏,會告訴你異常日誌數組

BulkIndex語法:(指定Index)性能優化

POST /test_index/_bulk
{ "delete": { "_type": "test_type", "_id": "3" }} 
{ "create": { "_type": "test_type", "_id": "12" }}
{ "test_field":    "test12" }
{ "index":  { "_type": "test_type" }}
{ "test_field":    "auto-generate id test" }
{ "index":  { "_type": "test_type", "_id": "2" }}
{ "test_field":    "replaced test2" }
{ "update": { "_type": "test_type", "_id": "1", "_retry_on_conflict" : 3} }
{ "doc" : {"test_field2" : "bulk test1"} }

BulkIndex語法:(指定Index、type)網絡

POST /test_index/test_type/_bulk
{ "delete": { "_id": "3" }} 
{ "create": { "_id": "12" }}
{ "test_field":    "test12" }
{ "index":  { }}
{ "test_field":    "auto-generate id test" }
{ "index":  { "_id": "2" }}
{ "test_field":    "replaced test2" }
{ "update": { "_id": "1", "_retry_on_conflict" : 3} }
{ "doc" : {"test_field2" : "bulk test1"} }

bulk size最佳大小:bulk request會加載到內存裏,若是太大的話,性能反而會降低,所以須要反覆嘗試一個最佳的bulk size。通常從1000~5000條數據開始,嘗試逐漸增長。另外,若是看大小的話,最好是在5~15MB之間。數據結構

Elasticsearch document數據路由原理

咱們知道,一個index的數據會被分紅多個分片shard,因此說一個document只能存在與一個shard中。負載均衡

當客戶端建立document的時候,elasticsearch此時就須要決定這個document是放在這個index的哪一個分片shard中,這個過程就稱之爲document routing,即數據路由。

 

document數據路由算法

算法;shard = hash(routing) % number_of_primary_shards
舉個例子,假設一個index有5個primary shard(p0,p1,p2,p3,p4)。每次對index的一個document進行增刪改查的時候,都會帶過來一個routing number,默認就是這個documentd的_id(多是手動指定,也能夠是自動生成),routing=_id。
假設_id=1,那麼就會將routing=1這個routing值傳入一個hash函數中,產生一個routing值的hash值,假設hash(routing)=21,而後將hash函數產生的值對這個index的primary shard的數量求餘數,21 % 5 = 1
也就決定了這個document就放在p1上。

注意:此這個計算過程就能夠看出,決定一個document在哪一個shard上,最重要的值就是routing值,默認是_id,也能夠手動指定,相同的routing值,每次過來,從hash函數中生成的hash值必定是相同的。不管最後計算的hash值是多少,對number_of_primary_shards求餘數,結果必定在0~number_of_primary_shards之間。

決定一個document在哪一個shard上,最重要的一個值就是routing值,默認是_id,也能夠手動指定,相同的routing值,每次過來,從hash函數中,產出的hash值必定是相同的

_id or custom routing value

默認的routing就是_id
也能夠在發送請求的時候,手動指定一個routing value,好比說put /index/type/id?routing=user_id

手動指定routing value是頗有用的,能夠保證說,某一類document必定被路由到一個shard上去,那麼在後續進行應用級別的負載均衡,以及提高批量讀取的性能的時候,是頗有幫助的

routing實戰

  • 測試一下默認的routing
    //先插入數據
    PUT /test_index/test_doc/10
    {
      "test_field": "test10 routing _id"
    }
    //獲取數據不帶routing參數
    GET /test_index/test_doc/10
    {
      "_index" : "test_index",
      "_type" : "_doc",
      "_id" : "10",
      "_version" : 1,
      "_seq_no" : 0,
      "_primary_term" : 1,
      "found" : true,
      "_source" : {
        "test_field" : "test10 routing _id"
      }
    }
    //獲取數據帶routing參數 參數值爲_id
    GET /test_index/test_doc/10?routing=10
    {
      "_index" : "test_index",
      "_type" : "_doc",
      "_id" : "10",
      "_version" : 1,
      "_seq_no" : 0,
      "_primary_term" : 1,
      "found" : true,
      "_source" : {
        "test_field" : "test10 routing _id"
      }
    }
  • 測試帶上routing值
    //先插入數據
    PUT /test_index/test_doc/11?routing=12
    {
      "test_field": "test routing not _id"
    }
    //獲取數據不帶routing參數
    GET /test_index/test_doc/11
    {
      "_index" : "test_index",
      "_type" : "_doc",
      "_id" : "11",
      "found" : false
    }
    //獲取數據帶routing參數 參數值爲自定義的值
    GET /test_index/test_doc/11?routing=12
    {
      "_index" : "test_index",
      "_type" : "_doc",
      "_id" : "11",
      "_version" : 1,
      "_seq_no" : 9,
      "_primary_term" : 1,
      "_routing" : "12",
      "found" : true,
      "_source" : {
        "test_field" : "test routing not _id"
      }
    }

主分片數量不可變

    經過上面的分析,特別是路由算法,咱們不難知道,在咱們最開始建立索引的時候,肯定了primary shard的數量,以後根據路由算法,每一個document就被路由到了指定的shard上面,以後的各類操做路由規則都是同樣的。試想一下,若是咱們改變了primary shard的數量,那麼路由算法取餘的時候值可能就跟以前的不同了,就會被路由到其它的shard上面去,就會致使數據混亂,本該已經存在的document,可能經過查詢根本查不到,某種程度上說這也會形成數據的丟失。

document增刪改內部原理,寫一致性機制

document增刪改內部原理

  1. 對於客戶端首先會選擇一個節點node發送請求過去,這個節點node就是協調節點coordinating node
  2. 協調節點coordinating node會對docuemnt數據進行路由,將請求轉發給對應的node(含有primary shard)
  3. 實際上node的primary shard會處理請求,而後將數據同步到對應的含有replica shard的node
  4. 協調節點coordinating node若是發現含有primary shard的node和全部的含有replica shard的node都搞定以後,就會返回響應結果給客戶端

下面手工畫圖展現一下上面的過程:
假設咱們有2個節點,5個primary shard replica=1

  1. 客戶端發送增刪改請求給協調節點node2
  2. 協調節點node2將請求路由到含有primary shard的node1
  3. node1處理請求,並同步數據到對應的含有replica shard的node2
  4. 協調節點node2若是發現含有primary shard的node1以及全部含有replica shard的node2都搞定了,就會返回響應結果給客戶端

寫一致性機制(已經被移除,替換爲wait_for_active_shards)

consistency
默認狀況下,主分片須要法定數量或大部分的分片副本(其中分片副本能夠是主分片或副本分片)在嘗試寫入操做以前可用。這是爲了防止將數據寫入網絡分區的 「wrong side」。法定人數定義以下:

int((primary + number_of_replicas)/ 2)+ 1
容許的值consistency是one(僅主要分片),all (主要和全部副本),或默認quorum或大多數分片副本。

請注意,它number_of_replicas是索引設置中指定的副本數,而不是當前活動的副本數。若是您已指定索引應具備三個副本,則仲裁將以下所示:

int( (primary + 3 replicas) / 2 ) + 1 = 4
可是,若是僅啓動兩個節點,則將沒有足夠的活動分片副原本知足仲裁,而且您將沒法索引或刪除任何文檔。

timeout
若是可用的分片副本不足,會發生什麼?Elasticsearch等待,但願會出現更多的分片。默認狀況下,它將等待最多1分鐘。若是須要,可使用timeout參數讓它更快地停止:100是100毫秒,30s是30秒。
注意
默認狀況下,新索引具備一個副本,這意味着應該須要兩個活動分片副本以知足須要的quorum。可是,這些默認設置會阻止咱們對單節點羣集執行任何有用的操做。爲避免此問題,僅當number_of_replicas大於1時才強制要求仲裁。

移除consistency這個參數以後,用wait_for_active_shards這個參數替代了。
緣由就是,consistency檢查是在Put以前作的。然而,雖然檢查的時候,shard知足quorum,可是真正從primary shard寫到replica以前,仍會出現shard掛掉,但Update Api會返回succeed。所以,這個檢查並不能保證replica成功寫入,甚至這個primary shard是否能成功寫入也未必能保證。

爲了提升對系統寫入的彈性,使用wait_for_active_shards,能夠將索引操做配置爲在繼續操做以前等待必定數量的活動分片副本。若是必需數量的活動分片副本不可用,則寫入操做必須等待並重試,直到必需的分片副本已啓動或發生超時。默認狀況下,寫入操做僅等待主分片在繼續(即wait_for_active_shards=1)以前處於活動狀態。
注意,此設置大大下降了寫入操做不寫入所需數量的分片副本的可能性,但它並未徹底消除這種可能性,由於此檢查在寫入操做開始以前發生。一旦寫入操做正在進行,複製在任何數量的分片副本上仍然可能失敗,但仍然能夠在主分片上成功。在_shards寫操做的響應部分揭示了其複製成功/失敗碎片的份數。

Elasticsearch document查詢內部原理

查詢請求打過來Elasticsearch內部作了什麼?

  1. 客戶端發送請求到任意一個node,這個node就成爲了協調節點coordinating node
  2. 協調節點coordinating node會對document進行路由,將請求轉發到包含該document的對應的node上面去,此時會使用round-robin隨機輪詢算法,在primary shard以及全部的replica shard中隨機選擇一個,讓打過來的讀請求實現負載均衡
  3. 接收請求的node會返回document給協調節點coordinating node
  4. 協調節點將document數據返回給客戶端

對於讀取請求,協調節點將在每一個請求上選擇不一樣的分片副本以平衡負載; 它循環遍歷全部碎片副本。

在索引文檔時,文檔可能已經存在於主分片上但還沒有複製到副本分片。在這種狀況下,副本可能會報告文檔不存在,而主副本可能會成功返回文檔。索引請求將成功返回給用戶後,該文檔將在主分片和全部副本分片上可用。

最後簡單描述一下隨機輪詢算法:
舉個例子,好比一個協調節點coordinating接受到一個document的4次請求,就會使用隨機輪詢算法,循環遍歷全部shard,將4次請求均勻的打在全部shard上面,好比有4個shard,就會每一個shard各一個請求。

bulk api的奇特json格式與底層性能優化關係

爲何bulk要採用這種奇特的json格式?

因爲bulk中的每一個操做均可能要轉發到不一樣的node的shard去執行,假設咱們不用這種奇特的json格式,採用比較良好的json數組格式,容許任意的換行,整個可讀性很是棒,讀起來很爽。可是ES拿到這種標準格式的json串以後,要按照下述流程去進行執行處理。
格式以下:

[{
  "action": {
  },
  "data": {
  }
}]
  1. 將json數組解析爲JSONArray對象,這個時候,整個數據,就會在內存中出現一份一摸同樣的拷貝,一份數據是json文本,一份數據是JSONArray對象
  2. 解析json數組裏面的每一個json,對每一個請求中的document進行路由
  3. 爲路由到同一個shard上的多個請求,建立一個請求數組
  4. 將這個請求數組序列化
  5. 將序列化後的請求數組發送到對應的節點上去

不難看出這樣就會耗費更多的內存,更多的jvm gc開銷。

 

假設一個場景,對於bulk size的大小通常建議在幾千條,大小在10MB左右,因此說,可怕的事情來了。假設說如今100個bulk請求發送到了一個節點上去,而後每一個請求是10MB,100個請求就是1000MB=1G,而後每一個請求的json都copy一份JSONArray對象,此時內存中的佔用就會翻倍,就會佔用2GB的內存,甚至還不止,由於弄成JSONArray對象以後,還可能會多弄一些其它的數據結構,2GB+的內存佔用。
佔用更多的內存可能就會積壓其它請求的內存使用量,好比說最重要的搜索請求,分析請求等等。此時就可能會致使其它請求的性能急速降低,另外的話,佔用內存更多,就會致使java虛擬機的垃圾回收次數更多,更加頻繁,每次要回收的垃圾對象更多,耗費的時間更多,致使ES的java虛擬機中止工做線程的時間更多。

而使用這個奇特格式的json

{"action": {"meta"}}\n
{"data"}\n
{"action": {"meta"}}\n
{"data"}\n
  1. 不用將其轉換爲json對象,不會出現內存中的相同數據的拷貝,直接按照換行符切割json
  2. 對每兩個一組的json,讀取meta,進行document路由
  3. 直接將對應的json發送到node上去

和標準格式的json相比,最大的優點在於不須要將json數組解析爲一個JSONArray對象,造成一份大數據的拷貝,浪費內存空間,儘量的保證性能。

示例:

PUT _bulk
{"index": {"_index": "test","_type":"test_doc", "_id": "1"}}
{"field1": "value1", "field2": "value2"}
{"index": {"_index": "test","_type":"test_doc", "_id": "2"}}
{"field1": "value1 id2", "field2": "value2 id2"}
{"delete": {"_index": "test","_type":"test_doc", "_id": "2"}}
{"create": {"_index": "test","_type":"test_doc", "_id": "3"}}
{"field1": "value3"}
{"update": {"_index": "test", "_type":"test_doc","_id": "1"}}
{"doc": {"field2": "value2"}}
{
  "took": 136,
  "errors": false,
  "items": [
    {
      "index": {
        "_index": "test",
        "_type": "test_doc",
        "_id": "1",
        "_version": 1,
        "result": "created",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "created": true,
        "status": 201
      }
    },
    {
      "index": {
        "_index": "test",
        "_type": "test_doc",
        "_id": "2",
        "_version": 1,
        "result": "created",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "created": true,
        "status": 201
      }
    },
    {
      "delete": {
        "found": true,
        "_index": "test",
        "_type": "test_doc",
        "_id": "2",
        "_version": 2,
        "result": "deleted",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "status": 200
      }
    },
    {
      "create": {
        "_index": "test",
        "_type": "test_doc",
        "_id": "3",
        "_version": 1,
        "result": "created",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "created": true,
        "status": 201
      }
    },
    {
      "update": {
        "_index": "test",
        "_type": "test_doc",
        "_id": "1",
        "_version": 1,
        "result": "noop",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "status": 200
      }
    }
  ]
}
View Code
相關文章
相關標籤/搜索