Using the ElasticSearch Java API

Thanks to 全科 for his ElasticSearch tutorial, from which most of this article is drawn.

ElasticSearch

Comparing MySQL and ElasticSearch

MySQL          ElasticSearch
Database       Index
Table          Type
Row            Document
Column         Field
Schema         Mapping
Index          Everything indexed by default
SQL            Query DSL

Document APIs

Index API

The Index API lets us store a JSON document and makes the data searchable. A document is uniquely identified by index, type, and id. You can supply the id yourself, or let the Index API generate one for you.
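
As a minimal sketch of the two options (client, INDEX, and TYPE are the fields defined in the complete example further below), you can pass an explicit id to prepareIndex, or omit it and read the generated id from the response:

// explicit id: the document is stored at INDEX/TYPE/1
IndexResponse withId = client.prepareIndex(INDEX, TYPE, "1")
        .setSource("{\"user\":\"deepredapple\"}")
        .get();

// no id supplied: ElasticSearch generates one, readable from the response
IndexResponse withGeneratedId = client.prepareIndex(INDEX, TYPE)
        .setSource("{\"user\":\"deepredapple\"}")
        .get();
System.out.println(withGeneratedId.getId());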

There are four ways to produce a JSON document:

  • Manually, using a raw byte[] or String
  • Using a Map, which is automatically converted to its JSON equivalent
  • Using a third-party library such as Jackson or FastJSON to serialize a bean
  • Using the built-in helper XContentFactory.jsonBuilder()

Manual approach

    /**
     * Manual approach
     * @throws UnknownHostException
     */
    @Test
    public void JsonDocument() throws UnknownHostException {
        String json = "{" +
                "\"user\":\"deepredapple\"," +
                "\"postDate\":\"2018-01-30\"," +
                "\"message\":\"trying out Elasticsearch\"" +
                "}";
        IndexResponse indexResponse = client.prepareIndex("fendo", "fendodate").setSource(json).get();
        System.out.println(indexResponse.getResult());
    }

Map approach

    /**
     * Map approach
     */
    @Test
    public void MapDocument() {
        Map<String, Object> json = new HashMap<String, Object>();
        json.put("user", "hhh");
        json.put("postDate", "2018-06-28");
        json.put("message", "trying out Elasticsearch");
        IndexResponse response = client.prepareIndex(INDEX, TYPE).setSource(json).get();
        System.out.println(response.getResult());
    }

Serialization approach

    /**
     * Serialize with Jackson
     */
    @Test
    public void JACKSONDocument() throws JsonProcessingException {
        Blog blog = new Blog();
        blog.setUser("123");
        blog.setPostDate("2018-06-29");
        blog.setMessage("try out ElasticSearch");

        ObjectMapper mapper = new ObjectMapper();
        byte[] bytes = mapper.writeValueAsBytes(blog);
        IndexResponse response = client.prepareIndex(INDEX, TYPE).setSource(bytes).get();
        System.out.println(response.getResult());
    }

XContentBuilder helper approach

    /**
     * XContentBuilder helper approach
     */
    @Test
    public void XContentBuilderDocument() throws IOException {
        XContentBuilder builder = XContentFactory.jsonBuilder().startObject()
                .field("user", "xcontentdocument")
                .field("postDate", "2018-06-30")
                .field("message", "this is ElasticSearch").endObject();
        IndexResponse response = client.prepareIndex(INDEX, TYPE).setSource(builder).get();
        System.out.println(response.getResult());
    }

Complete example

package com.deepredapple.es.document;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * @author DeepRedApple
 */
public class TestClient {

    TransportClient client = null;

    public static final String INDEX = "fendo";

    public static final String TYPE = "fendodate";

    @Before
    public void beforeClient() throws UnknownHostException {
        client = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
    }

    /**
     * Manual approach
     * @throws UnknownHostException
     */
    @Test
    public void JsonDocument() throws UnknownHostException {
        String json = "{" +
                "\"user\":\"deepredapple\"," +
                "\"postDate\":\"2018-01-30\"," +
                "\"message\":\"trying out Elasticsearch\"" +
                "}";
        IndexResponse indexResponse = client.prepareIndex(INDEX, TYPE).setSource(json).get();
        System.out.println(indexResponse.getResult());
    }

    /**
     * Map approach
     */
    @Test
    public void MapDocument() {
        Map<String, Object> json = new HashMap<String, Object>();
        json.put("user", "hhh");
        json.put("postDate", "2018-06-28");
        json.put("message", "trying out Elasticsearch");
        IndexResponse response = client.prepareIndex(INDEX, TYPE).setSource(json).get();
        System.out.println(response.getResult());
    }

    /**
     * Serialize with Jackson
     */
    @Test
    public void JACKSONDocument() throws JsonProcessingException {
        Blog blog = new Blog();
        blog.setUser("123");
        blog.setPostDate("2018-06-29");
        blog.setMessage("try out ElasticSearch");

        ObjectMapper mapper = new ObjectMapper();
        byte[] bytes = mapper.writeValueAsBytes(blog);
        IndexResponse response = client.prepareIndex(INDEX, TYPE).setSource(bytes).get();
        System.out.println(response.getResult());
    }

    /**
     * XContentBuilder helper approach
     */
    @Test
    public void XContentBuilderDocument() throws IOException {
        XContentBuilder builder = XContentFactory.jsonBuilder().startObject()
                .field("user", "xcontentdocument")
                .field("postDate", "2018-06-30")
                .field("message", "this is ElasticSearch").endObject();
        IndexResponse response = client.prepareIndex(INDEX, TYPE).setSource(builder).get();
        System.out.println(response.getResult());
    }

}

Get API

The Get API retrieves a document by id:

GetResponse getResponse = client.prepareGet(INDEX, TYPE, "AWRJCXMhro3r8sDxIpir").get();

The parameters are the index, the type, and the _id.

Thread configuration

Setting setOperationThreaded to true runs the operation on a different thread:

    /**
     * Get API
     */
    @Test
    public void testGetApi() {
        GetResponse getResponse = client.prepareGet(INDEX, TYPE, "AWRJCXMhro3r8sDxIpir").setOperationThreaded(false).get();
        Map<String, Object> map = getResponse.getSource();
        Set<String> keySet = map.keySet();
        for (String str : keySet) {
            Object o = map.get(str);
            System.out.println(o.toString());
        }
    }

Delete API

Delete by ID:

DeleteResponse deleteResponse = client.prepareDelete(INDEX, TYPE, "AWRJCXMhro3r8sDxIpir").get();

The parameters are the index, the type, and the _id.

Thread configuration

Setting setOperationThreaded to true runs the operation on a different thread:

    /**
     * Delete API
     */
    @Test
    public void testDeleteAPI() {
        GetResponse getResponse = client.prepareGet(INDEX, TYPE, "AWRJCXMhro3r8sDxIpir").setOperationThreaded(false).get();
        System.out.println(getResponse.getSource());
        DeleteResponse deleteResponse = client.prepareDelete(INDEX, TYPE, "AWRJCXMhro3r8sDxIpir").get();
        System.out.println(deleteResponse.getResult());
    }

Delete By Query API

Delete the documents matching a query:

    /**
     * Delete by query
     */
    @Test
    public void deleteByQuery() {
        BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
                .filter(QueryBuilders.matchQuery("user", "hhh")) // query condition
                .source(INDEX).get(); // index name
        long deleted = response.getDeleted(); // number of documents deleted
        System.out.println(deleted);
    }

Parameters: QueryBuilders.matchQuery("user", "hhh") takes the field and the value to match, and source(INDEX) takes the index name.

Asynchronous callback

When a delete runs for a long time, it can be executed asynchronously with a callback; the result is obtained inside the callback.

    /**
     * Delete with an asynchronous callback - suited to deleting large volumes of data
     */
    @Test
    public void DeleteByQueryAsync() {
        for (int i = 1300; i < 3000; i++) {
            DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
                .filter(QueryBuilders.matchQuery("user", "hhh " + i))
                .source(INDEX)
                .execute(new ActionListener<BulkByScrollResponse>() {
                    public void onResponse(BulkByScrollResponse response) {
                        long deleted = response.getDeleted();
                        System.out.println("刪除的文檔數量爲= "+deleted);
                    }

                    public void onFailure(Exception e) {
                        System.out.println("Failure");
                    }
                });
        }
    }

Even after the program stops, the ElasticSearch console shows the deletes still running; the operation executes asynchronously.

The callback is registered through the execute method:

.execute(new ActionListener<BulkByScrollResponse>() { // callback
      public void onResponse(BulkByScrollResponse response) {
        long deleted = response.getDeleted();
        System.out.println("刪除的文檔數量爲= "+deleted);
      }

      public void onFailure(Exception e) {
        System.out.println("Failure");
      }
    });

Update API

Updating documents

There are two main ways to perform an update:

  • Create an UpdateRequest and send it with the client
  • Use the prepareUpdate() method

Using UpdateRequest

    /**
     * Update with an UpdateRequest
     */
    @Test
    public void testUpdateAPI() throws IOException, ExecutionException, InterruptedException {
        UpdateRequest updateRequest = new UpdateRequest();
        updateRequest.index(INDEX);
        updateRequest.type(TYPE);
        updateRequest.id("AWRFv-yAro3r8sDxIpib");
        updateRequest.doc(jsonBuilder()
                .startObject()
                    .field("user", "hhh")
                .endObject());
        client.update(updateRequest).get();
    }

Using prepareUpdate()

    /**
     * Using prepareUpdate()
     */
    @Test
    public void testUpdatePrepareUpdate() throws IOException {
        client.prepareUpdate(INDEX, TYPE, "AWRFvA7k0udstXU4tl60")
                .setScript(new Script("ctx._source.user = \"DeepRedApple\"")).get();
        client.prepareUpdate(INDEX, TYPE, "AWRFvA7k0udstXU4tl60")
                .setDoc(jsonBuilder()
                .startObject()
                    .field("user", "DeepRedApple")
                .endObject()).get();
    }

The parameters of setScript in client.prepareUpdate differ between versions. Here the script is passed inline as a value; you can also load a stored script and execute it to perform the update, as in the sketch below.
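
As a hedged sketch only: assuming a script has been stored in the cluster under the hypothetical id update-user, it could be referenced roughly like this (the Script constructor arguments vary across client versions, so treat this as an approximation):

// assumes a stored script with id "update-user" whose body is: ctx._source.user = params.newName
// requires java.util.Collections, org.elasticsearch.script.Script and org.elasticsearch.script.ScriptType
Map<String, Object> params = Collections.singletonMap("newName", "DeepRedApple");
client.prepareUpdate(INDEX, TYPE, "AWRFvA7k0udstXU4tl60")
        .setScript(new Script(ScriptType.STORED, null, "update-user", params))
        .get();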

Update By Script

Update a document using a script:

    /**
     * Update via a script
     */
    @Test
    public void testUpdateByScript() throws ExecutionException, InterruptedException {
        UpdateRequest updateRequest = new UpdateRequest(INDEX, TYPE, "AWRFvLSTro3r8sDxIpia")
                .script(new Script("ctx._source.user = \"LZH\""));
        client.update(updateRequest).get();
    }

Upsert

Updates a document: if the document exists it is updated, and if it does not exist it is inserted:

    /**
     * Update the document if it exists, otherwise insert it
     */
    @Test
    public void testUpsert() throws IOException, ExecutionException, InterruptedException {
        IndexRequest indexRequest = new IndexRequest(INDEX, TYPE, "AWRFvLSTro3r8sDxIp12")
                .source(jsonBuilder()
                    .startObject()
                        .field("user", "hhh")
                        .field("postDate", "2018-02-14")
                        .field("message", "ElasticSearch")
                    .endObject());
        UpdateRequest updateRequest = new UpdateRequest(INDEX, TYPE, "AWRFvLSTro3r8sDxIp12")
                .doc(jsonBuilder()
                    .startObject()
                        .field("user", "LZH")
                    .endObject())
                .upsert(indexRequest); // if the document does not exist, index indexRequest instead
        client.update(updateRequest).get();
    }

If the _id in the parameters exists, that is, index/type/_id exists, the UpdateRequest is executed; if index/type/_id does not exist, the document is inserted directly.

Multi Get API

Fetch several documents in a single request:

    /**
     * Fetch multiple documents in one request
     */
    @Test
    public void TestMultiGetApi() {
        MultiGetResponse responses = client.prepareMultiGet()
                .add(INDEX, TYPE, "AWRFv-yAro3r8sDxIpib") //一個ID的方式
                .add(INDEX, TYPE, "AWRFvA7k0udstXU4tl60", "AWRJA72Uro3r8sDxIpip")//多個ID的方式
                .add("blog", "blog", "AWG9GKCwhg1e21lmGSLH") //從另外一個索引裏面獲取
                .get();
        for (MultiGetItemResponse itemResponse : responses) {
            GetResponse response = itemResponse.getResponse();
            if (response.isExists()) {
                String source = response.getSourceAsString(); //_source
                JSONObject jsonObject = JSON.parseObject(source);
                Set<String> sets = jsonObject.keySet();
                for (String str : sets) {
                    System.out.println("key -> " + str);
                    System.out.println("value -> "+jsonObject.get(str));
                    System.out.println("===============");
                }
            }
        }
    }

Bulk API

The Bulk API performs batch inserts:

    /**
     * Bulk insert
     */
    @Test
    public void testBulkApi() throws IOException {
        BulkRequestBuilder requestBuilder = client.prepareBulk();
        requestBuilder.add(client.prepareIndex(INDEX, TYPE, "1")
            .setSource(jsonBuilder()
                .startObject()
                    .field("user", "張三")
                    .field("postDate", "2018-05-01")
                    .field("message", "zhangSan message")
                .endObject()));
        requestBuilder.add(client.prepareIndex(INDEX, TYPE, "2")
            .setSource(jsonBuilder()
                .startObject()
                    .field("user", "李四")
                    .field("postDate", "2016-09-10")
                    .field("message", "Lisi message")
                .endObject()));
        BulkResponse bulkResponse = requestBuilder.get();
        if (bulkResponse.hasFailures()) {
            System.out.println("error");
        }
    }

Using Bulk Processor

The BulkProcessor offers a simple interface that automatically flushes bulk operations based on the number or size of queued requests, or on a time interval.

Creating a BulkProcessor instance

First create the BulkProcessor:

    /**
     * Create a BulkProcessor instance
     */
    @Test
    public void testCreateBulkProcessor() {
        BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
            // called before the bulk is executed; e.g. request.numberOfActions() gives the number of actions
            public void beforeBulk(long l, BulkRequest request) {
                
            }

            // called after the bulk has executed; e.g. response.hasFailures() tells whether any action failed
            public void afterBulk(long l, BulkRequest request, BulkResponse response) {
                
            }

            // called when the bulk request fails with a Throwable
            public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {

            }
        }).setBulkActions(10000) // flush after every 10,000 requests
          .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // flush once the batch reaches 5MB
          .setFlushInterval(TimeValue.timeValueSeconds(5)) // flush every 5 seconds regardless of the number of queued requests
          .setConcurrentRequests(1) // number of concurrent flushes: 0 means only a single request may execute, 1 allows one concurrent request
          .setBackoffPolicy(
                  BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                // custom retry policy: wait 100ms at first, then back off exponentially, retrying up to 3 times.
                // A retry is triggered when one or more bulk actions fail with an EsRejectedExecutionException
                // because compute resources ran out; retries can be disabled with BackoffPolicy.noBackoff()
          .build();
    }

BulkProcessor defaults

  • bulkActions 1000
  • bulkSize 5MB
  • flushInterval not set
  • concurrentRequests 1, meaning asynchronous execution
  • backoffPolicy retries 8 times with a 50ms starting delay
    /**
     * Create and use a BulkProcessor instance
     */
    @Test
    public void testCreateBulkProcessor() throws IOException {
        BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
            // called before the bulk is executed; e.g. request.numberOfActions() gives the number of actions
            public void beforeBulk(long l, BulkRequest request) {

            }

            // called after the bulk has executed; e.g. response.hasFailures() tells whether any action failed
            public void afterBulk(long l, BulkRequest request, BulkResponse response) {

            }

            // called when the bulk request fails with a Throwable
            public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {

            }
        }).setBulkActions(10000) // flush after every 10,000 requests
          .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // flush once the batch reaches 5MB
          .setFlushInterval(TimeValue.timeValueSeconds(5)) // flush every 5 seconds regardless of the number of queued requests
          .setConcurrentRequests(1) // number of concurrent flushes: 0 means only a single request may execute, 1 allows one concurrent request
          .setBackoffPolicy(
                  BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                // custom retry policy: wait 100ms at first, then back off exponentially, retrying up to 3 times.
                // A retry is triggered when one or more bulk actions fail with an EsRejectedExecutionException
                // because compute resources ran out; retries can be disabled with BackoffPolicy.noBackoff()
          .build();

        //增長requests
        bulkProcessor.add(new IndexRequest(INDEX, TYPE, "3").source(
                jsonBuilder()
                        .startObject()
                            .field("user", "王五")
                            .field("postDate", "2019-10-05")
                            .field("message", "wangwu message")
                        .endObject()));
        bulkProcessor.add(new DeleteRequest(INDEX, TYPE, "1"));
        bulkProcessor.flush();
        // close the bulkProcessor
        bulkProcessor.close();
        client.admin().indices().prepareRefresh().get();
        client.prepareSearch().get();
    }

Search API

The Search API executes a search query and returns the matching results. It can search one index/type or several, and Query DSL builders from the Query Java API can be used as the query.

The Java API also defines the QUERY_AND_FETCH and DFS_QUERY_AND_FETCH search types, but these modes should be chosen by the system rather than specified manually by users.

Example

@Test
    public void testSearchApi() {
        SearchResponse response = client.prepareSearch(INDEX).setTypes(TYPE)
                .setQuery(QueryBuilders.matchQuery("user", "hhh")).get();
        SearchHit[] hits = response.getHits().getHits();
        for (int i = 0; i < hits.length; i++) {
            String json = hits[i].getSourceAsString();
            JSONObject object = JSON.parseObject(json);
            Set<String> strings = object.keySet();
            for (String str : strings) {
                System.out.println(object.get(str));
            }
        }
    }

Using scrolls in Java

An ordinary search request returns a single page of data, however large the result set is. The Scroll API lets us retrieve large numbers of documents (even all of them): an initial search opens a scroll, and batches of results are then pulled from ElasticSearch until none remain. The Scroll API is not designed for real-time user requests but for processing large volumes of data.

  /**
   * Scroll search
   * @throws ExecutionException
   * @throws InterruptedException
   */
  @Test
  public void testScrollApi() throws ExecutionException, InterruptedException {
      MatchQueryBuilder qb = matchQuery("user", "hhh");
      SearchResponse response = client.prepareSearch(INDEX).addSort(FieldSortBuilder.DOC_FIELD_NAME,
              SortOrder.ASC)
              .setScroll(new TimeValue(60000)) // to use scrolling, the initial search must specify the scroll parameter, telling ElasticSearch how long to keep the search context alive
              .setQuery(qb)
              .setSize(100).get();
      do {
          for (SearchHit hit : response.getHits().getHits()) {
              String json = hit.getSourceAsString();
              JSONObject object = JSON.parseObject(json);
              Set<String> strings = object.keySet();
              for (String str : strings) {
                  System.out.println(object.get(str));
              }
          }
          response = client.prepareSearchScroll(response.getScrollId()).setScroll(new TimeValue(60000)).execute().get();
      } while (response.getHits().getHits().length != 0);
  }

If the scroll time has expired and you keep searching with that scroll id, an error is returned.

Although the search context is cleared automatically once the scroll time elapses, keeping a scroll open has a significant cost, so clear it with the Clear Scroll API as soon as it is no longer in use.

Clearing a scroll id

ClearScrollRequestBuilder clearBuilder = client.prepareClearScroll();
        clearBuilder.addScrollId(response.getScrollId());
        ClearScrollResponse scrollResponse = clearBuilder.get();
        System.out.println("是否清楚成功:"+scrollResponse.isSucceeded());

MultiSearch API

The MultiSearch API executes multiple search requests in a single API call. Its endpoint is _msearch.

@Test
    public void testMultiSearchApi() {
        SearchRequestBuilder srb1 = client.prepareSearch().setQuery(QueryBuilders.queryStringQuery("elasticsearch")).setSize(1);
        SearchRequestBuilder srb2 = client.prepareSearch().setQuery(QueryBuilders.matchQuery("user", "hhh")).setSize(1);
        MultiSearchResponse multiSearchResponse = client.prepareMultiSearch().add(srb1).add(srb2).get();
        long nbHits = 0;
        for (MultiSearchResponse.Item item : multiSearchResponse.getResponses()) {
            SearchResponse response = item.getResponse();
            nbHits += response.getHits().getTotalHits();
        }
        System.out.println(nbHits);
    }

Using Aggregations

The aggregations framework helps provide aggregated data based on a search query. It is built from simple building blocks called aggregations, which compose to produce ordered summaries of complex data. An aggregation can be seen as the umbrella term for the work of extracting analytic information from a set of documents; executing one amounts to defining that document set.

@Test
    public void testAggregations() {
        SearchResponse searchResponse = client.prepareSearch()
                .setQuery(QueryBuilders.matchAllQuery())
                .addAggregation(AggregationBuilders.terms("LZH").field("user"))
                .addAggregation(AggregationBuilders.dateHistogram("2013-01-30").field("postDate")
                        .dateHistogramInterval(DateHistogramInterval.YEAR)).get();
        Terms lzh = searchResponse.getAggregations().get("LZH"); // retrieve each aggregation by the name it was given
        Histogram postDate = searchResponse.getAggregations().get("2013-01-30");

    }

Terminate After

Sets the maximum number of documents to collect. If set, check isTerminatedEarly() on the SearchResponse to learn whether the returned documents hit that limit:

@Test
    public void TestTerminate() {
        SearchResponse searchResponse = client.prepareSearch(INDEX)
                .setTerminateAfter(2) // terminate early once this many documents have been collected
                .get();
        if (searchResponse.isTerminatedEarly()) {
            System.out.println(searchResponse.getHits().totalHits);
        }
    }

Aggregations

ElasticSearch provides a complete Java API for aggregations. Build an aggregation with AggregationBuilders and add it to the search request:

SearchResponse response = client.prepareSearch().setQuery(/* query */).addAggregation(/* aggregation */).execute().actionGet();

Structuring aggregations

Structured aggregations: aggregations can be composed, nesting sub-aggregations inside buckets, as in the sketch below.
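
A minimal sketch of this structure, reusing the twitter index and the gender/age fields from the examples below (the aggregation names by_gender and avg_age are arbitrary): a terms bucket aggregation with an avg metric sub-aggregation.

// top level: buckets documents by gender; inside each bucket: computes the average age
TermsAggregationBuilder byGender = AggregationBuilders.terms("by_gender")
        .field("gender.keyword")
        .subAggregation(AggregationBuilders.avg("avg_age").field("age"));

SearchResponse sr = client.prepareSearch("twitter").addAggregation(byGender).get();
Terms genders = sr.getAggregations().get("by_gender");
for (Terms.Bucket bucket : genders.getBuckets()) {
    Avg avgAge = bucket.getAggregations().get("avg_age");
    System.out.println(bucket.getKey() + " -> " + avgAge.getValue());
}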

Metrics aggregations

Metrics aggregations compute values over the document set, extracting the values to aggregate from the documents in some way.

The main class used here is **AggregationBuilders**, which contains factory methods for all of the aggregations below; call them directly.

Min Aggregation

MinAggregationBuilder aggregation = AggregationBuilders.min("agg").field("age");

    SearchResponse sr = client.prepareSearch("twitter").addAggregation(aggregation).get();
    Min agg = sr.getAggregations().get("agg");
    String value = agg.getValueAsString(); // getValueAsString() suits date fields; for a plain numeric minimum use agg.getValue()
    System.out.println("min value:" + value);

In debug mode

The toString() of the MinAggregationBuilder on the first line prints the following:

{
  "error": "JsonGenerationException[Can not write a field name, expecting a value]"
}
SearchResponse sr = client.prepareSearch("twitter").addAggregation(aggregation).get();

The toString() of the SearchResponse contains the following. This is the JSON result of the query; walking this JSON structure with the corresponding SearchResponse API calls gives access to every value in it.

{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 4,
    "max_score": 1.0,
    "hits": [
      {
        "_index": "twitter",
        "_type": "tweet",
        "_id": "10",
        "_score": 1.0,
        "_source": {
          "user": "kimchy",
          "postDate": "2018-06-29T09:10:21.396Z",
          "age": 30,
          "gender": "female",
          "message": "trying out Elasticsearch"
        }
      },
      {
        "_index": "twitter",
        "_type": "tweet",
        "_id": "2",
        "_score": 1.0,
        "_source": {
          "user": "kimchy",
          "postDate": "2018-06-29T09:05:33.943Z",
          "age": 20,
          "gender": "female",
          "message": "trying out Elasticsearch"
        }
      },
      {
        "_index": "twitter",
        "_type": "tweet",
        "_id": "1",
        "_score": 1.0,
        "_source": {
          "user": "kimchy",
          "postDate": "2018-06-29T08:59:00.191Z",
          "age": 10,
          "gender": "male",
          "message": "trying out Elasticsearch"
        }
      },
      {
        "_index": "twitter",
        "_type": "tweet",
        "_id": "11",
        "_score": 1.0,
        "_source": {
          "user": "kimchy",
          "postDate": "2018-06-29T09:10:54.386Z",
          "age": 30,
          "gender": "female",
          "message": "trying out Elasticsearch"
        }
      }
    ]
  },
  "aggregations": {
    "agg": {
      "value": 10.0
    }
  }
}

Observe that sr.getAggregations().get("agg") fetches the aggregated statistic, and that the name agg used throughout this code can be chosen freely.

Max Aggregation

MaxAggregationBuilder aggregation = AggregationBuilders.max("agg").field("readSize");

    SearchResponse sr = client.prepareSearch("blog").addAggregation(aggregation).get();
    Max agg = sr.getAggregations().get("agg");
    String value = agg.getValueAsString();

    System.out.println("max value:" + value);

Read the result exactly as for the Min aggregation. Note that min/max aggregations cannot tell you which document holds the maximum or minimum value; see the sketch below for one workaround.
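
If you also need the document that holds the extreme value, one workaround (a sketch, using the blog index and readSize field from the example above) is an ordinary search sorted on that field with size 1:

// fetch the single document with the largest readSize, rather than just the value
SearchResponse top = client.prepareSearch("blog")
        .setQuery(QueryBuilders.matchAllQuery())
        .addSort("readSize", SortOrder.DESC) // descending, so the first hit holds the maximum
        .setSize(1)
        .get();
if (top.getHits().getHits().length > 0) {
    System.out.println(top.getHits().getAt(0).getSourceAsString());
}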

Sum Aggregation

SumAggregationBuilder aggregation = AggregationBuilders.sum("agg").field("readSize");

    SearchResponse sr = client.prepareSearch("blog").addAggregation(aggregation).get();
    Sum agg = sr.getAggregations().get("agg");
    String value = agg.getValueAsString();

    System.out.println("sum value:" + value);

Avg Aggregation

AvgAggregationBuilder aggregation = AggregationBuilders.avg("agg").field("age");
SearchResponse searchResponse = client.prepareSearch("twitter").addAggregation(aggregation).get();
Avg avg = searchResponse.getAggregations().get("agg");
String value = avg.getValueAsString();
System.out.println("avg value: "+ value);

Stats Aggregation

The stats aggregation computes a set of statistics (min, max, sum, count, avg) over some value of the documents; the values used can come from a specific numeric field or be computed by a script.

StatsAggregationBuilder aggregation = AggregationBuilders.stats("agg").field("age");
        SearchResponse searchResponse = client.prepareSearch("twitter").addAggregation(aggregation).get();
        Stats stats = searchResponse.getAggregations().get("agg");
        String max = stats.getMaxAsString();
        String min = stats.getMinAsString();
        String avg = stats.getAvgAsString();
        String sum = stats.getSumAsString();
        long count = stats.getCount();
        System.out.println("max value: "+max);
        System.out.println("min value: "+min);
        System.out.println("avg value: "+avg);
        System.out.println("sum value: "+sum);
        System.out.println("count value: "+count);

This aggregation yields all of the common statistics above in one request; use it when you need most of those values.

Extended Stats Aggregation

The extended stats aggregation computes statistics over a document value like the plain stats aggregation, but adds sum_of_squares, variance, std_deviation, and std_deviation_bounds. The values can come from a specific numeric field or be computed by a script. The main extra results are statistics such as the variance and the sum of squares.

ExtendedStatsAggregationBuilder aggregation = AggregationBuilders.extendedStats("agg").field("age");
        SearchResponse response = client.prepareSearch("twitter").addAggregation(aggregation).get();
        ExtendedStats extended = response.getAggregations().get("agg");
        String max = extended.getMaxAsString();
        String min = extended.getMinAsString();
        String avg = extended.getAvgAsString();
        String sum = extended.getSumAsString();
        long count = extended.getCount();
        double stdDeviation = extended.getStdDeviation();
        double sumOfSquares = extended.getSumOfSquares();
        double variance = extended.getVariance();
        System.out.println("max value: " +max);
        System.out.println("min value: " +min);
        System.out.println("avg value: " +avg);
        System.out.println("sum value: " +sum);
        System.out.println("count value: " +count);
        System.out.println("stdDeviation value: " +stdDeviation);
        System.out.println("sumOfSquares value: " +sumOfSquares);
        System.out.println("variance value: "+variance);

Value Count Aggregation

The value count aggregation counts the number of values present for a field across the aggregated documents; the values can come from a specific numeric field or be computed by a script. It is usually combined with other single-value aggregations; for instance, when computing the average of a field you may also want to know how many values that average was computed from.

ValueCountAggregationBuilder aggregation = AggregationBuilders.count("agg").field("age");
SearchResponse response = client.prepareSearch("twitter").addAggregation(aggregation).get();
ValueCount count = response.getAggregations().get("agg");
long value = count.getValue();
System.out.println("ValueCount value: " +value);

Percentiles Aggregation

PercentilesAggregationBuilder aggregation = AggregationBuilders.percentiles("agg").field("age");
    SearchResponse response = client.prepareSearch("twitter").addAggregation(aggregation).get();
    Percentiles agg = response.getAggregations().get("agg");
    for (Percentile entry : agg) {
        double percent = entry.getPercent();
        double value = entry.getValue();
        System.out.println("percent value: " + percent + "value value: " + value);
    }

Cardinality Aggregation

Counts the number of distinct values, that is, the cardinality after removing duplicates:

CardinalityAggregationBuilder aggregation = AggregationBuilders.cardinality("agg").field("age");
SearchResponse response = client.prepareSearch("twitter").addAggregation(aggregation).get();
Cardinality agg = response.getAggregations().get("agg");
long value = agg.getValue();
System.out.println("value value: "+ value);

Top Hits Aggregation

Keeps track of the most relevant matching documents within each bucket of a parent aggregation:

TermsAggregationBuilder aggregation = AggregationBuilders.terms("agg").field("gender.keyword")
.subAggregation(AggregationBuilders.topHits("top").explain(true).size(1).from(10));
SearchResponse response = client.prepareSearch("twitter").addAggregation(aggregation).get();
Terms agg = response.getAggregations().get("agg");
        for (Terms.Bucket bucket : agg.getBuckets()) {
            String key = (String) bucket.getKey();
            long docCount = bucket.getDocCount();
            System.out.println("key value: " + key + " docCount value: " + docCount);
            TopHits topHits = bucket.getAggregations().get("top");
            for (SearchHit searchHitFields : topHits.getHits().getHits()) {
                System.out.println("id value: " + searchHitFields.getId() + " source value: " + searchHitFields.getSourceAsString());
            }
        }

Bucket aggregations

Global Aggregation

Produces a global count statistic over all documents, regardless of the query:

AggregationBuilder aggregation = AggregationBuilders
                .global("agg")
                .subAggregation(
                        AggregationBuilders.terms("users").field("user.keyword")
                );

        SearchResponse sr = client.prepareSearch("twitter")
                .addAggregation(aggregation)
                .get();
        System.out.println(sr);
        Global agg = sr.getAggregations().get("agg");
        long count = agg.getDocCount(); // Doc count

        System.out.println("global count:" + count);

Filter Aggregation

Count documents that match a filter:

AggregationBuilder aggregation = AggregationBuilders.filters("aaa", new FiltersAggregator.KeyedFilter("men", QueryBuilders.termQuery("gender", "male")));

SearchResponse sr = client.prepareSearch("twitter").setTypes("tweet").addAggregation(aggregation).get();
Filters agg = sr.getAggregations().get("aaa");
for (Filters.Bucket entry : agg.getBuckets()) {
  String key = entry.getKeyAsString();            // bucket key
  long docCount = entry.getDocCount();            // Doc count

  System.out.println("global " + key + " count:" + docCount);
}

Filters Aggregation

Filter on several conditions and get a document count for each:

AggregationBuilder aggregation = AggregationBuilders.filters("aaa",new FiltersAggregator.KeyedFilter("men", QueryBuilders.termQuery("gender", "male")),new FiltersAggregator.KeyedFilter("women", QueryBuilders.termQuery("gender", "female")));

SearchResponse sr = client.prepareSearch("twitter").setTypes("tweet").addAggregation(aggregation).get();
Filters agg = sr.getAggregations().get("aaa");
for (Filters.Bucket entry : agg.getBuckets()) {
  String key = entry.getKeyAsString();            // bucket key
  long docCount = entry.getDocCount();            // Doc count

  System.out.println("global " + key + " count:" + docCount);
}

Missing Aggregation (a field-data-based single-bucket aggregation)
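
A minimal sketch, reusing the twitter index and the age field from the examples above: the missing aggregation builds a single bucket of all documents that lack a value for the field.

// one bucket holding every document in "twitter" without a value for "age"
MissingAggregationBuilder aggregation = AggregationBuilders.missing("agg").field("age");
SearchResponse response = client.prepareSearch("twitter").addAggregation(aggregation).get();
Missing agg = response.getAggregations().get("agg");
System.out.println("missing docs: " + agg.getDocCount());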

Nested Aggregation
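
A hedged sketch only, since nested aggregations require a field mapped as type nested; the products index and its comments field here are assumptions, not part of the examples above.

// "comments" is assumed to be mapped as a nested type in the hypothetical "products" index
NestedAggregationBuilder aggregation = AggregationBuilders.nested("agg", "comments")
        .subAggregation(AggregationBuilders.avg("avg_stars").field("comments.stars"));
SearchResponse response = client.prepareSearch("products").addAggregation(aggregation).get();
Nested agg = response.getAggregations().get("agg");
Avg avgStars = agg.getAggregations().get("avg_stars");
System.out.println("avg stars: " + avgStars.getValue());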

Reverse nested Aggregation

Children Aggregation

Terms Aggregation

TermsAggregationBuilder fieldAggregation = AggregationBuilders.terms("genders").field("gender.keyword")
                .order(Terms.Order.term(true));
        SearchResponse response = client.prepareSearch("twitter").setTypes("tweet").addAggregation(fieldAggregation).get();

        Terms terms = response.getAggregations().get("genders");
        for (Terms.Bucket bucket : terms.getBuckets()) {
            System.out.println("key value: " + bucket.getKey());
            System.out.println("docCount value: " + bucket.getDocCount());
        }

Order

TermsAggregationBuilder fieldAggregation = AggregationBuilders.terms("genders").field("gender.keyword")
                .order(Terms.Order.term(true));
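
Besides ordering by term, Terms.Order can also order buckets by document count; a brief sketch of the same genders aggregation:

// buckets with the highest document count first
TermsAggregationBuilder byCount = AggregationBuilders.terms("genders")
        .field("gender.keyword")
        .order(Terms.Order.count(false)); // false = descending count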

Significant Terms Aggregation

Range Aggregation
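
A minimal sketch over the age field used in the twitter examples (the bucket boundaries are arbitrary):

// three buckets: age < 20, 20 <= age < 40, age >= 40
RangeAggregationBuilder aggregation = AggregationBuilders.range("agg").field("age")
        .addUnboundedTo(20)    // [*, 20)
        .addRange(20, 40)      // [20, 40)
        .addUnboundedFrom(40); // [40, *)
SearchResponse response = client.prepareSearch("twitter").addAggregation(aggregation).get();
Range agg = response.getAggregations().get("agg");
for (Range.Bucket bucket : agg.getBuckets()) {
    System.out.println(bucket.getKeyAsString() + " -> " + bucket.getDocCount());
}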

Date Range Aggregation

IP Range Aggregation

Histogram Aggregation

Date Histogram Aggregation
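
A minimal sketch, mirroring the dateHistogram call already used in the Using Aggregations example above (one bucket per calendar year of postDate):

// one bucket per calendar year of the postDate field
DateHistogramAggregationBuilder aggregation = AggregationBuilders.dateHistogram("agg")
        .field("postDate")
        .dateHistogramInterval(DateHistogramInterval.YEAR);
SearchResponse response = client.prepareSearch("twitter").addAggregation(aggregation).get();
Histogram agg = response.getAggregations().get("agg");
for (Histogram.Bucket bucket : agg.getBuckets()) {
    System.out.println(bucket.getKeyAsString() + " -> " + bucket.getDocCount());
}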

Geo Distance Aggregation

GeoHash Grid Aggregation

Query DSL

Match All Query

Matches all documents:

QueryBuilder qb = matchAllQuery();

Full text Query

match query

Full-text matching and phrase queries on a field:

QueryBuilder qb = matchQuery("gender", "female");

multi_match query

Query across multiple fields; any number of fields can be given:

QueryBuilder qb = multiMatchQuery("female","gender", "message");

common_terms query

Gives rarer, more specialized terms more precise treatment in the query:

QueryBuilder qb = commonTermsQuery("gender","female");

query_string query

A query using the Lucene query syntax, which allows special operators (AND|OR|NOT):

QueryBuilder qb = queryStringQuery("+male -female");

simple_query_string query

A simpler, more forgiving query syntax:

QueryBuilder qb = simpleQueryStringQuery("+male -female");

Term level Query

Term Query

Finds documents that contain the exact value in the specified field:

QueryBuilder qb = termQuery("gender","male");

Terms Query

Finds documents whose field contains any of several exact values:

QueryBuilder qb = termsQuery("age","10", "20");

Range Query

Range queries support the following:

  • gte(): match documents whose field value is greater than or equal to the given value
  • gt(): match documents whose field value is strictly greater than the given value
  • lte(): match documents whose field value is less than or equal to the given value
  • lt(): match documents whose field value is strictly less than the given value
  • from() sets the lower bound and to() the upper bound; they are used together with includeLower() and includeUpper()
  • includeLower(true): from() matches values greater than or equal to the bound
  • includeLower(false): from() matches values strictly greater than the bound
  • includeUpper(true): to() matches values less than or equal to the bound
  • includeUpper(false): to() matches values strictly less than the bound
QueryBuilder qb = QueryBuilders.rangeQuery("age").gte(10).includeLower(true).lte(20).includeUpper(true);

Here includeLower() and includeUpper() control whether the bounds themselves are included in the range.

Exists Query

Finds documents where the given field exists:

QueryBuilder qb = existsQuery("user");

Prefix Query

Finds documents whose field starts with the given exact prefix:

QueryBuilder qb = prefixQuery("gender","m");

Wildcard Query

Wildcard query with a field name and a pattern: ? matches a single character and * matches any sequence of characters. Wildcard queries run against not-analyzed fields:

QueryBuilder qb = wildcardQuery("gender","f?*");

Regexp Query

Queries the given field with a regular expression; this also runs against not-analyzed fields:

QueryBuilder qb = regexpQuery("gender","f.*");

Fuzzy Query

Fuzzy query: the exact field name plus a misspelled query value:

QueryBuilder qb = fuzzyQuery("gender","mala").fuzziness(Fuzziness.ONE);

Type Query

Finds documents of the given type:

QueryBuilder qb = typeQuery("tweet");

Ids Query

Finds documents by type and ids; the type may be omitted:

QueryBuilder qb = idsQuery("tweet").addIds("1", "11");