使用Java Low Level REST Client操做elasticsearch

Java REST客戶端有兩種風格:html

Java低級別REST客戶端(Java Low Level REST Client,之後都簡稱低級客戶端算了,可貴碼字):Elasticsearch的官方low-level客戶端。 它容許經過http與Elasticsearch集羣進行通訊。 不會對請求進行編碼和響應解碼。 它與全部Elasticsearch版本兼容。
Java高級REST客戶端(Java High Level REST Client,之後都簡稱高級客戶端):Elasticsearch的官方high-level客戶端。 基於low-level客戶端,它公開了API特定的方法,並負責處理。java

低級客戶端 的功能包括:apache

  • 依賴最小
  • 全部可用節點,會負載平衡
  • 在節點故障和響應特定狀態碼的狀況下會進行故障轉移
  • 鏈接失敗會進行處罰(失敗的節點是否重試,取決於連續失敗的次數,失敗次數越多,客戶端等待的時間越長)
  • 持久鏈接
  • 跟蹤記錄請求和響應的日誌
  • 可選的自動發現羣集節點

低級客戶端 快速入門

Java API文檔在這裏能夠找到。低級客戶端託管在Maven Central上。所需的最低Java版本是1.7。低級客戶端與Elasticsearch的發佈週期相同。發佈的第一個版本爲5.0.0-alpha4。客戶端版本和與之通訊的Elasticsearch版本沒有任何關係,能夠替換客戶端版本爲你想要的任何版本。低級客戶端與全部Elasticsearch版本兼容。json

Maven Repository  

Maven 配置api

下面是使用maven做爲依賴管理器配置依賴項。 將如下內容添加到您的pom.xml文件中:數組

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>6.2.3</version>
</dependency>

Gradle 配置緩存

 下面是使用gradle做爲依賴項管理器來配置依賴項。在您的build.gradle中添加如下內容:安全

dependencies {
    compile 'org.elasticsearch.client:elasticsearch-rest-client:6.2.3'
}

Dependencies

低級客戶端在內部使用Apache Http Async Client發送http請求。 它依賴於如下部件,即the async http client及其自身的傳遞依賴:curl

  • org.apache.httpcomponents:httpasyncclient
  • org.apache.httpcomponents:httpcore-nio
  • org.apache.httpcomponents:httpclient
  • org.apache.httpcomponents:httpcore
  • commons-codec:commons-codec
  • commons-logging:commons-logging

Shading

爲了不版本衝突,依賴須要shaded(翻譯爲被隱藏不知合不合理)和打包到一個單獨的jar文件中。(該操做也被稱做"uber JAR"或"fat JAR",是一種可執行的Jar包。FatJar和普通的jar不一樣在於它包含了依賴的jar包。)
對依賴進行隱藏須要取其內容(資源文件和java類文件),而後在放到jar文件以前會對一些包進行重命名。該操做可使用第三方的插件,好比Gradle 和 Maven來完成。感興趣的參照這兒。異步

請注意,隱藏一個JAR也是有缺點的。 例如,隱藏 Commons Logging 層,意味着也須要對依賴的第三方日誌進行隱藏。

Maven 配置

下面是使用Maven Shade插件的配置。將如下內容添加到您的pom中。

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals><goal>shade</goal></goals>
                    <configuration>
                        <relocations>
                            <relocation>
                                <pattern>org.apache.http</pattern>
                                <shadedPattern>hidden.org.apache.http</shadedPattern>
                            </relocation>
                            <relocation>
                                <pattern>org.apache.logging</pattern>
                                <shadedPattern>hidden.org.apache.logging</shadedPattern>
                            </relocation>
                            <relocation>
                                <pattern>org.apache.commons.codec</pattern>
                                <shadedPattern>hidden.org.apache.commons.codec</shadedPattern>
                            </relocation>
                            <relocation>
                                <pattern>org.apache.commons.logging</pattern>
                                <shadedPattern>hidden.org.apache.commons.logging</shadedPattern>
                            </relocation>
                        </relocations>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Gradle 配置

下面是使用Gradle ShadowJar插件的配置。在您的 build.gradle 中添加如下內容。

shadowJar {
    relocate 'org.apache.http', 'hidden.org.apache.http'
    relocate 'org.apache.logging', 'hidden.org.apache.logging'
    relocate 'org.apache.commons.codec', 'hidden.org.apache.commons.codec'
    relocate 'org.apache.commons.logging', 'hidden.org.apache.commons.logging'
}

初始化

RestClient實例能夠經過RestClientBuilder類建立,經過RestClient 的 builder(HttpHost ...)靜態方法建立。 惟一須要的參數是客戶端將與之通訊的一個或多個主機,以下所示:

RestClient restClient = RestClient.builder(
        new HttpHost("localhost", 9200, "http"),
        new HttpHost("localhost", 9201, "http")).build();

RestClient類是線程安全的,理想狀況下與使用它的應用程序具備相同的生命週期。當再也不須要時關閉它是很是重要的,這樣它所使用的全部資源以及底層http客戶端實例及其線程均可以獲得釋放。

restClient.close();

RestClientBuilder還容許在構建RestClient實例時可選地設置如下配置參數:

       //配置可選參數
        RestClientBuilder builder = RestClient.builder(
                new HttpHost("localhost", 9200, "http"));
        Header[] defaultHeaders = new Header[]{new BasicHeader("header", "value")};
        //設置每一個請求須要發送的默認headers,這樣就不用在每一個請求中指定它們。
        builder.setDefaultHeaders(defaultHeaders);
        // 設置應該授予的超時時間,以防對相同的請求進行屢次嘗試。默認值是30秒,與默認socket超時時間相同。
        // 若是自定義socket超時時間,則應相應地調整最大重試超時時間。
        builder.setMaxRetryTimeoutMillis(10000);
        builder.setFailureListener(new RestClient.FailureListener() {
            @Override
            public void onFailure(HttpHost host) {
                //設置一個監聽程序,每次節點發生故障時都會收到通知,這樣就能夠採起相應的措施。
                //Used internally when sniffing on failure is enabled.(這句話沒搞懂啥意思)
            }
        });
        builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
            @Override
            public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
                //設置容許修改默認請求配置的回調
                // (例如,請求超時,身份驗證或org.apache.http.client.config.RequestConfig.Builder容許設置的任何內容)
                return requestConfigBuilder.setSocketTimeout(10000);
            }
        });
        builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                //設置容許修改http客戶端配置的回調
                // (例如,經過SSL的加密通訊,或者org.apache.http.impl.nio.client.HttpAsyncClientBuilder容許設置的任何內容)
                return httpClientBuilder.setProxy(new HttpHost("proxy", 9000, "http"));
            }
        });

執行請求

 一旦建立了RestClient,就能夠調用performRequest或performRequestAsync方法來發送請求。

performRequest方法是同步的,直接返回響應,這意味着客戶端將被阻塞並等待響應返回。

performRequestAsync方法返回void,並接受一個ResponseListener做爲參數,這意味着它們是異步執行的。當請求完成或失敗時,監聽器將被通知。

發送同步請求

    //方式1:只提供謂詞和終節點,這兩個參數是必須要的參數
        Response response = restClient.performRequest("GET", "/");

        //方式2:提供謂詞和終節點以及一些查詢字符串參數來發送請求
        Map<String, String> params = Collections.singletonMap("pretty", "true");
        response = restClient.performRequest("GET", "/", params);

        //方式3:提供謂詞和終節點以及可選查詢字符串參數和org.apache.http.HttpEntity對象中包含的請求主體來發送請求
        params = Collections.emptyMap();
        String jsonString = "{" +
                "\"user\":\"kimchy\"," +
                "\"postDate\":\"2013-01-30\"," +
                "\"message\":\"trying out Elasticsearch\"" +
                "}";
        //爲HttpEntity指定ContentType很是重要,由於它將用於設置Content-Type請求頭,以便Elasticsearch能夠正確解析內容。
        HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON);
        response = restClient.performRequest("PUT", "/posts/doc/1", params, entity);

        //方式4:提供謂詞,終節點,可選查詢字符串參數,可選請求主體
        // 以及用於爲每一個請求嘗試建立org.apache.http.nio.protocol.HttpAsyncResponseConsumer回調實例的可選工廠來發送請求。
        // 控制響應正文如何從客戶端的非阻塞HTTP鏈接進行流式傳輸。
        // 若是未提供,則使用默認實現,將整個響應主體緩存在堆內存中,最大爲100 MB。
        params = Collections.emptyMap();
        HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory consumerFactory =
                new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024);
        response = restClient.performRequest("GET", "/posts/_search", params, null, consumerFactory);

 發送異步請求

    //方式1: 提供謂詞,終節點和響應監聽器來發送異步請求,一旦請求完成,就會通知響應監聽器,這三個參數是必須要的參數
        ResponseListener responseListener = new ResponseListener() {
            @Override
            public void onSuccess(Response response) {
                // 定義請求成功執行時須要作的事情
            }
            @Override
            public void onFailure(Exception exception) {
                // 定義請求失敗時須要作的事情,即每當發生鏈接錯誤或返回錯誤狀態碼時作的操做。
            }
        };
        restClient.performRequestAsync("GET", "/", responseListener);

        //方式2: 提供謂詞,終節點,一些查詢字符串參數和響應監聽器來發送異步請求
        Map<String, String>  params = Collections.singletonMap("pretty", "true");
        restClient.performRequestAsync("GET", "/", params, responseListener);

        //方式3:提供謂詞,終節點,可選查詢字符串參數,
        // org.apache.http.HttpEntity對象中包含的請求主體以及在請求完成後通知響應偵聽器 來發送異步請求
        String jsonString = "{" +
                "\"user\":\"kimchy\"," +
                "\"postDate\":\"2013-01-30\"," +
                "\"message\":\"trying out Elasticsearch\"" +
                "}";
        NStringEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON);
        restClient.performRequestAsync("PUT", "/posts/doc/1", params, entity, responseListener);

        //方式4:提供謂詞,終節點,可選查詢字符串參數,可選請求主體
        // 以及用於爲每一個請求嘗試建立org.apache.http.nio.protocol.HttpAsyncResponseConsumer回調實例的可選工廠 來發送異步請求。
        // 控制響應正文如何從客戶端的非阻塞HTTP鏈接進行流式傳輸。
        // 若是未提供,則使用默認實現,將整個響應主體緩存在堆內存中,最大爲100 MB。
        HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory  consumerFactory =
                new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024);
        restClient.performRequestAsync("GET", "/posts/_search", params, null, consumerFactory, responseListener);

接下來是一個發送異步請求的基本示例:

final CountDownLatch latch = new CountDownLatch(documents.length);
        for (int i = 0; i < documents.length; i++) {
            restClient.performRequestAsync(
                    "PUT",
                    "/posts/doc/" + i,
                    Collections.<String, String>emptyMap(),
                    //此處假設文檔已存在 HttpEntity數組裏
                    documents[i],
                    new ResponseListener() {
                        @Override
                        public void onSuccess(Response response) {
                            //處理返回的響應內容
                            latch.countDown();
                        }

                        @Override
                        public void onFailure(Exception exception) {
                            // 因爲通訊錯誤或帶有指示錯誤的狀態碼的響應,用於處理返回的異常
                            latch.countDown();
                        }
                    }
            );
        }
        latch.await();

上面列出的每一個方法都支持經過Header 可變參數發送請求頭,源碼以下:

一個header時, 以下例所示:

Response response = restClient.performRequest("GET", "/", new BasicHeader("header", "value"));

 多個header時,以下所示:

Header[] headers = {
        new BasicHeader("header1", "value1"),
        new BasicHeader("header2", "value2")
};
restClient.performRequestAsync("GET", "/", responseListener, headers);

獲取響應

 Response對象(由同步performRequest方法返回或由ResponseListener的onSuccess(Response)中的參數接收),包裝從http客戶端返回的響應對象並公開一些其餘的信息。

 RestClient restClient = RestClient.builder(
                new HttpHost("localhost", 9200, "http"),
                new HttpHost("localhost", 9201, "http")).build();
        Response response = restClient.performRequest("GET", "/");
        RequestLine requestLine = response.getRequestLine();//關於已執行請求的信息
        HttpHost host = response.getHost();//返回響應的主機
        int statusCode = response.getStatusLine().getStatusCode();//響應狀態行,能夠從中獲取狀態碼
        Header[] headers = response.getHeaders();// 獲取響應頭
        String header=response.getHeader("content-type");// 獲取指定名稱的響應頭
        String responseBody = EntityUtils.toString(response.getEntity());//響應體包含在org.apache.http.HttpEntity對象中

執行請求時,會在如下狀況中引起異常(異步時在ResponseListener#onFailure(Exception)中做爲參數接收到該異常):

  1. IOException,通訊問題(例如SocketTimeoutException)
  2. ResponseException,返回了一個響應,可是它的狀態碼代表是錯誤的(不是2xx)。 ResponseException是一個有效的http響應,所以它暴露了其相應的Response對象,能夠訪問返回的響應。

對於返回404狀態代碼的HEAD請求,不會引起ResponseException,由於它是預期的HEAD響應,它只是表示找不到資源。 

Response response = restClient.performRequest("HEAD", "/s");//不會拋異常

除非ignore參數包含404,不然全部其餘HTTP方法(例如GET)都會爲404響應拋出ResponseException。

ignore是一個特殊的客戶端參數,它不會發送到Elasticsearch,且包含以逗號分隔的錯誤狀態碼列表。 它容許控制是否應將某些錯誤狀態碼視爲預期響應而不是異常。

這對於get api來講頗有用,由於它能夠在缺乏文檔時返回404,在這種狀況下,響應主體將不會包含錯誤,而是一般的get api響應,只是沒有文檔,由於它沒有找到。

注意,低級客戶端不會序列化或反序列化json。用戶能夠自由使用他們喜歡的庫。

底層的Apache Async Http Client附帶不一樣的org.apache.http.HttpEntity實現,容許以不一樣格式(流,字節數組,字符串等)提供請求體。

至於讀取響應體,HttpEntity的getContent方法很方便,它會返回來自先前緩衝的響應體的InputStream。做爲一種替代方法,能夠提供一個自定義org.apache.http.nio.protocol.HttpAsyncResponseConsumer來控制如何讀取和緩衝字節。

 日誌

The Java REST client使用了和Apache Async Http Client相同的日誌庫:Apache Commons Logging,
它支持許多流行的日誌實現。 用於啓用日誌記錄功能的java包分別是客戶端自己的org.elasticsearch.client,以及嗅探器的org.elasticsearch.client.sniffer。

也能夠啓用以curl格式來記錄每一個請求和相應的響應。 這使得調試時很是方便。

例如在須要手動執行請求以檢查它是否仍然產生相同的響應時。 爲跟蹤器包啓用日誌記錄以打印出此類日誌。 請注意,該類型的日誌記錄很是昂貴,不該始終在生產環境中啓用,而只是在須要時暫時使用。

官方文檔:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-overview.html

相關文章
相關標籤/搜索