感謝全科的ElasticSearch講解,大部分來源於此
MySQL | ElasticSearch |
---|---|
Database(數據庫) | Index(索引) |
Table(表) | Type(類型) |
Row(行) | Document(文檔) |
Column(列) | Field(字段) |
Schema(方案) | Mapping(映射) |
Index(索引) | Everything Indexed by default(全部字段都被索引) |
SQL(結構化查詢語言) | Query DSL(查詢專用語言) |
Index API 容許咱們存儲一個JSON格式的文檔,使得數據能夠被搜索到。文檔經過index、type、id惟一肯定。id能夠本身提供一個ID,也可使用Index API爲咱們生成一個。
有四種不一樣的方式來產生JSON格式的文檔(document)
/**
 * Manual way: index a hand-assembled JSON string as a document.
 *
 * @throws UnknownHostException if the cluster address cannot be resolved
 */
@Test
public void JsonDocument() throws UnknownHostException {
    String source = "{"
            + "\"user\":\"deepredapple\","
            + "\"postDate\":\"2018-01-30\","
            + "\"message\":\"trying out Elasticsearch\""
            + "}";
    IndexResponse result = client.prepareIndex("fendo", "fendodate").setSource(source).get();
    System.out.println(result.getResult());
}
複製代碼
/** Map way: the client serializes the Map into a JSON document. */
@Test
public void MapDocument() {
    Map<String, Object> doc = new HashMap<>();
    doc.put("user", "hhh");
    doc.put("postDate", "2018-06-28");
    doc.put("message", "trying out Elasticsearch");
    IndexResponse indexed = client.prepareIndex(INDEX, TYPE).setSource(doc).get();
    System.out.println(indexed.getResult());
}
複製代碼
/** Jackson way: serialize a bean to a byte[] and index the bytes. */
@Test
public void JACKSONDocument() throws JsonProcessingException {
    Blog post = new Blog();
    post.setUser("123");
    post.setPostDate("2018-06-29");
    post.setMessage("try out ElasticSearch");
    byte[] payload = new ObjectMapper().writeValueAsBytes(post);
    IndexResponse indexed = client.prepareIndex(INDEX, TYPE).setSource(payload).get();
    System.out.println(indexed.getResult());
}
複製代碼
/** XContentBuilder helper way: build the document with the ES content builder. */
@Test
public void XContentBuilderDocument() throws IOException {
    XContentBuilder document = XContentFactory.jsonBuilder()
            .startObject()
            .field("user", "xcontentdocument")
            .field("postDate", "2018-06-30")
            .field("message", "this is ElasticSearch")
            .endObject();
    IndexResponse indexed = client.prepareIndex(INDEX, TYPE).setSource(document).get();
    System.out.println(indexed.getResult());
}
複製代碼
package com.deepredapple.es.document;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/** * @author DeepRedApple */
/**
 * Demonstrates the four ways of building an index document with the
 * Elasticsearch TransportClient: raw JSON string, Map, Jackson-serialized
 * bean, and XContentBuilder.
 *
 * @author DeepRedApple
 */
public class TestClient {

    TransportClient client = null;

    public static final String INDEX = "fendo";
    public static final String TYPE = "fendodate";

    /** Opens a transport connection to the local node before each test. */
    @Before
    public void beforeClient() throws UnknownHostException {
        client = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
    }

    /**
     * Closes the client after each test; without this the transport threads
     * and the connection to the node are leaked.
     */
    @After
    public void afterClient() {
        if (client != null) {
            client.close();
        }
    }

    /**
     * Manual way: index a hand-written JSON string.
     *
     * @throws UnknownHostException if the cluster address cannot be resolved
     */
    @Test
    public void JsonDocument() throws UnknownHostException {
        String json = "{" +
                "\"user\":\"deepredapple\"," +
                "\"postDate\":\"2018-01-30\"," +
                "\"message\":\"trying out Elasticsearch\"" +
                "}";
        IndexResponse indexResponse = client.prepareIndex(INDEX, TYPE).setSource(json).get();
        System.out.println(indexResponse.getResult());
    }

    /** Map way: the client serializes the Map into a JSON document. */
    @Test
    public void MapDocument() {
        Map<String, Object> json = new HashMap<String, Object>();
        json.put("user", "hhh");
        json.put("postDate", "2018-06-28");
        json.put("message", "trying out Elasticsearch");
        IndexResponse response = client.prepareIndex(INDEX, TYPE).setSource(json).get();
        System.out.println(response.getResult());
    }

    /** Jackson way: serialize a bean to a byte[] and index the bytes. */
    @Test
    public void JACKSONDocument() throws JsonProcessingException {
        Blog blog = new Blog();
        blog.setUser("123");
        blog.setPostDate("2018-06-29");
        blog.setMessage("try out ElasticSearch");
        ObjectMapper mapper = new ObjectMapper();
        byte[] bytes = mapper.writeValueAsBytes(blog);
        IndexResponse response = client.prepareIndex(INDEX, TYPE).setSource(bytes).get();
        System.out.println(response.getResult());
    }

    /** XContentBuilder helper way: build the document with the ES content builder. */
    @Test
    public void XContentBuilderDocument() throws IOException {
        XContentBuilder builder = XContentFactory.jsonBuilder().startObject()
                .field("user", "xcontentdocument")
                .field("postDate", "2018-06-30")
                .field("message", "this is ElasticSearch").endObject();
        IndexResponse response = client.prepareIndex(INDEX, TYPE).setSource(builder).get();
        System.out.println(response.getResult());
    }
}
複製代碼
get API 能夠經過id查看文檔
GetResponse getResponse = client.prepareGet(INDEX, TYPE, "AWRJCXMhro3r8sDxIpir").get();
複製代碼
參數分別爲索引、類型、_id
setOperationThreaded設置爲true是在不一樣的線程裏執行此操做
/** Get API: fetch one document by id and print every value in its _source. */
@Test
public void testGetApi() {
    GetResponse found = client.prepareGet(INDEX, TYPE, "AWRJCXMhro3r8sDxIpir")
            .setOperationThreaded(false) // execute on the calling thread
            .get();
    Map<String, Object> source = found.getSource();
    for (Object value : source.values()) {
        System.out.println(value.toString());
    }
}
複製代碼
根據ID刪除:
DeleteResponse deleteResponse = client.prepareDelete(INDEX, TYPE, "AWRJCXMhro3r8sDxIpir").get();
複製代碼
參數爲索引、類型、_id
setOperationThreaded設置爲true是在不一樣的線程裏執行此操做
/** Delete API: print the document, then delete it by id. */
@Test
public void testDeleteAPI() {
    String id = "AWRJCXMhro3r8sDxIpir";
    GetResponse fetched = client.prepareGet(INDEX, TYPE, id).setOperationThreaded(false).get();
    System.out.println(fetched.getSource());
    DeleteResponse deleted = client.prepareDelete(INDEX, TYPE, id).get();
    System.out.println(deleted.getResult());
}
複製代碼
經過查詢條件刪除
/** Delete-by-query: remove every document whose user field matches "hhh". */
@Test
public void deleteByQuery() {
    BulkByScrollResponse result = DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
            .filter(QueryBuilders.matchQuery("user", "hhh")) // match condition
            .source(INDEX)                                   // index name
            .get();
    System.out.println(result.getDeleted()); // number of deleted documents
}
複製代碼
參數說明 QueryBuilders.matchQuery("user", "hhh") 的參數爲字段和查詢條件,source(INDEX)參數爲索引名
當執行的刪除的時間過長時,可使用異步回調的方式執行刪除操做,執行的結果在回調裏面獲取
/** Asynchronous delete-by-query with a callback; suited to large deletions. */
@Test
public void DeleteByQueryAsync() {
    for (int id = 1300; id < 3000; id++) {
        DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
                .filter(QueryBuilders.matchQuery("user", "hhh " + id))
                .source(INDEX)
                .execute(new ActionListener<BulkByScrollResponse>() {
                    public void onResponse(BulkByScrollResponse response) {
                        System.out.println("刪除的文檔數量爲= " + response.getDeleted());
                    }

                    public void onFailure(Exception e) {
                        System.out.println("Failure");
                    }
                });
    }
}
複製代碼
當程序中止時,在ElasticSearch的控制檯依舊在執行刪除操做,異步的執行操做
監聽回調方法是execute方法
.execute(new ActionListener<BulkByScrollResponse>() { // callback invoked when the request completes
public void onResponse(BulkByScrollResponse response) {
long deleted = response.getDeleted();
System.out.println("刪除的文檔數量爲= "+deleted);
}
public void onFailure(Exception e) {
System.out.println("Failure");
}
});
複製代碼
更新索引
主要有兩種方法進行更新操做
/** Update via UpdateRequest: address the doc by index/type/id and send a partial doc. */
@Test
public void testUpdateAPI() throws IOException, ExecutionException, InterruptedException {
    UpdateRequest request = new UpdateRequest(INDEX, TYPE, "AWRFv-yAro3r8sDxIpib");
    request.doc(jsonBuilder()
            .startObject()
            .field("user", "hhh")
            .endObject());
    client.update(request).get();
}
複製代碼
/** Update through prepareUpdate: once via an inline script, once via a partial doc. */
@Test
public void testUpdatePrepareUpdate() throws IOException {
    final String docId = "AWRFvA7k0udstXU4tl60";
    // script-based update
    client.prepareUpdate(INDEX, TYPE, docId)
            .setScript(new Script("ctx._source.user = \"DeepRedApple\"")).get();
    // partial-document update
    client.prepareUpdate(INDEX, TYPE, docId)
            .setDoc(jsonBuilder()
                    .startObject()
                    .field("user", "DeepRedApple")
                    .endObject()).get();
}
複製代碼
client.prepareUpdate中的setScript方法不一樣的版本的參數不一樣,這裏直接傳入值,也能夠直接插入文件存儲的腳本,而後直接執行腳本里面的數據進行更新操做。
使用腳本更新文檔
/** Script-based update: set _source.user through a painless-style script. */
@Test
public void testUpdateByScript() throws ExecutionException, InterruptedException {
    Script script = new Script("ctx._source.user = \"LZH\"");
    UpdateRequest request = new UpdateRequest(INDEX, TYPE, "AWRFvLSTro3r8sDxIpia").script(script);
    client.update(request).get();
}
複製代碼
更新文檔,若是存在文檔就更新,若是不存在就插入
/** Upsert: update the document when it exists, otherwise insert the index request. */
@Test
public void testUpsert() throws IOException, ExecutionException, InterruptedException {
    final String docId = "AWRFvLSTro3r8sDxIp12";
    IndexRequest insert = new IndexRequest(INDEX, TYPE, docId)
            .source(jsonBuilder()
                    .startObject()
                    .field("user", "hhh")
                    .field("postDate", "2018-02-14")
                    .field("message", "ElasticSearch")
                    .endObject());
    UpdateRequest update = new UpdateRequest(INDEX, TYPE, docId)
            .doc(jsonBuilder()
                    .startObject()
                    .field("user", "LZH")
                    .endObject())
            .upsert(insert); // executed only when the document does not exist
    client.update(update).get();
}
複製代碼
若是參數中的_id存在,即index/type/_id存在,那麼就會執行UpdateRequest,若是index/type/_id不存在,那麼就直接插入
一次獲取多個文檔,
/** Multi-get: fetch several documents, possibly across indices, in one call. */
@Test
public void TestMultiGetApi() {
    MultiGetResponse responses = client.prepareMultiGet()
            .add(INDEX, TYPE, "AWRFv-yAro3r8sDxIpib")                         // single id
            .add(INDEX, TYPE, "AWRFvA7k0udstXU4tl60", "AWRJA72Uro3r8sDxIpip") // several ids
            .add("blog", "blog", "AWG9GKCwhg1e21lmGSLH")                      // another index
            .get();
    for (MultiGetItemResponse item : responses) {
        GetResponse doc = item.getResponse();
        if (doc.isExists()) {
            JSONObject json = JSON.parseObject(doc.getSourceAsString()); // parse _source
            for (String key : json.keySet()) {
                System.out.println("key -> " + key);
                System.out.println("value -> " + json.get(key));
                System.out.println("===============");
            }
        }
    }
}
複製代碼
Bulk API 能夠實現批量插入
/** Bulk API: index two documents in a single batched request. */
@Test
public void testBulkApi() throws IOException {
    BulkRequestBuilder bulk = client.prepareBulk();
    bulk.add(client.prepareIndex(INDEX, TYPE, "1")
            .setSource(jsonBuilder()
                    .startObject()
                    .field("user", "張三")
                    .field("postDate", "2018-05-01")
                    .field("message", "zhangSan message")
                    .endObject()));
    bulk.add(client.prepareIndex(INDEX, TYPE, "2")
            .setSource(jsonBuilder()
                    .startObject()
                    .field("user", "李四")
                    .field("postDate", "2016-09-10")
                    .field("message", "Lisi message")
                    .endObject()));
    BulkResponse result = bulk.get();
    if (result.hasFailures()) {
        System.out.println("error");
    }
}
複製代碼
使用Bulk Processor,Bulk Processor提供了一個簡單的接口,在給定的大小的數量上定時批量自動請求
首先建立Bulk Processor實例
/** Create a BulkProcessor instance (configuration only; nothing is submitted here). */
@Test
public void testCreateBulkProcessor() {
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
// Called before each bulk execution; request.numberOfActions() gives the batch size
public void beforeBulk(long l, BulkRequest request) {
}
// Called after each bulk execution; response.hasFailures() tells whether any item failed
public void afterBulk(long l, BulkRequest request, BulkResponse response) {
}
// Called when the bulk request itself fails with a throwable
public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
}
}).setBulkActions(10000) // flush after 10000 queued requests
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // flush after 5MB of queued data
.setFlushInterval(TimeValue.timeValueSeconds(5))// flush every 5 seconds regardless of request count
.setConcurrentRequests(1)// number of concurrent flushes: 0 means fully synchronous, 1 allows one concurrent request
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
// Retry policy when a bulk fails with EsRejectedExecutionException (node out of resources):
// start at 100 ms, back off exponentially, retry up to 3 times.
// Disable retries with BackoffPolicy.noBackoff()
.build();
}
複製代碼
BulkProcess默認設計
/** Create a BulkProcessor and use it: add an index and a delete request, flush, close. */
@Test
public void testCreateBulkProcessor() throws IOException {
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
// Called before each bulk execution; request.numberOfActions() gives the batch size
public void beforeBulk(long l, BulkRequest request) {
}
// Called after each bulk execution; response.hasFailures() tells whether any item failed
public void afterBulk(long l, BulkRequest request, BulkResponse response) {
}
// Called when the bulk request itself fails with a throwable
public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
}
}).setBulkActions(10000) // flush after 10000 queued requests
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // flush after 5MB of queued data
.setFlushInterval(TimeValue.timeValueSeconds(5))// flush every 5 seconds regardless of request count
.setConcurrentRequests(1)// number of concurrent flushes: 0 means fully synchronous, 1 allows one concurrent request
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
// Retry policy when a bulk fails with EsRejectedExecutionException (node out of resources):
// start at 100 ms, back off exponentially, retry up to 3 times.
// Disable retries with BackoffPolicy.noBackoff()
.build();
// queue requests on the processor
bulkProcessor.add(new IndexRequest(INDEX, TYPE, "3").source(
jsonBuilder()
.startObject()
.field("user", "王五")
.field("postDate", "2019-10-05")
.field("message", "wangwu message")
.endObject()));
bulkProcessor.add(new DeleteRequest(INDEX, TYPE, "1"));
bulkProcessor.flush();
// close the processor, flushing anything still queued
bulkProcessor.close();
client.admin().indices().prepareRefresh().get();
client.prepareSearch().get();
}
複製代碼
搜索API能夠支持搜索查詢,返回查詢匹配的結果,它能夠搜索一個index/type或者多個index/type,可使用Query Java API 做爲查詢條件
Java 默認提供QUERY_AND_FETCH和DFS_QUERY_AND_FETCH兩種search Types,可是這種模式應該由系統選擇,而不是用戶手動指定
實例
/** Search API: match user == "hhh" and print every value of each hit's _source. */
@Test
public void testSearchApi() {
    SearchResponse response = client.prepareSearch(INDEX).setTypes(TYPE)
            .setQuery(QueryBuilders.matchQuery("user", "hhh")).get();
    for (SearchHit hit : response.getHits().getHits()) {
        JSONObject source = JSON.parseObject(hit.getSourceAsString());
        for (String key : source.keySet()) {
            System.out.println(source.get(key));
        }
    }
}
複製代碼
通常的搜索請求都是返回一頁的數據,不管多大的數據量都會返回給用戶,Scrolls API 能夠容許咱們檢索大量的數據(甚至是所有數據)。Scroll API容許咱們作一個初始階段搜索頁而且持續批量從ElasticSearch裏面拉取結果,直到沒有結果剩下。Scroll API的建立並非爲了實時的用戶響應,而是爲了處理大量的數據。
/**
 * Scroll query: page through a large result set batch by batch.
 *
 * @throws ExecutionException
 * @throws InterruptedException
 */
@Test
public void testScrollApi() throws ExecutionException, InterruptedException {
MatchQueryBuilder qb = matchQuery("user", "hhh");
SearchResponse response = client.prepareSearch(INDEX).addSort(FieldSortBuilder.DOC_FIELD_NAME,
SortOrder.ASC)
.setScroll(new TimeValue(60000)) // the scroll parameter tells Elasticsearch how long to keep the search context alive
.setQuery(qb)
.setSize(100).get();
do {
for (SearchHit hit : response.getHits().getHits()) {
String json = hit.getSourceAsString();
JSONObject object = JSON.parseObject(json);
Set<String> strings = object.keySet();
for (String str : strings) {
System.out.println(object.get(str));
}
}
// fetch the next batch with the scroll id; the loop ends when a batch comes back empty
response = client.prepareSearchScroll(response.getScrollId()).setScroll(new TimeValue(60000)).execute().get();
} while (response.getHits().getHits().length != 0);
}
複製代碼
若是超過滾動時間,繼續使用該滾動ID搜索數據,則會報錯
雖然滾動時間已過,搜索上下文會自動被清除,可是一直保持滾動代價會很大,因此當咱們不在使用滾動時要儘快使用Clear-Scroll API進行清除。
ClearScrollRequestBuilder clearBuilder = client.prepareClearScroll();
clearBuilder.addScrollId(response.getScrollId());
ClearScrollResponse scrollResponse = clearBuilder.get();
System.out.println("是否清楚成功:"+scrollResponse.isSucceeded());
複製代碼
MultiSearch API 容許在同一個API中執行多個搜索請求。它的端點是_msearch
/** MultiSearch API: run two search requests in a single _msearch call and sum the hits. */
@Test
public void testMultiSearchApi() {
    SearchRequestBuilder queryStringSearch = client.prepareSearch()
            .setQuery(QueryBuilders.queryStringQuery("elasticsearch")).setSize(1);
    SearchRequestBuilder matchSearch = client.prepareSearch()
            .setQuery(QueryBuilders.matchQuery("user", "hhh")).setSize(1);
    MultiSearchResponse batch = client.prepareMultiSearch()
            .add(queryStringSearch).add(matchSearch).get();
    long totalHits = 0;
    for (MultiSearchResponse.Item item : batch.getResponses()) {
        totalHits += item.getResponse().getHits().getTotalHits();
    }
    System.out.println(totalHits);
}
複製代碼
聚合框架有助於根據搜索查詢提供數據。它是基於簡單的構建塊也稱爲整合,整合就是將複雜的數據摘要有序的放在一塊。聚合能夠被看作是從一組文件中獲取分析信息的一系列工做的統稱。聚合的實現過程就是定義這個文檔集的過程
/**
 * Aggregations: a terms aggregation on the "user" field (registered under the
 * name "LZH") plus a yearly date_histogram on "postDate" (registered under
 * "2013-01-30").
 */
@Test
public void testAggregations() {
    SearchResponse searchResponse = client.prepareSearch()
            .setQuery(QueryBuilders.matchAllQuery())
            .addAggregation(AggregationBuilders.terms("LZH").field("user"))
            .addAggregation(AggregationBuilders.dateHistogram("2013-01-30").field("postDate")
                    .dateHistogramInterval(DateHistogramInterval.YEAR)).get();
    // Aggregations are looked up by the name they were registered with, not the field
    // name: the original get("user") always returned null because the terms
    // aggregation above is named "LZH".
    Terms lzh = searchResponse.getAggregations().get("LZH");
    Histogram postDate = searchResponse.getAggregations().get("2013-01-30");
}
複製代碼
獲取文檔的最大數量,若是設置了,須要經過SearchResponse對象裏面的isTerminatedEarly()判斷返回文檔是否達到設置的數量
/**
 * terminate_after: stop collecting hits per shard once 2 documents are found.
 * SearchResponse.isTerminatedEarly() returns a Boolean that is null when the
 * search was NOT terminated early, so unboxing it directly in an if-condition
 * risks a NullPointerException; compare null-safely instead.
 */
@Test
public void TestTerminate() {
    SearchResponse searchResponse = client.prepareSearch(INDEX)
            .setTerminateAfter(2) // terminate early once this many docs are collected
            .get();
    if (Boolean.TRUE.equals(searchResponse.isTerminatedEarly())) {
        System.out.println(searchResponse.getHits().totalHits);
    }
}
複製代碼
聚合。ElasticSearch提供完整的Java API來使用聚合。使用AggregationBuilders構建對象,增長到搜索請求中。
SearchResponse response = client.prepareSearch().setQuery(/*查詢*/).addAggregation(/*聚合*/).execute().actionGet();
複製代碼
結構化聚合。
在計算度量類的這類聚合操做是以使用一種方式或者從文檔中提取須要聚合的值爲基礎。
在這中間主要使用的類是**
AggregationBuilders
**,這裏麪包含了大量的一下的聚合方法調用,直接使用便可
MinAggregationBuilder aggregation = AggregationBuilders.min("agg").field("age");
SearchResponse sr = client.prepareSearch("twitter").addAggregation(aggregation).get();
Min agg = sr.getAggregations().get("agg");
String value = agg.getValueAsString();// string form (useful when the field is a date); use getValue() for the plain numeric minimum
System.out.println("min value:" + value);
複製代碼
debug模式下
第一行MinAggregationBuilder的toString()執行的內容以下
{
"error": "JsonGenerationException[Can not write a field name, expecting a value]"
}
複製代碼
SearchResponse sr = client.prepareSearch("twitter").addAggregation(aggregation).get(); 複製代碼
在SearchResponse的toString()的內容以下, 這個內容就是查詢的JSON結果,這裏面的JSON結果的結構與SearchResponse的API操做相配套使用能夠獲取到裏面的每個值。
{
"took": 1,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 4,
"max_score": 1.0,
"hits": [
{
"_index": "twitter",
"_type": "tweet",
"_id": "10",
"_score": 1.0,
"_source": {
"user": "kimchy",
"postDate": "2018-06-29T09:10:21.396Z",
"age": 30,
"gender": "female",
"message": "trying out Elasticsearch"
}
},
{
"_index": "twitter",
"_type": "tweet",
"_id": "2",
"_score": 1.0,
"_source": {
"user": "kimchy",
"postDate": "2018-06-29T09:05:33.943Z",
"age": 20,
"gender": "female",
"message": "trying out Elasticsearch"
}
},
{
"_index": "twitter",
"_type": "tweet",
"_id": "1",
"_score": 1.0,
"_source": {
"user": "kimchy",
"postDate": "2018-06-29T08:59:00.191Z",
"age": 10,
"gender": "male",
"message": "trying out Elasticsearch"
}
},
{
"_index": "twitter",
"_type": "tweet",
"_id": "11",
"_score": 1.0,
"_source": {
"user": "kimchy",
"postDate": "2018-06-29T09:10:54.386Z",
"age": 30,
"gender": "female",
"message": "trying out Elasticsearch"
}
}
]
},
"aggregations": {
"agg": {
"value": 10.0
}
}
}
複製代碼
經過觀察能夠發現sr.getAggregations().get("agg");
方法就是獲取其中的聚合統計的數據,其中整個代碼中的參數agg能夠自定義
MaxAggregationBuilder aggregation = AggregationBuilders.max("agg").field("readSize");
SearchResponse sr = client.prepareSearch("blog").addAggregation(aggregation).get();
Max agg = sr.getAggregations().get("agg");
String value = agg.getValueAsString();
System.out.println("max value:" + value);
複製代碼
具體分析方法如Min Aggregation聚合同樣,可是不能統計出是哪一條數據的最大最小值
SumAggregationBuilder aggregation = AggregationBuilders.sum("agg").field("readSize");
SearchResponse sr = client.prepareSearch("blog").addAggregation(aggregation).get();
Sum agg = sr.getAggregations().get("agg");
String value = agg.getValueAsString();
System.out.println("sum value:" + value);
複製代碼
AvgAggregationBuilder aggregation = AggregationBuilders.avg("agg").field("age");
SearchResponse searchResponse = client.prepareSearch("twitter").addAggregation(aggregation).get();
Avg avg = searchResponse.getAggregations().get("agg");
String value = avg.getValueAsString();
System.out.println("avg value: "+ value);
複製代碼
統計聚合——基於文檔的某個值,計算出一些統計信息(min、max、sum、count、avg), 用於計算的值能夠是特定的數值型字段,也能夠經過腳本計算而來。
StatsAggregationBuilder aggregation = AggregationBuilders.stats("agg").field("age");
SearchResponse searchResponse = client.prepareSearch("twitter").addAggregation(aggregation).get();
Stats stats = searchResponse.getAggregations().get("agg");
String max = stats.getMaxAsString();
String min = stats.getMinAsString();
String avg = stats.getAvgAsString();
String sum = stats.getSumAsString();
long count = stats.getCount();
System.out.println("max value: "+max);
System.out.println("min value: "+min);
System.out.println("avg value: "+avg);
System.out.println("sum value: "+sum);
System.out.println("count value: "+count);
複製代碼
這個聚合統計能夠統計出上面的日常的統計值。當須要統計上面的大部分的值時,可使用這種方式
擴展統計聚合——基於文檔的某個值,計算出一些統計信息(比普通的stats聚合多了sum_of_squares、variance、std_deviation、std_deviation_bounds),用於計算的值能夠是特定的數值型字段,也能夠經過腳本計算而來。主要的結果值就是最大、最小、方差、平方差等統計值
ExtendedStatsAggregationBuilder aggregation = AggregationBuilders.extendedStats("agg").field("age");
SearchResponse response = client.prepareSearch("twitter").addAggregation(aggregation).get();
ExtendedStats extended = response.getAggregations().get("agg");
String max = extended.getMaxAsString();
String min = extended.getMinAsString();
String avg = extended.getAvgAsString();
String sum = extended.getSumAsString();
long count = extended.getCount();
double stdDeviation = extended.getStdDeviation();
double sumOfSquares = extended.getSumOfSquares();
double variance = extended.getVariance();
System.out.println("max value: " +max);
System.out.println("min value: " +min);
System.out.println("avg value: " +avg);
System.out.println("sum value: " +sum);
System.out.println("count value: " +count);
System.out.println("stdDeviation value: " +stdDeviation);
System.out.println("sumOfSquares value: " +sumOfSquares);
System.out.println("variance value: "+variance);
複製代碼
值計數聚合——計算聚合文檔中某個值的個數, 用於計算的值能夠是特定的數值型字段,也能夠經過腳本計算而來。該聚合通常域其它 single-value 聚合聯合使用,好比在計算一個字段的平均值的時候,可能還會關注這個平均值是由多少個值計算而來。
ValueCountAggregationBuilder aggregation = AggregationBuilders.count("agg").field("age");
SearchResponse response = client.prepareSearch("twitter").addAggregation(aggregation).get();
ValueCount count = response.getAggregations().get("agg");
long value = count.getValue();
System.out.println("ValueCount value: " +value);
複製代碼
PercentilesAggregationBuilder aggregation = AggregationBuilders.percentiles("agg").field("age");
SearchResponse response = client.prepareSearch("twitter").addAggregation(aggregation).get();
Percentiles agg = response.getAggregations().get("agg");
for (Percentile entry : agg) {
double percent = entry.getPercent();
double value = entry.getValue();
System.out.println("percent value: " + percent + "value value: " + value);
}
複製代碼
去除重複的個數的基數
CardinalityAggregationBuilder aggregation = AggregationBuilders.cardinality("agg").field("age");
SearchResponse response = client.prepareSearch("twitter").addAggregation(aggregation).get();
Cardinality agg = response.getAggregations().get("agg");
long value = agg.getValue();
System.out.println("value value: "+ value);
複製代碼
查詢出匹配的文檔的字段的個數
TermsAggregationBuilder aggregation = AggregationBuilders.terms("agg").field("gender.keyword")
.subAggregation(AggregationBuilders.topHits("top").explain(true).size(1).from(10));
SearchResponse response = client.prepareSearch("twitter").addAggregation(aggregation).get();
Terms agg = response.getAggregations().get("agg");
for (Terms.Bucket bucket : agg.getBuckets()) {
String key = (String) bucket.getKey();
long docCount = bucket.getDocCount();
System.out.println("key value: " + key + " docCount value: " + docCount);
TopHits topHits = bucket.getAggregations().get("top");
for (SearchHit searchHitFields : topHits.getHits().getHits()) {
System.out.println("id value: " + searchHitFields.getId() + " source value: " + searchHitFields.getSourceAsString());
}
}
複製代碼
查詢全局的一個數量統計
AggregationBuilder aggregation = AggregationBuilders
.global("agg")
.subAggregation(
AggregationBuilders.terms("users").field("user.keyword")
);
SearchResponse sr = client.prepareSearch("twitter")
.addAggregation(aggregation)
.get();
System.out.println(sr);
Global agg = sr.getAggregations().get("agg");
long count = agg.getDocCount(); // Doc count
System.out.println("global count:" + count);
複製代碼
過濾統計
AggregationBuilder aggregation = AggregationBuilders.filters("aaa", new FiltersAggregator.KeyedFilter("men", QueryBuilders.termQuery("gender", "male")));
SearchResponse sr = client.prepareSearch("twitter").setTypes("tweet").addAggregation(aggregation).get();
Filters agg = sr.getAggregations().get("aaa");
for (Filters.Bucket entry : agg.getBuckets()) {
String key = entry.getKeyAsString(); // bucket key
long docCount = entry.getDocCount(); // Doc count
System.out.println("global " + key + " count:" + docCount);
}
複製代碼
多個條件過濾,查詢出個數
AggregationBuilder aggregation = AggregationBuilders.filters("aaa",new FiltersAggregator.KeyedFilter("men", QueryBuilders.termQuery("gender", "male")),new FiltersAggregator.KeyedFilter("women", QueryBuilders.termQuery("gender", "female")));
SearchResponse sr = client.prepareSearch("twitter").setTypes("tweet").addAggregation(aggregation).get();
Filters agg = sr.getAggregations().get("aaa");
for (Filters.Bucket entry : agg.getBuckets()) {
String key = entry.getKeyAsString(); // bucket key
long docCount = entry.getDocCount(); // Doc count
System.out.println("global " + key + " count:" + docCount);
}
複製代碼
TermsAggregationBuilder fieldAggregation = AggregationBuilders.terms("genders").field("gender.keyword")
.order(Terms.Order.term(true));
SearchResponse response = client.prepareSearch("twitter").setTypes("tweet").addAggregation(fieldAggregation).get();
Terms terms = response.getAggregations().get("genders");
for (Terms.Bucket bucket : terms.getBuckets()) {
System.out.println("key value: " + bucket.getKey());
System.out.println("docCount value: " + bucket.getDocCount());
}
複製代碼
TermsAggregationBuilder fieldAggregation = AggregationBuilders.terms("genders").field("gender.keyword")
.order(Terms.Order.term(true));
複製代碼
匹配全部文檔
QueryBuilder qb = matchAllQuery();
複製代碼
模糊匹配和字段詞組查詢
QueryBuilder qb = matchQuery("gender", "female");
複製代碼
多個字段進行查詢,字段能夠有多個
QueryBuilder qb = multiMatchQuery("female","gender", "message");
複製代碼
對一些比較專業的偏門詞語進行更加專業的查詢
QueryBuilder qb = commonTermsQuery("gender","female");
複製代碼
一種與Lucene查詢語法結合的查詢,容許使用特殊條件去查詢(AND|OR|NOT)
QueryBuilder qb = queryStringQuery("+male -female");
複製代碼
一種簡單的查詢語法
QueryBuilder qb = queryStringQuery("+male -female");
複製代碼
在指定字段中查詢確切的值的文檔
QueryBuilder qb = termQuery("gender","male");
複製代碼
查詢一個字段內的多個確切的值
QueryBuilder qb = termsQuery("age","10", "20");
複製代碼
範圍查詢
- gte():範圍查詢將匹配字段值大於或等於此參數值的文檔
- gt():範圍查詢將匹配字段值大於此參數值的文檔
- lte():範圍查詢將匹配字段值小於或等於此參數值的文檔
- lt():範圍查詢將匹配字段值小於此參數值的文檔
- from()開始值to()結果值,這兩個函數與includeLower()和includeUpper()函數配套使用
- includeLower(true)表示from()查詢將匹配字段值大於或等於此參數值的文檔
- includeLower(false)表示from()查詢將匹配字段值大於此參數值的文檔
- includeUpper(true)表示to()查詢將匹配字段值小於或等於此參數值的文檔
- includeUpper(false)表示to()查詢將匹配字段值小於此參數值的文檔
QueryBuilder qb = QueryBuilders.rangeQuery("age").gte(10).includeLower(true).lte(20).includeUpper(true);
複製代碼
其中,includeLower()和includeUpper()方法表示這個範圍是否包含查詢
根據指定的字段名查詢是否存在
QueryBuilder qb = existsQuery("user");
複製代碼
根據指定字段名和指定精確前綴進行查詢
QueryBuilder qb = prefixQuery("gender","m");
複製代碼
通配符查詢,指定字段名和通配符。其中?表示單字符通配符,*表示多字符通配符。通配符查詢的字段都是未通過分析的字段
QueryBuilder qb = wildcardQuery("gender","f?*");
複製代碼
根據指定字段名和正則表達式進行查詢。查詢的字段也是未通過分析的字段
QueryBuilder qb = regexpQuery("gender","f.*");
複製代碼
模糊查詢:指定的確切的字段名和拼寫錯誤的查詢內容
QueryBuilder qb = fuzzyQuery("gender","mala").fuzziness(Fuzziness.ONE);
複製代碼
查詢指定類型的文檔
QueryBuilder qb = typeQuery("tweet");
複製代碼
根據type類型和ID查詢,type類型能夠不寫
QueryBuilder qb = idsQuery("tweet").addIds("1", "11");
複製代碼