ElasticSearch-Java-low-level-rest-client官方文檔翻譯

       人肉翻譯,非谷歌機翻,部分地方添加了我的的理解,並作了分割,若有錯誤請在評論指出。轉載請指明原連接,尊重我的勞動成果。node

       High-Level-Rest-Client基於Low-Level-Rest-Client封裝,Client配置方面基於Low-Level,而API使用上基於High-Level。apache

       翻譯的版本爲6.5.4版本的Elasticsearch,部分不重要的內容(如Maven/Gradle座標、License等不在本文出現)。編程

       在實際的配置過程當中,查看源碼發現LowLevelClient徹底基於HttpAsyncClient來實現的,內部具體組裝Client的細節,若是有時間會寫另一篇博文分享出來。json

 
低級別rest-client包含了以下特性:
  • 最小的依賴
  • 在集羣全部結點間負載均衡
  • 結點失敗後,根據具體的響應碼(response code)進行失效轉移
  • 鏈接失敗懲罰機制(是否重連一個失敗的結點取決於它失敗的次數;嘗試從新鏈接失敗的次數越多,客戶端下一次嘗試從新鏈接該結點的等待時間也越長)
  • 持久化鏈接
  • 請求和響應的日誌追蹤
  • (可選功能)集羣結點自動發現
 

一、初始化客戶端

1.1.基本設置

        RestClient實例只須要經過RestClientBuilder類進行構建就行,以下所示,能夠提供多個鏈接實例的HttpHost(每個HttpHost都是一個結點Node):

1 RestClient restClient = RestClient.builder(
2     new HttpHost("localhost", 9200, "http"),
3     new HttpHost("localhost", 9201, "http")).build();
        RestClient是線程安全的,而且和它依附的應用有相同的生命週期。須要注意的是,當不須要Client的時候,須要關閉它以便正確的釋放其持有的資源(這些資源有底層的http client實例與它的線程):
1 restClient.close();

 

 1.2.參數化設置

   RestClientBuilder一樣容許參數化的設置以下參數來用於構建RestClient實例:
  • 設置默認的請求頭,該請求頭會應用到每一個request上去。不容許單獨的在每次請求時進行請求頭設置。
1 RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
2 Header[] defaultHeaders = new Header[]{new BasicHeader("header", "value")};
3 builder.setDefaultHeaders(defaultHeaders);
  • 設置同一個請求的屢次嘗試時的最大超時時間,這個設置默認的時間是30秒(同socket timeout的默認超時時間同樣)。若是你修改了socket timeout,你也應該修改這個值來適應socket timeout。
1 RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
2 builder.setMaxRetryTimeoutMillis(10000); 
  • 設置監聽器,當一個結點故障時收到通知,以採起措施。嗅探失敗節點功能會隱式的使用一個這樣的監聽器。
1 RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
2 builder.setFailureListener(new RestClient.FailureListener() {
3     @Override
4     public void onFailure(Node node) {
5         
6     }
7 });
  • 設置結點選擇器,用於過濾某些es的結點(這些結點在初始化時設置到Client中)。當結點嗅探開啓時,若是你不想將請求發送至dedicated master結點上,該功能很好用。Client默認會將請求發送到全部配置的節點上。
1 RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
2 builder.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS);
  • 經過一個回調函數修改默認的請求配置(如超時時間、驗證信息等其餘apache httpclient包中RequestConfig.Builder容許的配置)。
1 RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
2 builder.setRequestConfigCallback(
3     new RestClientBuilder.RequestConfigCallback() {
4         @Override
5         public RequestConfig.Builder customizeRequestConfig(
6                 RequestConfig.Builder requestConfigBuilder) {
7             return requestConfigBuilder.setSocketTimeout(10000); 
8         }
9     });
  • 經過一個回調函數修改默認的HttpClient配置(如ssl配置等其餘apache httpclient包中HttpAsyncClientBuilder容許的配置)。
1 RestClientBuilder builder = RestClient.builder(new HttpHost("localhost", 9200, "http"));
2 builder.setHttpClientConfigCallback(new HttpClientConfigCallback() {
3         @Override
4         public HttpAsyncClientBuilder customizeHttpClient(
5                 HttpAsyncClientBuilder httpClientBuilder) {
6             return httpClientBuilder.setProxy(
7                 new HttpHost("proxy", 9000, "http"));  
8         }
9     });

 

二、執行請求

2.1.同步與異步API

        一旦Client建立,就能夠經過performRequest或performRequestAsync來執行請求:
(1)performRequest是同步請求,它會阻塞調用線程,在請求成功或者請求失敗拋出異常以後,返回Response;
1 Request request = new Request(
2     "GET",   //HTTP方法,支持GET、POST、HEAD等
3     "/");    //es服務端點endpoint
4 Response response = restClient.performRequest(request);
(2)performRequestAsync是異步請求,它接收一個ResponseListener參數,當請求成功或者請求失敗拋出異常以後,該Listener會被調起並傳入Response或Exception到相應方法;
 1 Request request = new Request(
 2     "GET",   //HTTP方法,支持GET、POST、HEAD等
 3     "/");    //es服務端點endpoint
 4 restClient.performRequestAsync(request, new ResponseListener() {
 5     @Override
 6     public void onSuccess(Response response) {
 7         //成功回調
 8     }
 9 
10     @Override
11     public void onFailure(Exception exception) {
12         //失敗回調
13     }
14 });
        另外,你能夠向Request對象中添加請求參數,以下:
1 request.addParameter("pretty", "true");

        若是將HttpEntity設置爲String,那麼ContentType會被默認設置爲application/json:api

2.2.請求可選項

        RequestOptions這個類用於設置請求可選項,咱們建議在同一個應用中,對於使用一樣的請求可選項的Request共享一個RequestOption實例,你能夠經過單例來實現:
1 private static final RequestOptions COMMON_OPTIONS;
2 static {
3     RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
4     builder.addHeader("Authorization", "Bearer " + TOKEN);  //設置請求頭的校驗信息
5     builder.setHttpAsyncResponseConsumerFactory( //設置Response的緩衝池大小          
6         new HeapBufferedResponseConsumerFactory(30 * 1024 * 1024 * 1024)); 
7     COMMON_OPTIONS = builder.build();
8 }
  • 這裏有一個addHeader方法,須要注意的是,在這裏不須要設置ContentType,由於在設置HttpEntity時就會自動關聯ContentType;
  • 你能夠在這裏設置NodeSelector。NodeSelector.NOT_MASTER_ONLY是一個比較好的選擇;
  • 你能夠在這裏定製用於緩衝異步響應的Response Consumer。默認的ResponseConsumer會在JVM堆上緩存100M的響應數據。若是響應數據比緩衝區過大,當次請求會失敗。若是你的應用跑在一個JVM堆比較小的環境中,你須要下降緩衝區的最大值。
        配置好RequestOptions以後,咱們在每個共享該配置的Request中,簡單設置一下該對象便可:
1 request.setOptions(COMMON_OPTIONS);

        若是有某些請求須要對OPTIONS作必定程度的變動,能夠經過以下方式增量處理而不是從新寫一遍設置代碼:數組

1 RequestOptions.Builder options = COMMON_OPTIONS.toBuilder();
2 options.addHeader("cats", "knock things off of other things");
3 request.setOptions(options); 

2.3.批量並行異步請求

        下面是一個使用閉鎖並行執行異步請求的例子。在實際的編程中,你通常會傾向於使用_bulk這個API來替代,固然了,下面的例子依然具備參考性:緩存

 1 final CountDownLatch latch = new CountDownLatch(documents.length);
 2 for (int i = 0; i < documents.length; i++) {
 3     Request request = new Request("PUT", "/posts/doc/" + i);
 4     //let's assume that the documents are stored in an HttpEntity array
 5     request.setEntity(documents[i]);
 6     restClient.performRequestAsync(
 7             request,
 8             new ResponseListener() {
 9                 @Override
10                 public void onSuccess(Response response) {
11                     
12                     latch.countDown();
13                 }
14 
15                 @Override
16                 public void onFailure(Exception exception) {
17                     
18                     latch.countDown();
19                 }
20             }
21     );
22 }
23 latch.await();

 

三、讀取響應

3.1.獲取響應對象

        響應對象,不管是經過同步仍是異步獲取到的(同步經過API返回值獲取,異步經過回調方法中的參數獲取),都是同一個類的對象。該對象包裝了http client原始的響應對象以及一些附加信息:
1 Response response = restClient.performRequest(new Request("GET", "/"));
2 RequestLine requestLine = response.getRequestLine(); 
3 HttpHost host = response.getHost(); 
4 int statusCode = response.getStatusLine().getStatusCode(); 
5 Header[] headers = response.getHeaders(); 
6 String responseBody = EntityUtils.toString(response.getEntity()); 
          若是執行請求失敗的話,你將會獲得一個異常,異常有如下幾種:
(1)IOException:通信問題,例如SocketTimeoutException;
(2)ResponseException:響應已經返回,可是其狀態碼不爲2XX時將拋出這個異常。ResponseException源自於一個有效的http Response,所以該異常會將源Response暴露出來,並容許你獲取它。
        注意:HEAD請求若是返回了404,ResponseException是不會被拋出的,由於對於HEAD來講資源未找到也是該方法指望的返回值之一。全部其餘的HTTP方法對於404返回碼都會拋出一個ResponseException,除非在ignore參數中包含了404。ignore是一個特殊的參數,該參數不會發送給es,它包含了一個分割的error status code列表。當Response的狀態碼在這個列表中時,它會被當成Response而不是做爲Exception。對於GET方法相關的API,這是一個至關有用的特性,好比GET查詢一個文檔返回404,咱們指望其不返回一個異常,而是一個沒有document的response對象。
        注意:low-level-client並不會給出json相關的編組與解組API。你能夠自由選擇相關的庫來進行json的編組與解組。
        底層的Apache Async Http Client庫附帶了不一樣的org.apache.http.HttpEntity實現,這些實現容許以不一樣的格式(流,字節數組,字符串等)提供請求主體。至於讀取響應體,HttpEntity#getContent方法很方便,它返回從先前緩衝的響應體讀取的InputStream。做爲替代方案,能夠提供一個自定義的org.apache.http.nio.protocol.HttpAsyncResponseConsumer來控制字節的讀取和緩衝方式。
 

四、通用配置

        在初始化客戶端中,咱們看到了RestClientBuilder支持提供一個RequestConfigCallback與一個可以用於個性化異步請求的HttpClientConfigCallback。這些回調函數可以讓你在不覆蓋每個RestClient初始化默認值的狀況下修改某些指定行爲。本小節咱們描述一些須要爲low-level-client添加額外配置的公共場景。

4.1.Timeouts

        在初始化客戶端的參數化設置就提過了,能夠經過以下形式進行設置:
 1 RestClientBuilder builder = RestClient.builder(
 2     new HttpHost("localhost", 9200))
 3     .setRequestConfigCallback(
 4         new RestClientBuilder.RequestConfigCallback() {
 5             @Override
 6             public RequestConfig.Builder customizeRequestConfig(
 7                     RequestConfig.Builder requestConfigBuilder) {
 8                 return requestConfigBuilder
 9                     .setConnectTimeout(5000) //默認值1S
10                     .setSocketTimeout(60000); //默認值30S
11             }
12         })
13     .setMaxRetryTimeoutMillis(60000); //設置完SocketTimeout後,通常須要聯動修改這個值

4.2.線程數

        ApacheHttpAsyncClient基於IOReactor模式,在啓動時會建立一個分發線程和一組工做線程,它們由Connection Manager使用,工做線程的默認數量與可以檢測到的CPU核心數一致(Runtime.getRuntime().availableProcessors()的返回結果)。能夠經過以下方法修改工做線程數:
 1 RestClientBuilder builder = RestClient.builder(
 2     new HttpHost("localhost", 9200))
 3     .setHttpClientConfigCallback(new HttpClientConfigCallback() {
 4         @Override
 5         public HttpAsyncClientBuilder customizeHttpClient(
 6                 HttpAsyncClientBuilder httpClientBuilder) {
 7             return httpClientBuilder.setDefaultIOReactorConfig(
 8                 IOReactorConfig.custom()
 9                     .setIoThreadCount(1)
10                     .build());
11         }
12     }); 

4.3.身份驗證

        使用以下的方式設置身份驗證相關的請求參數:
 1 final CredentialsProvider credentialsProvider =
 2     new BasicCredentialsProvider();
 3 credentialsProvider.setCredentials(AuthScope.ANY,
 4     new UsernamePasswordCredentials("user", "password"));
 5 
 6 RestClientBuilder builder = RestClient.builder(
 7     new HttpHost("localhost", 9200))
 8     .setHttpClientConfigCallback(new HttpClientConfigCallback() {
 9         @Override
10         public HttpAsyncClientBuilder customizeHttpClient(
11                 HttpAsyncClientBuilder httpClientBuilder) {
12             return httpClientBuilder
13                 .setDefaultCredentialsProvider(credentialsProvider);
14         }
15     });
         搶佔式(這裏翻譯不太肯定,結合上下文,應該指的是在第一次請求就發送身份驗證信息)身份驗證能夠被禁用,這意味着每一個HTTP請求都會在沒有身份驗證請求頭的狀況下發送,以查看它是否被接受,而且在收到HTTP 401響應後,它將使用含有身份驗證請求頭的報文從新發送徹底相同的請求。 若是您但願這樣作,那麼您能夠經過HttpAsyncClientBuilder禁用它:
 1 final CredentialsProvider credentialsProvider =
 2     new BasicCredentialsProvider();
 3 credentialsProvider.setCredentials(AuthScope.ANY,
 4     new UsernamePasswordCredentials("user", "password"));
 5 
 6 RestClientBuilder builder = RestClient.builder(
 7     new HttpHost("localhost", 9200))
 8     .setHttpClientConfigCallback(new HttpClientConfigCallback() {
 9         @Override
10         public HttpAsyncClientBuilder customizeHttpClient(
11                 HttpAsyncClientBuilder httpClientBuilder) {
12             httpClientBuilder.disableAuthCaching();  /* 禁用搶佔式身份驗證 */
13             return httpClientBuilder
14                 .setDefaultCredentialsProvider(credentialsProvider);
15         }
16     });

4.4.SSL通訊

        加密通訊一樣能夠經過HttpClientConfigCallback進行配置。HttpAsyncClientBuilder經過3個暴露的API:setSSLContext、setSSLSessionStrategy、setConnectionManager進行加密通訊的設置。下面是一個配置示例(若是沒有顯式配置,則會使用系統的默認配置):
 1 KeyStore truststore = KeyStore.getInstance("jks");
 2 try (InputStream is = Files.newInputStream(keyStorePath)) {
 3     truststore.load(is, keyStorePass.toCharArray());
 4 }
 5 SSLContextBuilder sslBuilder = SSLContexts.custom()
 6     .loadTrustMaterial(truststore, null);
 7 final SSLContext sslContext = sslBuilder.build();
 8 RestClientBuilder builder = RestClient.builder(
 9     new HttpHost("localhost", 9200, "https"))
10     .setHttpClientConfigCallback(new HttpClientConfigCallback() {
11         @Override
12         public HttpAsyncClientBuilder customizeHttpClient(
13                 HttpAsyncClientBuilder httpClientBuilder) {
14             return httpClientBuilder.setSSLContext(sslContext);
15         }
16     });

4.5.節點選擇器

        客戶端默認狀況下會以輪詢的形式將請求發送到每個初始化時配置的節點上。在初始化時,咱們能夠提供一個節點選擇器NodeSelector。當嗅探開啓時這個功能會很好用,以防只有HTTP請求才能訪問專用主節點。 對於每一個請求,客戶端將運行最終配置的節點選擇器以過濾候選節點,而後從列表中選擇下一個節點選擇器。下面是一個基於機架進行節點選擇的例子:
 1 RestClientBuilder builder = RestClient.builder(
 2         new HttpHost("localhost", 9200, "http"));
 3 builder.setNodeSelector(new NodeSelector() { 
 4     @Override
 5     public void select(Iterable<Node> nodes) {
 6         /*
 7          * Prefer any node that belongs to rack_one. If none is around
 8          * we will go to another rack till it's time to try and revive
 9          * some of the nodes that belong to rack_one.
10          */
11         boolean foundOne = false;
12         for (Node node : nodes) {
13             String rackId = node.getAttributes().get("rack_id").get(0);
14             if ("rack_one".equals(rackId)) {
15                 foundOne = true;
16                 break;
17             }
18         }
19         if (foundOne) {
20             Iterator<Node> nodesIt = nodes.iterator();
21             while (nodesIt.hasNext()) {
22                 Node node = nodesIt.next();
23                 String rackId = node.getAttributes().get("rack_id").get(0);
24                 if ("rack_one".equals(rackId) == false) {
25                     nodesIt.remove();
26                 }
27             }
28         }
29     }
30 });
         上面的例子中,優先選擇一號機架上的節點。當一號機架沒有任何節點時,纔會選擇其餘機架節點。
       須要注意的是,若是一個NodeSelector沒法提供一個始終如一的(或者說穩定的)節點列表,那麼輪詢發送請求至不一樣node的行爲將會變得不可預測或失效。在上面的例子中,輪詢行爲會很好的工做,由於這個例子中的代碼能夠很好的(或比較穩定的)獲取到哪些結點可用(這些獲取到的可用結點可以影響輪詢行爲的可預測性)。所以,這也提醒咱們NodeSelector不該該依賴於任何外部因素,不然輪詢行爲將會變的不可預測。
 

五、嗅探Sniffer

        容許從一個運行的ES集羣中自動發現結點,而且將發現的結點設置入一個已存在的RestClient。默認的,它使用節點信息api檢索屬於集羣的節點,並使用jackson解析得到的json響應。
        要使用該功能,必須引入依賴,該依賴版本同rest-client版本同樣,跟着es的發佈版本走的:
1 <dependency>
2     <groupId>org.elasticsearch.client</groupId>
3     <artifactId>elasticsearch-rest-client-sniffer</artifactId>
4     <version>6.5.4</version>
5 </dependency>

5.1.基本設置

        在初始化的RestClient的時候將一個Sniffer與其進行關聯。Sniffer嗅探器會爲RestClient提供一個週期性的拉取es全部結點列表的功能(默認5分鐘拉取一次),拉取到節點列表後,會調用RestClient.setNodes的API將其更新入RestClient。
1 RestClient restClient = RestClient.builder(
2     new HttpHost("localhost", 9200, "http"))
3     .build();
4 Sniffer sniffer = Sniffer.builder(restClient).build(); 
        Sniffer應該具備同RestClient同樣的生命週期,也就是說,在關閉RestClient時,也須要關閉Sniffer以釋放其佔用的資源:
1 sniffer.close();
2 restClient.close(); 

5.2.定製刷新間隔

        上面咱們提到了,Sniffer默認5分鐘刷新一次,咱們能夠在初始化Sniffer的時候改變這個時間(單爲:毫秒):
1 RestClient restClient = RestClient.builder(
2     new HttpHost("localhost", 9200, "http"))
3     .build();
4 Sniffer sniffer = Sniffer.builder(restClient)
5     .setSniffIntervalMillis(60000).build(); //設置Sniffer刷新間隔
        另外,咱們能夠在請求失敗上啓用Sniffer,即在每次請求節點失敗後,可讓Sniffer執行一次節點更新(區別於Sniffer固定間隔的刷新)。咱們須要爲RestClient提供一個SnifferOnFailureListener,並在Client初始化的時候設置這個監聽器,同時在建立Sniffer以後也須要將其設置給這個監聽器。這樣一來,每次失敗以後,監聽器監聽到失敗就會委託Sniffer去執行節點刷新:
 1 SniffOnFailureListener sniffOnFailureListener =
 2     new SniffOnFailureListener(); //建立Sniffer失敗監聽器
 3 RestClient restClient = RestClient.builder(
 4     new HttpHost("localhost", 9200))
 5     .setFailureListener(sniffOnFailureListener) //爲RestClient設置監聽器
 6     .build();
 7 Sniffer sniffer = Sniffer.builder(restClient)
 8     .setSniffAfterFailureDelayMillis(30000) //設置失敗以後的額外調度延遲
 9     .build();
10 sniffOnFailureListener.setSniffer(sniffer); //爲監聽器設置Sniffer
  • 關於setSniffAfterFailureDelayMillis:當咱們在失敗上設置了Sniffer監聽後,每次失敗不只會經過監聽器勾起一次Sniffer的節點刷新行爲,一次額外的Sniffer節點刷新行爲也會比日常的間隔調度更快的到來(這是Sniffer的默認特徵,且默認時間爲1分鐘)。假設失敗的節點可以很快的恢復,咱們的應用須要儘快知道這個狀況,那麼咱們能夠經過setSniffAfterFailureDelayMillis來調整這個額外Sniffer行爲的延遲時間(這是什麼意思呢?舉個例子,當請求失敗後,經過監聽器觸發的Sniffer行爲獲取到的節點列表多是剔除了有問題節點的列表,若是此時問題節點未恢復的話;可能這些問題節點在一陣子後自行恢復了,可是Sniffer尚未到下次觸發時間,所以咱們須要調整一下這個失敗後Sniffer額外調度的延遲時間,縮短該時間以儘快拉取到恢復後的節點列表)。

5.3.設置節點通訊協議

         Sniffer的節點列表刷新行爲使用的是NodeInfo API,這個API只能返回節點的IP和PORT信息,對於鏈接節點使用到的協議沒法返回,所以默認認爲節點通訊使用的是HTTP協議。在使用HTTPS協議的狀況下,咱們須要手工的設置一個ElasticsearchNodesSniffer實例用來告知Sniffer節點通訊的協議是HTTPS:
1 RestClient restClient = RestClient.builder(
2         new HttpHost("localhost", 9200, "http"))
3         .build();
4 NodesSniffer nodesSniffer = new ElasticsearchNodesSniffer(
5         restClient,
6         ElasticsearchNodesSniffer.DEFAULT_SNIFF_REQUEST_TIMEOUT,
7         ElasticsearchNodesSniffer.Scheme.HTTPS);
8 Sniffer sniffer = Sniffer.builder(restClient)
9         .setNodesSniffer(nodesSniffer).build();
        經過一樣的方式,咱們能夠設置SnifferRequestTimeout這個參數的值(默認1S)。當調用NodeInfoAPI時,這個超時參數會做爲查詢字符串中的參數,它告訴ES服務器查詢節點列表的超時時間。所以當服務器側出現超時的時候仍然會返回一個有效的響應結果(服務器側查詢節點列表的超時,不表明Sniffer請求自身的超時),這個響應結果可能只包含節點列表的一個子集,即在超時時間內能獲取到的節點列表數據:
1 RestClient restClient = RestClient.builder(
2     new HttpHost("localhost", 9200, "http"))
3     .build();
4 NodesSniffer nodesSniffer = new ElasticsearchNodesSniffer(
5     restClient,
6     TimeUnit.SECONDS.toMillis(5),
7     ElasticsearchNodesSniffer.Scheme.HTTP);
8 Sniffer sniffer = Sniffer.builder(restClient)
9     .setNodesSniffer(nodesSniffer).build(); 

5.4.自定義NodesSniffer

        有時候,咱們須要自定義一個NodesSniffer完成一些個性化的特殊功能,好比從一個內部資源中獲取節點列表(如文件)而不是從ES服務器集羣中獲取節點列表:
 1 RestClient restClient = RestClient.builder(
 2     new HttpHost("localhost", 9200, "http"))
 3     .build();
 4 NodesSniffer nodesSniffer = new NodesSniffer() {
 5         @Override
 6         public List<Node> sniff() throws IOException {
 7             return null; 
 8         }
 9     };
10 Sniffer sniffer = Sniffer.builder(restClient)
11     .setNodesSniffer(nodesSniffer).build();
相關文章
相關標籤/搜索