前面研究過ES的get api
的總體思路,做爲編寫ES插件時的借鑑。當時的重點在與理解總體流程,主要是shardOperation()
的方法內部的調用邏輯,就弱化了shards()
方法。實際上shards()
方法在理解ES的結構層面,做用更大一些。咱們仍是從get api
入手來理解shards()
。node
先回顧一下get api
的使用流程:api
添加文檔到ES: curl -XPUT 'http://localhost:9200/test1/type1/1' -d '{"name":"hello"}' 根據文檔ID讀取數據: curl -XGET 'http://localhost:9200/test1/type1/1'
使用很簡單。可是若是考慮到分佈式,背後的邏輯就不簡單了。 假如ES集羣有3個節點,數據所在的索引也有3個分片,每一個分片一個副本。即index的設置以下:數據結構
{ "test1" : { "settings" : { "index" : { "number_of_replicas" : "1", "number_of_shards" : "3" } } } }
那麼id爲1的doc該分發到那個分片呢? 這個問題須要一篇詳細的博文解答,這裏咱們先簡單給一個結論:app
默認狀況下,ES會按照文檔id計算一個hash值, 採用的是Murmur3HashFunction,而後根據這個id跟分片數取模。實現代碼是MathUtils.mod(hash, indexMetaData.getNumberOfShards()); 最後的結果做爲文檔所在的分片id,因此ES的分片標號是從0開始的。
不知存,焉知取。 負載均衡
再整理一下取數據的核心流程:dom
s1: 根據文檔id定位到數據所在分片。因爲能夠設爲多個副本,因此一個分片會映射到多個節點。 s2: 根據分片節點的映射信息,選擇一個節點,去獲取數據。 這裏重點關注的是節點的選擇方式,簡而言之,咱們須要負載均衡,否則設置副本就沒有意義了。
上面兩步都關聯着一個核心的數據結構ClusterState
, 咱們可使用_cluster/state?pretty
來查看這個數據結構:curl
# http://localhost:9200/_cluster/state?pretty { "cluster_name" : "elasticsearch", "version" : 4, "state_uuid" : "b6B739p5SbanNLyKxTMHfQ", "master_node" : "KnEE25tzRjaXblFJq5jqRA", "blocks" : { }, "nodes" : { "KnEE25tzRjaXblFJq5jqRA" : { "name" : "Mysterio", "transport_address" : "127.0.0.1:9300", "attributes" : { } } }, "metadata" : { "cluster_uuid" : "ZIl7g86YRiGv8Dqz4DCoAQ", "templates" : { }, "indices" : { "test1" : { "state" : "open", "settings" : { "index" : { "creation_date" : "1553995485603", "uuid" : "U7v5t_T7RG6rNU3JlGCCBQ", "number_of_replicas" : "1", "number_of_shards" : "1", "version" : { "created" : "2040599" } } }, "mappings" : { }, "aliases" : [ ] } } }, "routing_table" : { "indices" : { "test1" : { "shards" : { "0" : [ { "state" : "STARTED", "primary" : true, "node" : "KnEE25tzRjaXblFJq5jqRA", "relocating_node" : null, "shard" : 0, "index" : "test1", "version" : 2, "allocation_id" : { "id" : "lcSHbfWDRyOKOhXAf3HXLA" } }, { "state" : "UNASSIGNED", "primary" : false, "node" : null, "relocating_node" : null, "shard" : 0, "index" : "test1", "version" : 2, "unassigned_info" : { "reason" : "INDEX_CREATED", "at" : "2019-03-31T01:24:45.845Z" } } ] } } } }, "routing_nodes" : { "unassigned" : [ { "state" : "UNASSIGNED", "primary" : false, "node" : null, "relocating_node" : null, "shard" : 0, "index" : "test1", "version" : 2, "unassigned_info" : { "reason" : "INDEX_CREATED", "at" : "2019-03-31T01:24:45.845Z" } } ], "nodes" : { "KnEE25tzRjaXblFJq5jqRA" : [ { "state" : "STARTED", "primary" : true, "node" : "KnEE25tzRjaXblFJq5jqRA", "relocating_node" : null, "shard" : 0, "index" : "test1", "version" : 2, "allocation_id" : { "id" : "lcSHbfWDRyOKOhXAf3HXLA" } } ] } } }
整個結構比較複雜,咱們慢慢拆解, 一步步逐個擊破。 拆解的思路仍是從使用場景入手。 elasticsearch
"metadata" : { "cluster_uuid" : "ZIl7g86YRiGv8Dqz4DCoAQ", "templates" : { }, "indices" : { "test1" : { "state" : "open", "settings" : { "index" : { "creation_date" : "1553995485603", "uuid" : "U7v5t_T7RG6rNU3JlGCCBQ", "number_of_replicas" : "1", "number_of_shards" : "1", "version" : { "created" : "2040599" } } }, "mappings" : { }, "aliases" : [ ] } } }
即metadata中存儲了集羣中每一個索引的分片和副本數量, 索引的狀態, 索引的mapping, 索引的別名等。這種結構,能提供出來的功能就是根據索引名稱獲取索引元數據
, 代碼以下:分佈式
# OperationRouting.generateShardId() IndexMetaData indexMetaData = clusterState.metaData().index(index); if (indexMetaData == null) { throw new IndexNotFoundException(index); } final Version createdVersion = indexMetaData.getCreationVersion(); final HashFunction hashFunction = indexMetaData.getRoutingHashFunction(); final boolean useType = indexMetaData.getRoutingUseType();
這裏咱們關注點就是clusterState.metaData().index(index)
這句代碼,它實現了根據索引名稱獲取索引元數據的功能
。 經過元數據中的分片數結合文檔id,咱們就能定位出文檔所在的分片。 這個功能在Delete, Index, Get 三類API中都是必須的。 這裏咱們也能理解爲何ES的索引分片數量不能修改: 若是修改了,那麼hash函數就無法正肯定位數據所在分片。ide
"routing_table" : { "indices" : { "test1" : { "shards" : { "0" : [ { "state" : "STARTED", "primary" : true, "node" : "KnEE25tzRjaXblFJq5jqRA", "relocating_node" : null, "shard" : 0, "index" : "test1", "version" : 2, "allocation_id" : { "id" : "lcSHbfWDRyOKOhXAf3HXLA" } }, { "state" : "UNASSIGNED", "primary" : false, "node" : null, "relocating_node" : null, "shard" : 0, "index" : "test1", "version" : 2, "unassigned_info" : { "reason" : "INDEX_CREATED", "at" : "2019-03-31T01:24:45.845Z" } } ] } } } }
routing_table
存儲着每一個索引的分片信息,經過這個結構,咱們能清晰地瞭解以下的信息:
1. 索引分片在各個節點的分佈 2. 索引分片是否爲主分片
假如一個分片有2個副本,且都分配在不一樣的節點上,那麼get api
一共有三個數據節點可供選擇, 選擇哪個呢?這裏暫時不考慮帶preference
參數。
爲了使每一個節點都能公平被選擇到,達到負載均衡的目的,這裏用到了隨機數。參考RotateShuffer
/** * Basic {@link ShardShuffler} implementation that uses an {@link AtomicInteger} to generate seeds and uses a rotation to permute shards. */ public class RotationShardShuffler extends ShardShuffler { private final AtomicInteger seed; public RotationShardShuffler(int seed) { this.seed = new AtomicInteger(seed); } @Override public int nextSeed() { return seed.getAndIncrement(); } @Override public List<ShardRouting> shuffle(List<ShardRouting> shards, int seed) { return CollectionUtils.rotate(shards, seed); } }
也就是說使用ThreadLocalRandom.current().nextInt()
生成隨機數做爲種子, 而後取的時候依次旋轉。 Collections.rotate()
的效果能夠用以下的代碼演示:
public static void main(String[] args) { List<String> list = Lists.newArrayList("a","b","c"); int a = ThreadLocalRandom.current().nextInt(); List<String> l2 = CollectionUtils.rotate(list, a ); List<String> l3 = CollectionUtils.rotate(list, a+1); System.out.println(l2); System.out.println(l3); } ----- [b, c, a] [c, a, b]
好比請求A獲得的節點列表是[b,c,a], 那麼請求B獲得的節點列表是[c,a,b]。這樣就達到了負載均衡的目的。
routing_table
中存儲的是節點的id, 那麼將請求發送到目標節點時,還須要知道節點的ip及端口等配置信息。 這些信息存儲在nodes
中。"nodes" : { "KnEE25tzRjaXblFJq5jqRA" : { "name" : "Mysterio", "transport_address" : "127.0.0.1:9300", "attributes" : { } } }
經過這個nodes
獲取到節點信息後,就能夠發送請求了,ES全部內部節點的通訊都是基於transportService.sendRequest()
。
總結一下,本文基於get api
梳理了一下ES的ClusterState中的幾個核心結構: metadata
,nodes
, routing_table
。 還有一個routing_nodes
這裏沒有用到。後面梳理清楚使用場景後再記錄。