Elasticsearch系列---Java客戶端代碼Demo

前言

前面歷經33篇內容的講解,與ES的請求操做都是在Kibana平臺上用Restful請求完成的,一直沒發佈Java或python的客戶端代碼,Restful纔是運用、理解ES核心功能最直接的表達方式,但實際項目中確定是以Java/python來完成ES請求的發起與數據處理的,前面理解了ES的核心功能,後面Java API的使用將會很是簡單,剩餘的未覆蓋的功能API,自行查閱文檔便可。html

概要

本篇講解Elasticsearch的客戶端API開發的一些示例,以Java語言爲主,介紹一些最經常使用,最核心的API。java

代碼示例

引入依賴

咱們以maven項目爲例,添加項目依賴node

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>6.3.1</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.3.1</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.12.1</version>
</dependency>

創建ES鏈接

  1. 建立Settings對象,指定集羣名稱
  2. 建立TransportClient對象,手動指定IP、端口便可
Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build();
        
TransportClient client = new PreBuiltTransportClient(settings).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));

若是集羣的節點數比較多,爲每一個node分別指定IP、Port可行性不高,咱們可使用集羣節點自動探查的功能,代碼以下:python

// 將client.transport.sniff設置爲true便可打開集羣節點自動探查功能
Settings settings = Settings.builder().put("client.transport.sniff", true)..put("cluster.name", "elasticsearch").build();

// 只須要指定一個node就行
TransportClient client = new PreBuiltTransportClient(settings);
transport.addTransportAddress(new TransportAddress(InetAddress.getByName("192.168.17.137"), 9300));

基本CRUD

最基本的CRUD代碼,能夠看成入門demo來寫:mysql

/**
     * 建立員工信息(建立一個document)
     * @param client
     */
    private static void createEmployee(TransportClient client) throws Exception {
        IndexResponse response = client.prepareIndex("company", "employee", "1")
                .setSource(XContentFactory.jsonBuilder()
                        .startObject()
                            .field("name", "jack")
                            .field("age", 27)
                            .field("position", "technique")
                            .field("country", "china")
                            .field("join_date", "2017-01-01")
                            .field("salary", 10000)
                        .endObject())
                .get();
        System.out.println(response.getResult()); 
    }
    
    /**
     * 獲取員工信息
     * @param client
     * @throws Exception
     */
    private static void getEmployee(TransportClient client) throws Exception {
        GetResponse response = client.prepareGet("company", "employee", "1").get();
        System.out.println(response.getSourceAsString()); 
    }
    
    /**
     * 修改員工信息
     * @param client
     * @throws Exception
     */
    private static void updateEmployee(TransportClient client) throws Exception {
        UpdateResponse response = client.prepareUpdate("company", "employee", "1") 
                .setDoc(XContentFactory.jsonBuilder()
                            .startObject()
                                .field("position", "technique manager")
                            .endObject())
                .get();
        System.out.println(response.getResult());  
     }
    
    /**
     * 刪除 員工信息
     * @param client
     * @throws Exception
     */
    private static void deleteEmployee(TransportClient client) throws Exception {
        DeleteResponse response = client.prepareDelete("company", "employee", "1").get();
        System.out.println(response.getResult());  
    }

搜索

咱們以前使用Restful的搜索,如今改用java實現,原有的Restful示例以下:sql

GET /company/employee/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "position": "technique"
          }
        }
      ],
      "filter": {
        "range": {
          "age": {
            "gte": 30,
            "lte": 40
          }
        }
      }
    }
  },
  "from": 0,
  "size": 1
}

等同於這樣的Java代碼:apache

SearchResponse response = client.prepareSearch("company")
        .setTypes("employee")
        .setQuery(QueryBuilders.termQuery("position", "technique"))                 // Query
        .setPostFilter(QueryBuilders.rangeQuery("age").from(30).to(40))     // Filter
        .setFrom(0).setSize(60)
        .get();

聚合查詢

聚合查詢稍微麻煩一些,請求的封裝和響應報文的解析,都是根據實際返回的結構來作的,例以下面的查詢:json

需求:設計模式

  1. 按照country國家來進行分組
  2. 在每一個country分組內,再按照入職年限進行分組
  3. 最後計算每一個分組內的平均薪資

Restful的請求以下:api

GET /company/employee/_search
{
  "size": 0,
  "aggs": {
    "group_by_country": {
      "terms": {
        "field": "country"
      },
      "aggs": {
        "group_by_join_date": {
          "date_histogram": {
            "field": "join_date",
            "interval": "year"
          },
          "aggs": {
            "avg_salary": {
              "avg": {
                "field": "salary"
              }
            }
          }
        }
      }
    }
  }
}

用Java編寫的請求以下:

SearchResponse sr = node.client().prepareSearch()
    .addAggregation(
        AggregationBuilders.terms("by_country").field("country")
        .subAggregation(AggregationBuilders.dateHistogram("by_year")
            .field("dateOfBirth")
            .dateHistogramInterval(DateHistogramInterval.YEAR)
            .subAggregation(AggregationBuilders.avg("avg_children").field("children"))
        )
    )
    .execute().actionGet();

對響應的處理,則須要一層一層獲取數據:

Map<String, Aggregation> aggrMap = searchResponse.getAggregations().asMap();
    StringTerms groupByCountry = (StringTerms) aggrMap.get("group_by_country");
    Iterator<Bucket> groupByCountryBucketIterator = groupByCountry.getBuckets().iterator();
    
    while(groupByCountryBucketIterator.hasNext()) {
        Bucket groupByCountryBucket = groupByCountryBucketIterator.next();
        
        System.out.println(groupByCountryBucket.getKey() + "\t" + groupByCountryBucket.getDocCount()); 
        
        Histogram groupByJoinDate = (Histogram) groupByCountryBucket.getAggregations().asMap().get("group_by_join_date"); 
        Iterator<org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket> groupByJoinDateBucketIterator = groupByJoinDate.getBuckets().iterator();
         
        while(groupByJoinDateBucketIterator.hasNext()) {
            org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket groupByJoinDateBucket = groupByJoinDateBucketIterator.next();
            
            System.out.println(groupByJoinDateBucket.getKey() + "\t" + groupByJoinDateBucket.getDocCount()); 
            
            Avg avgSalary = (Avg) groupByJoinDateBucket.getAggregations().asMap().get("avg_salary");
            System.out.println(avgSalary.getValue()); 
        }
    }
    
    client.close();
}

upsert請求

private static void upsert(TransportClient transport) {
    try {
        IndexRequest index = new IndexRequest("book_shop", "books", "2").source(
                XContentFactory.jsonBuilder().startObject()
                        .field("name", "mysql從入門到刪庫跑路")
                        .field("tags", "mysql")
                        .field("price", 32.8)
                        .endObject());

        UpdateRequest update = new UpdateRequest("book_shop", "books", "2")
                .doc(XContentFactory.jsonBuilder()
                        .startObject().field("price", 31.8)
                        .endObject())
                .upsert(index);
        UpdateResponse response = transport.update(update).get();
        System.out.println(response.getVersion());
    } catch (IOException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}

mget請求

public static void mget(TransportClient transport) {
    MultiGetResponse res = transport.prepareMultiGet()
            .add("book_shop", "books", "1")
            .add("book_shop", "books", "2")
            .get();
    for (MultiGetItemResponse item : res.getResponses()) {
        System.out.println(item.getResponse());
    }
}

bulk請求

public static void bulk(TransportClient transport) {
    try {
    BulkRequestBuilder bulk = transport.prepareBulk();
    bulk.add(transport.prepareIndex("book_shop", "books", "3").setSource(
            XContentFactory.jsonBuilder().startObject()
                    .field("name", "設計模式從入門到拷貝代碼")
                    .field("tags", "設計模式")
                    .field("price", 55.9)
                    .endObject()));
        bulk.add(transport.prepareIndex("book_shop", "books", "4").setSource(
                XContentFactory.jsonBuilder().startObject()
                        .field("name", "架構設計從入門到google搜索")
                        .field("tags", "架構設計")
                        .field("price", 68.9)
                        .endObject()));
        bulk.add(transport.prepareUpdate("book_shop", "books", "1").setDoc((XContentFactory.jsonBuilder()
                .startObject().field("price", 32.8)
                .endObject())));

        BulkResponse bulkRes = bulk.get();
        if (bulkRes.hasFailures()) {
            System.out.println("Error...");
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

scorll請求

public static void scorll(TransportClient client) {
    SearchResponse bookShop = client.prepareSearch("book_shop").setScroll(new TimeValue(60000)).setSize(1).get();

    int batchCnt = 0;
    do {
        // 循環讀取scrollid信息,直到結果爲空
        for(SearchHit hit: bookShop.getHits().getHits()) {
            System.out.println("batchCnt:" + ++batchCnt);
            System.out.println(hit.getSourceAsString());
        }
        bookShop = client.prepareSearchScroll(bookShop.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
    } while (bookShop.getHits().getHits().length != 0);
}

搜索模板

public static void searchTemplates(TransportClient client) {
    Map<String,Object> params = new HashMap<>(10);
    params.put("from",0);
    params.put("size",10);
    params.put("tags","java");

    SearchTemplateResponse str = new SearchTemplateRequestBuilder(client)
            .setScript("page_query_by_tags")
            .setScriptType(ScriptType.STORED)
            .setScriptParams(params)
            .setRequest(new SearchRequest())
            .get();

    for(SearchHit hit:str.getResponse().getHits().getHits()) {
        System.out.println(hit.getSourceAsString());
    }
}

多條件組合查詢

public static void otherSearch(TransportClient client) {
    SearchResponse response1 = client.prepareSearch("book_shop").setQuery(QueryBuilders.termQuery("tags", "java")).get();
    SearchResponse response2 = client.prepareSearch("book_shop").setQuery(QueryBuilders.multiMatchQuery("32.8", "price","tags")).get();
    SearchResponse response3 = client.prepareSearch("book_shop").setQuery(QueryBuilders.commonTermsQuery("name", "入門")).get();
    SearchResponse response4 = client.prepareSearch("book_shop").setQuery(QueryBuilders.prefixQuery("name", "java")).get();

    System.out.println(response1.getHits().getHits()[0].getSourceAsString());
    System.out.println(response2.getHits().getHits()[0].getSourceAsString());
    System.out.println(response3.getHits().getHits()[0].getSourceAsString());
    System.out.println(response4.getHits().getHits()[0].getSourceAsString());

    // 多個條件組合
    SearchResponse response5 = client.prepareSearch("book_shop").setQuery(QueryBuilders.boolQuery()
            .must(QueryBuilders.termQuery("tags", "java"))
            .mustNot(QueryBuilders.matchQuery("name", "跑路"))
            .should(QueryBuilders.matchQuery("name", "入門"))
            .filter(QueryBuilders.rangeQuery("price").gte(23).lte(55))).get();

    System.out.println(response5.getHits().getHits()[0].getSourceAsString());
}

地理位置查詢

public static void geo(TransportClient client) {
    GeoBoundingBoxQueryBuilder query1 = QueryBuilders.geoBoundingBoxQuery("location").setCorners(23, 112, 21, 114);

    List<GeoPoint> points = new ArrayList<>();
    points.add(new GeoPoint(23,115));
    points.add(new GeoPoint(25,113));
    points.add(new GeoPoint(21,112));
    GeoPolygonQueryBuilder query2 = QueryBuilders.geoPolygonQuery("location",points);

    GeoDistanceQueryBuilder query3 = QueryBuilders.geoDistanceQuery("location").point(22.523375, 113.911231).distance(500, DistanceUnit.METERS);


    SearchResponse response = client.prepareSearch("location").setQuery(query3).get();
    for(SearchHit hit:response.getHits().getHits()) {
        System.out.println(hit.getSourceAsString());
    }
}

小結

上述的那些案例demo,快速瀏覽一下便可,若是已經在開發ES相關的項目,仍是多參考官方的API文檔:https://www.elastic.co/guide/...。上面有很詳盡的API說明和使用Demo

專一Java高併發、分佈式架構,更多技術乾貨分享與心得,請關注公衆號:Java架構社區
能夠掃左邊二維碼添加好友,邀請你加入Java架構社區微信羣共同探討技術
Java架構社區.jpg

相關文章
相關標籤/搜索