ElasticSearch -- TransportClient

  • 前言html

    因爲須要兼容服務器上部署5.4.3版本的ElasticSearch,所以ElasticSearch的版本也必需要5.X,同時SpringBoot也要升級到2.X。java

    重點來了,暫時不想升級SpringBoot的版本,因此只能不使用spring-boot-starter-data-elasticsearch的jar包來集成ElasticSearch。改用TransportClient來集成!node

  • jar包spring

    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>transport</artifactId>
        <version>5.4.3</version>
        <exclusions>
            <exclusion>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>5.4.3</version>
    </dependency>

記錄一個大坑: Caused by: java.lang.ClassNotFoundException: org.elasticsearch.plugins.NetworkPluginjson

切記必定要把包放到ClassPath根路徑下,否則他掃不到NetworkPlugin!!!api

  • 配置緩存

    ## 集羣名(默認是elasticsearch)
    elasticsearch.cluster-name=elasticsearch
    ## 集羣節點地址列表,用逗號分隔
    elasticsearch.cluster-nodes=ip:9300
    ## 是否嗅探集羣狀態(默認false)
    elasticsearch.sniff=true
    ## 是否忽略集羣名認證
    elasticsearch.ignore-cluster-name=true
    ## 等待Ping命令返回結果的時間(默認5秒)
    elasticsearch.ping-timeout=5
    ## 節點之間互相ping、檢測的時間間隔
    elasticsearch.nodes-sampler-interval=5

啓動時初始化客戶端:服務器

  • 配置注意點網絡

    client.transport.sniff這個配置須要額外關注。當打開這個配置的時候,客戶端會去嗅探(sniff)整個集羣的狀態。此時若是集羣中有其餘的機器加入進來,客戶端會自動發現這個加入集羣的機器,不用手動設置就能夠自動鏈接到客戶端。併發

    此時注意集羣必須在同一個網段,他默認5秒會自動檢測一次,加入新節點,去掉壞節點。

  • 配置可能出錯點

    當sniff開關打開的時候,若是報了這個錯誤:None of the configured nodes are available: [{#transport#-1}{VTss4h8SRsCpP6EW5PoLrQ}

    這時候就要檢查下網絡,若是你的客戶端的網絡是內網,而後又通過外網去訪問了內網的ES,這時候就會報這個錯誤!!!

請看官文所說:

官方文檔: https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.4/transport-client.html

  • ES操做
  1. 增長索引

    // 新增
    IndexResponse response = client.prepareIndex(indexName, indexType, id)
            .setSource(entity, XContentType.JSON).execute().actionGet();
    // 校驗
    if (!response.status().equals(RestStatus.CREATED)) {
        // ...
    }

注意: 若是字段爲Null,Es字段欄並不會增長該字段。這裏能夠將value爲Null的過濾成空字符串便可!

  1. 根據Id刪除索引

    // 刪除
    DeleteResponse response = client.prepareDelete(indexName, indexType, id).get();
    if (!response.status().equals(RestStatus.OK)) {
        // ...
    }
  2. 根據Id查詢索引

    // 查詢
        GetResponse response = client.prepareGet(indexName, indexType, id)
                .setOperationThreaded(false)
                .get();
        if (StringUtils.hasText(response.getSourceAsString())) {
            // ...
        }

注意: setOperationThreaded(false) ,先來看官方文檔

他的意思是默認是true。當true的時候他將用其餘的線程去查詢,當false的時候他會用該調用線程去查詢。可是不論是哪一種方法,這個API都是用異步實現的。

  1. 更新索引

    UpdateRequest updateRequest = new UpdateRequest();
    updateRequest.index(indexName);
    updateRequest.type(indexType);
    updateRequest.id(id);
    // 加版本號控制
    updateRequest.version(data.getVersion());
    updateRequest.doc(entity, XContentType.JSON);
    UpdateResponse response = client.update(updateRequest).get();
    if (!response.status().equals(RestStatus.OK)) {
       // ....
    }

每一次更新數據,這一行數據的version就會加1;若是每次更新相同的數據,版本並不會增長!

  1. 批量新增

    // 構建
    BulkRequestBuilder bulkRequest = client.prepareBulk();
    for (String entity : list) {
        bulkRequest.add(client.prepareIndex(indexName, indexType, JSON.parseObject(entity).get("id").toString())
                .setSource(entity, XContentType.JSON));
    }
    BulkResponse bulkResponse = bulkRequest.get();
    // 判斷
    if (bulkResponse.hasFailures()) {
         // ....
    }

因爲BulkRequest會將數據保存內存中,因此批量插入的數據量不能過大,通常是在1000到5000條之間。大小通常在5到15MB之間。具體提數值要在本身項目的環境中不斷嘗試,每一個環境都會不同!

bulk引發的異常:https://www.jianshu.com/p/d4f7a6d58008 能夠看下這篇文章,對於優化很是有幫助!

  1. 批量處理器

官方文檔地址:https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.6/java-docs-bulk-processor.html

上述參數解釋

BulkActions:一次性提交的請求數,默認1000
    BulkSize:達到多少文件大小後提交請求,默認5MB
    FlushInterval:每多少時間提交一次請求,默認不設置。當打開這個配置的時候,BulkActions和BulkSize都要靠邊
    ConcurrentRequests:支持用多少併發來請求,默認1
    BackoffPolicy:重試策略,默認重試8次,啓動延遲時間50ms,總等待時間5.1s

同時注意的是ConcurrentRequests若爲0的時候,他接受請求和發出請求是同步的。當不爲0的時候,接受和發出請求是異步的!

在使用過程當中總結出來推薦使用單例,對服務資源狀態佔用較佳。同時注意當使用成單例時,不要使用完對其進行close操做!

7.總條數查詢

// 查詢
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        // 構建
        buildQueryBuilder(boolQueryBuilder, json);
        SearchHits searchHits = client.prepareSearch(indexName).setTypes(indexType)
                .setQuery(boolQueryBuilder)
                .setSize(0)
                .get().getHits();

只要將查詢的條數設置爲0,他默認會將搜索類型search_type改成count,他會跳過fetch階段,對只須要獲取查詢總條數的場景很是實用。能用極低的內存,不用查詢出真實數據來得知數據總條數。

  1. 分頁查詢

    // 查詢
         BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
         // 構建
         buildQueryBuilder(boolQueryBuilder, json);
         int formSize = (pageDTO.getPageNum() - 1) * pageDTO.getPageSize();
         SearchRequestBuilder searchRequestBuilder = client.prepareSearch(indexName).setTypes(indexType)
                 .setQuery(boolQueryBuilder)
                 .setFrom(formSize)
                 .setSize(pageDTO.getPageSize());
         // 結果
         SearchHits searchHits = searchRequestBuilder.get().getHits();
         if (searchHits.getTotalHits() == 0) {
             ....
         }

特別注意的是這裏雖然是分頁查詢,沒有全查。可是ES的機制就是會將全部的數據都查出來,而後再按傳進來的分信息頁來切割數據返還給你。因此換句話來講,若是總數據有10條,每頁1條,無論你取第一頁仍是最後一頁,都是要將這10條數據查出來。

同時單次查詢最大的數據量取決於ES的Index.max_result_window的這個配置。默認是10000.也就是說若是單次查詢超過了這個數,就會報錯。這就是暴露ES的一個問題就是他對深度分頁搜索支持性很低,越深度的分頁查詢很依賴max_result_window參數的上限。可是又不能無限加大這個值,這個值越大ES在查詢大量數據所佔內存就會越大。因此我本身也找了3種解決方案。

  1. 滾動查詢

官網地址:https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.6/java-search-scrolling.html

// 第一次查詢
    SearchResponse scrollResp = searchRequestBuilder
            .setScroll(new TimeValue(60000)))
            .setSize(10000).get();
    // 開始滾動查詢
    List<String> resultList = new ArrayList<>();
    do {
        for (SearchHit hit : scrollResp.getHits().getHits()) {
            resultList.add(hit.getSourceAsString());
        }
        // 繼續滾動
        scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
                .setScroll(new TimeValue(60000)
                .execute().actionGet();
    } while(scrollResp.getHits().getHits().length != 0);

滾動查詢不太適合實時查詢,比較適合後臺大批量的查詢或者導出使用。

他至關於先查了一次,es會將全部知足條件的數據緩存起來,保存一個視圖快照,而後接下去咱們只要拿着第一次查詢的ScrollId爲憑證,不斷的去es查詢接下來的數據,直到全部數據查完。因爲咱們是從快照取數據,所以就算咱們在滾動取數據的過程當中有新的數據增長和更新也不會對咱們這次操做有影響。

注意的是雖然咱們能夠用滾動查詢查詢出全部的數據,可是單次取得數據量仍是和max_result_window配置是掛鉤的哦!

  1. 滾動查詢升級版

上面的滾動查詢已經速度不錯了,可是若是使用scroll-scan的話,查詢速度會更快!!可是缺點就是這種查詢ES不能進行排序,只能查出來以後咱們本身進行排序。

scroll-scan未完待續。。。

如有錯誤,懇請指正,萬分感謝!!!

相關文章
相關標籤/搜索