Elasticsearch High Level Rest Client 發起請求的過程分析

本文討論的是JAVA High Level Rest Client向ElasticSearch6.3.2發送請求(index操做、update、delete……)的一個詳細過程的理解,主要涉及到Rest Client如何選擇哪一臺Elasticsearch服務器發起請求。html

maven依賴以下:java

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.3.2</version>
</dependency>

High Level Rest Client 爲這些請求提供了兩套接口:同步和異步,異步接口以Async結尾。以update請求爲例,以下:node

官方也提供了詳細的示例來演示如何使用這些API:java-rest-high,在使用以前須要先初始化一個RestHighLevelClient 而後就能夠參考API文檔開發了。RestHighLevelClient 底層封裝的是一個http鏈接池,當須要執行 update、index、delete操做時,直接從鏈接池中取出一個鏈接,而後發送http請求到ElasticSearch服務端,服務端基於Netty接收請求。算法

The high-level client will internally create the low-level client used to perform requests based on the provided builder. That low-level client maintains a pool of connections

本文的主要內容是探究一下 index/update/delete請求是如何一步步構造,併發送到ElasticSearch服務端的,並重點探討選擇向哪一個ElasticSearch服務器發送請求的 round robin 算法編程

以update請求爲例:構造了update請求後:執行esClient.update(updateRequest);發起請求:json

updateRequest.doc(XContentFactory.jsonBuilder().startObject().field(fieldName, val).endObject());
            UpdateResponse response = esClient.update(updateRequest);

最終會執行到performRequest(),index、delete請求最終也是執行到這個方法:服務器

/**
     * Sends a request to the Elasticsearch cluster that the client points to. Blocks until the request is completed and returns
     * its response or fails by throwing an exception. Selects a host out of the provided ones in a round-robin fashion. Failing hosts
     * are marked dead and retried after a certain amount of time (minimum 1 minute, maximum 30 minutes), depending on how many times
     * they previously failed (the more failures, the later they will be retried). In case of failures all of the alive nodes (or dead
     * nodes that deserve a retry) are retried until one responds or none of them does, in which case an {@link IOException} will be thrown.
     *
     *
     */
    public Response performRequest(String method, String endpoint, Map<String, String> params,
                                   HttpEntity entity, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
                                   Header... headers) throws IOException {
        SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis);
        performRequestAsyncNoCatch(method, endpoint, params, entity, httpAsyncResponseConsumerFactory,
            listener, headers);
        return listener.get();
    }

看這個方法的註釋,向Elasticsearch cluster發送請求,並等待響應。等待響應就是經過建立一個SyncResponseListener,而後執行performRequestAsyncNoCatch先異步把HTTP請求發送出去,而後SyncResponseListener等待獲取請求的響應結果,即:listener.get();阻塞等待直到拿到HTTP請求的響應結果。併發

performRequestAsyncNoCatch()裏面調用的內容以下:app

client.execute(requestProducer, asyncResponseConsumer, context, new FutureCallback<HttpResponse>() {
            @Override
            public void completed(HttpResponse httpResponse) {

也就是CloseableHttpAsyncClient的execute()方法向ElasticSearch服務端發起了HTTP請求。(rest-high-level client封裝的底層http鏈接池)負載均衡

以上就是:ElasticSearch JAVA High Level 同步方法的具體執行過程。總結起來就二句:performRequestAsyncNoCatch異步發送請求,SyncResponseListener阻塞獲取響應結果。異步方法的執行方式也是相似的。

這篇文章中提到,ElasticSearch集羣中每一個節點默認都是Coordinator 節點,能夠接收Client的請求。由於在建立ElasticSearch JAVA High Level 時,通常會配置多個IP地址,以下就配置了三臺:

//      es中默認 每一個節點都是 coordinating node
            String[] nodes = clusterNode.split(",");
            HttpHost host_0 = new HttpHost(nodes[0].split(":")[0], Integer.parseInt(nodes[0].split(":")[1]), "http");
            HttpHost host_1 = new HttpHost(nodes[1].split(":")[0], Integer.parseInt(nodes[1].split(":")[1]), "http");
            HttpHost host_2 = new HttpHost(nodes[2].split(":")[0], Integer.parseInt(nodes[2].split(":")[1]), "http");
            restHighLevelClient = new RestHighLevelClient(RestClient.builder(host_0, host_1, host_2));

那麼,Client在發起HTTP請求時,究竟是請求到了哪臺ElasticSearch服務器上呢?這就是本文想要討論的問題。

而發送請求主要由RestClient實現,看看這個類的源碼註釋,裏面就提到了sending a request, a host gets selected out of the provided ones in a round-robin fashion.

/**
 * Client that connects to an Elasticsearch cluster through HTTP.
 * The hosts that are part of the cluster need to be provided at creation time, but can also be replaced later
 * The method {@link #performRequest(String, String, Map, HttpEntity, Header...)} allows to send a request to the cluster. When
 * sending a request, a host gets selected out of the provided ones in a round-robin fashion. Failing hosts are marked dead and
 * retried after a certain amount of time (minimum 1 minute, maximum 30 minutes), depending on how many times they previously
 * failed (the more failures, the later they will be retried). In case of failures all of the alive nodes (or dead nodes that
 * deserve a retry) are retried until one responds or none of them does, in which case an {@link IOException} will be thrown.
 * <p>
 * Requests can be either synchronous or asynchronous. The asynchronous variants all end with {@code Async}.
 * <p>
 */
public class RestClient implements Closeable {
    
    //一些代碼
    
    
        /**
     * {@code HostTuple} enables the {@linkplain HttpHost}s and {@linkplain AuthCache} to be set together in a thread
     * safe, volatile way.
     */
    private static class HostTuple<T> {
        final T hosts;
        final AuthCache authCache;

        HostTuple(final T hosts, final AuthCache authCache) {
            this.hosts = hosts;
            this.authCache = authCache;
        }
    }
}

HostTuple是RestClient是靜態內部類,封裝在配置文件中配置的ElasticSearch集羣中各臺機器的IP地址和端口。

所以,對於Client而言,存在2個問題:

  1. 怎樣選一臺「可靠的」機器,而後放心地把個人請求交給它?
  2. 若是Client端的請求量很是大,不能總是把請求都往ElasticSearch某一臺服務器發,應該要考慮一下負載均衡。

其實具體的算法實現細節我也沒有深刻去研究理解,不過把這兩個問題抽象出來,其實在不少場景中都能碰到。

客戶端想要鏈接服務端,服務器端提供了不少主機可供選擇,我應該須要考慮哪些因素,選一臺合適的主機鏈接?

performRequestAsync方法的參數中,會調用RestClient類的netxtHost():方法,選擇合適的ElasticSearch服務器IP進行鏈接。

void performRequestAsyncNoCatch(String method, String endpoint, Map<String, String> params,
                                    HttpEntity entity, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
                                    ResponseListener responseListener, Header... headers) {
    
    //省略其餘無關代碼
        performRequestAsync(startTime, nextHost(), request, ignoreErrorCodes, httpAsyncResponseConsumerFactory,
                failureTrackingResponseListener);
}
/**
     * Returns an {@link Iterable} of hosts to be used for a request call.
     * Ideally, the first host is retrieved from the iterable and used successfully for the request.
     * Otherwise, after each failure the next host has to be retrieved from the iterator so that the request can be retried until
     * there are no more hosts available to retry against. The maximum total of attempts is equal to the number of hosts in the iterable.
     * The iterator returned will never be empty. In case there are no healthy hosts available, or dead ones to be be retried,
     * one dead host gets returned so that it can be retried.
     */
    private HostTuple<Iterator<HttpHost>> nextHost() {

nextHost()方法的大體邏輯以下:

do{
    //先從HostTuple中拿到ElasticSearch集羣配置的主機信息
    //....
    
    if (filteredHosts.isEmpty()) {
        //last resort: if there are no good hosts to use, return a single dead one, the one that's closest to being retried
        //全部的主機都不可用,那就死馬當活馬醫
        HttpHost deadHost = sortedHosts.get(0).getKey();
        nextHosts = Collections.singleton(deadHost);
    }else{
        List<HttpHost> rotatedHosts = new ArrayList<>(filteredHosts);
        //rotate()方法選取最適合鏈接的主機
                Collections.rotate(rotatedHosts, rotatedHosts.size() - lastHostIndex.getAndIncrement());
                nextHosts = rotatedHosts;
    }
    
}while(nextHosts.isEmpty())

選擇ElasticSearch主機鏈接主要是由rotate()實現的。該方法裏面又有2種實現,具體代碼就不貼了,看註釋:

/**
     * Rotates the elements in the specified list by the specified distance.
     * After calling this method, the element at index <tt>i</tt> will be
     * the element previously at index <tt>(i - distance)</tt> mod
     * <tt>list.size()</tt>, for all values of <tt>i</tt> between <tt>0</tt>
     * and <tt>list.size()-1</tt>, inclusive.  (This method has no effect on
     * the size of the list.)
     *
     * <p>For example, suppose <tt>list</tt> comprises<tt> [t, a, n, k, s]</tt>.
     * After invoking <tt>Collections.rotate(list, 1)</tt> (or
     * <tt>Collections.rotate(list, -4)</tt>), <tt>list</tt> will comprise
     * <tt>[s, t, a, n, k]</tt>.
     *
     * <p>Note that this method can usefully be applied to sublists to
     * move one or more elements within a list while preserving the
     * order of the remaining elements.  For example, the following idiom
     * moves the element at index <tt>j</tt> forward to position
     * <tt>k</tt> (which must be greater than or equal to <tt>j</tt>):
     * <pre>
     *     Collections.rotate(list.subList(j, k+1), -1);
     * </pre>
     * To make this concrete, suppose <tt>list</tt> comprises
     * <tt>[a, b, c, d, e]</tt>.  To move the element at index <tt>1</tt>
     * (<tt>b</tt>) forward two positions, perform the following invocation:
     * <pre>
     *     Collections.rotate(l.subList(1, 4), -1);
     * </pre>
     * The resulting list is <tt>[a, c, d, b, e]</tt>.
     *
     * <p>To move more than one element forward, increase the absolute value
     * of the rotation distance.  To move elements backward, use a positive
     * shift distance.
     *
     * <p>If the specified list is small or implements the {@link
     * RandomAccess} interface, this implementation exchanges the first
     * element into the location it should go, and then repeatedly exchanges
     * the displaced element into the location it should go until a displaced
     * element is swapped into the first element.  If necessary, the process
     * is repeated on the second and successive elements, until the rotation
     * is complete.  If the specified list is large and doesn't implement the
     * <tt>RandomAccess</tt> interface, this implementation breaks the
     * list into two sublist views around index <tt>-distance mod size</tt>.
     * Then the {@link #reverse(List)} method is invoked on each sublist view,
     * and finally it is invoked on the entire list.  For a more complete
     * description of both algorithms, see Section 2.3 of Jon Bentley's
     * <i>Programming Pearls</i> (Addison-Wesley, 1986).
     *
     */
    public static void rotate(List<?> list, int distance) {
        if (list instanceof RandomAccess || list.size() < ROTATE_THRESHOLD)
            rotate1(list, distance);
        else
            rotate2(list, distance);
    }

若是想要了解算法的具體思路就結合源碼並參考:《編程珠璣》2.3節中的詳細描述。

原文:https://www.cnblogs.com/hapjin/p/10116073.html

相關文章
相關標籤/搜索