pom.xmlhtml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>java
<groupId>cn.edu360.es</groupId>
<artifactId>HelloES</artifactId>
<version>1.0-SNAPSHOT</version>apache
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
</properties>json
<dependencies>api
<!-- es的客戶端-->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>5.4.3</version>
</dependency>app
<!-- 依賴2.x的log4j -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>curl
<!-- 單元測試 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>異步
</dependencies>elasticsearch
</project>maven
package cn.edu360.es;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.net.InetAddress;
/**
* Created by zx on 2017/8/15.
*/
public class HelloWorld {
public static void main(String[] args) {
try {
//設置集羣名稱
Settings settings = Settings.builder()
.put("cluster.name", "my-es")
.build();
//建立client
TransportClient client = new PreBuiltTransportClient(settings).addTransportAddresses(
//用java訪問ES用的端口是9300
new InetSocketTransportAddress(InetAddress.getByName("192.168.100.211"), 9300),
new InetSocketTransportAddress(InetAddress.getByName("192.168.100.212"), 9300),
new InetSocketTransportAddress(InetAddress.getByName("192.168.100.213"), 9300));
//搜索數據(.actionGet()方法是同步的,沒有返回就等待)
GetResponse response = client.prepareGet("news", "fulltext", "1").execute().actionGet();
//輸出結果
System.out.println(response);
//關閉client
client.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
package cn.edu360.es;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.DoubleTerms;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.avg.InternalAvg;
import org.elasticsearch.search.aggregations.metrics.max.InternalMax;
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.InternalSum;
import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
/**
* Created by zx on 2017/9/5.
* https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.4/index.html
*/
public class EsCRUD {
private TransportClient client = null;
@Before
public void init() throws Exception {
//設置集羣名稱
Settings settings = Settings.builder()
.put("cluster.name", "my-es")
//自動感知的功能(能夠經過當前指定的節點獲取全部es節點的信息)
.put("client.transport.sniff", true)
.build();
//建立client
client = new PreBuiltTransportClient(settings).addTransportAddresses(
new InetSocketTransportAddress(InetAddress.getByName("192.168.100.211"), 9300),
new InetSocketTransportAddress(InetAddress.getByName("192.168.100.212"), 9300),
new InetSocketTransportAddress(InetAddress.getByName("192.168.100.213"), 9300));
}
@Test
public void testCreate() throws IOException {
IndexResponse response = client.prepareIndex("gamelog", "users", "1")
.setSource(
jsonBuilder()
.startObject()
.field("username", "老趙")
.field("gender", "male")
.field("birthday", new Date())
.field("fv", 9999)
.field("message", "trying out Elasticsearch")
.endObject()
).get();
}
//查找一條
@Test
public void testGet() throws IOException {
GetResponse response = client.prepareGet("gamelog", "users", "1").get();
System.out.println(response.getSourceAsString());
}
//查找多條
@Test
public void testMultiGet() throws IOException {
MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
.add("gamelog", "users", "1")
.add("gamelog", "users", "2", "3")
.add("news", "fulltext", "1")
.get();
for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
GetResponse response = itemResponse.getResponse();
if (response.isExists()) {
String json = response.getSourceAsString();
System.out.println(json);
}
}
}
@Test
public void testUpdate() throws Exception {
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("gamelog");
updateRequest.type("users");
updateRequest.id("2");
updateRequest.doc(
jsonBuilder()
.startObject()
.field("fv", 999.9)
.endObject());
client.update(updateRequest).get();
}
@Test
public void testDelete() {
DeleteResponse response = client.prepareDelete("gamelog", "users", "2").get();
System.out.println(response);
}
@Test
public void testDeleteByQuery() {
BulkByScrollResponse response =
DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
//指定查詢條件
.filter(QueryBuilders.matchQuery("username", "老段"))
//指定索引名稱
.source("gamelog")
.get();
long deleted = response.getDeleted();
System.out.println(deleted);
}
//異步刪除
@Test
public void testDeleteByQueryAsync() {
DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
.filter(QueryBuilders.matchQuery("gender", "male"))
.source("gamelog")
.execute(new ActionListener<BulkByScrollResponse>() {
@Override
public void onResponse(BulkByScrollResponse response) {
long deleted = response.getDeleted();
System.out.println("數據刪除了");
System.out.println(deleted);
}
@Override
public void onFailure(Exception e) {
e.printStackTrace();
}
});
try {
System.out.println("異步刪除");
Thread.sleep(10000);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testRange() {
QueryBuilder qb = rangeQuery("fv")
// [88.99, 10000)
.from(88.99)
.to(10000)
.includeLower(true)
.includeUpper(false);
SearchResponse response = client.prepareSearch("gamelog").setQuery(qb).get();
System.out.println(response);
}
/**
* curl -XPUT 'http://192.168.5.251:9200/player_info/player/1' -d '{ "name": "curry", "age": 29, "salary": 3500,"team": "war", "position": "pg"}'
* curl -XPUT 'http://192.168.5.251:9200/player_info/player/2' -d '{ "name": "thompson", "age": 26, "salary": 2000,"team": "war", "position": "pg"}'
* curl -XPUT 'http://192.168.5.251:9200/player_info/player/3' -d '{ "name": "irving", "age": 25, "salary": 2000,"team": "cav", "position": "pg"}'
* curl -XPUT 'http://192.168.5.251:9200/player_info/player/4' -d '{ "name": "green", "age": 26, "salary": 2000,"team": "war", "position": "pf"}'
* curl -XPUT 'http://192.168.5.251:9200/player_info/player/5' -d '{ "name": "james", "age": 33, "salary": 4000,"team": "cav", "position": "sf"}'
*/
@Test
public void testAddPlayer() throws IOException {
IndexResponse response = client.prepareIndex("player_info", "player", "1")
.setSource(
jsonBuilder()
.startObject()
.field("name", "James")
.field("age", 33)
.field("salary", 3000)
.field("team", "cav")
.field("position", "sf")
.endObject()
).get();
}
/**
* https://elasticsearch.cn/article/102
*
* select team, count(*) as player_count from player group by team;
*/
@Test
public void testAgg1() {
//指定索引和type
SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
//按team分組而後聚合,可是並無指定聚合函數
TermsAggregationBuilder teamAgg = AggregationBuilders.terms("player_count").field("team");
//添加聚合器
builder.addAggregation(teamAgg);
//觸發
SearchResponse response = builder.execute().actionGet();
//System.out.println(response);
//將返回的結果放入到一個map中
Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
// Set<String> keys = aggMap.keySet();
//
// for (String key: keys) {
// System.out.println(key);
// }
// //取出聚合屬性
StringTerms terms = (StringTerms) aggMap.get("player_count");
//
//// //依次迭代出分組聚合數據
// for (Terms.Bucket bucket : terms.getBuckets()) {
// //分組的名字
// String team = (String) bucket.getKey();
// //count,分組後一個組有多少數據
// long count = bucket.getDocCount();
// System.out.println(team + " " + count);
// }
Iterator<Terms.Bucket> teamBucketIt = terms.getBuckets().iterator();
while (teamBucketIt .hasNext()) {
Terms.Bucket bucket = teamBucketIt.next();
String team = (String) bucket.getKey();
long count = bucket.getDocCount();
System.out.println(team + " " + count);
}
}
/**
* select team, position, count(*) as pos_count from player group by team, position;
*/
@Test
public void testAgg2() {
SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
//指定別名和分組的字段
TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_name").field("team");
TermsAggregationBuilder posAgg= AggregationBuilders.terms("pos_count").field("position");
//添加兩個聚合構建器
builder.addAggregation(teamAgg.subAggregation(posAgg));
//執行查詢
SearchResponse response = builder.execute().actionGet();
//將查詢結果放入map中
Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
//根據屬性名到map中查找
StringTerms teams = (StringTerms) aggMap.get("team_name");
//循環查找結果
for (Terms.Bucket teamBucket : teams.getBuckets()) {
//先按球隊進行分組
String team = (String) teamBucket.getKey();
Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
StringTerms positions = (StringTerms) subAggMap.get("pos_count");
//由於一個球隊有不少位置,那麼還要依次拿出位置信息
for (Terms.Bucket posBucket : positions.getBuckets()) {
//拿到位置的名字
String pos = (String) posBucket.getKey();
//拿出該位置的數量
long docCount = posBucket.getDocCount();
//打印球隊,位置,人數
System.out.println(team + " " + pos + " " + docCount);
}
}
}
/**
* select team, max(age) as max_age from player group by team;
*/
@Test
public void testAgg3() {
SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
//指定安球隊進行分組
TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_name").field("team");
//指定分組求最大值
MaxAggregationBuilder maxAgg = AggregationBuilders.max("max_age").field("age");
//分組後求最大值
builder.addAggregation(teamAgg.subAggregation(maxAgg));
//查詢
SearchResponse response = builder.execute().actionGet();
Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
//根據team屬性,獲取map中的內容
StringTerms teams = (StringTerms) aggMap.get("team_name");
for (Terms.Bucket teamBucket : teams.getBuckets()) {
//分組的屬性名
String team = (String) teamBucket.getKey();
//在將聚合後取最大值的內容取出來放到map中
Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
//取分組後的最大值
InternalMax ages = (InternalMax)subAggMap.get("max_age");
double max = ages.getValue();
System.out.println(team + " " + max);
}
}
/**
* select team, avg(age) as avg_age, sum(salary) as total_salary from player group by team;
*/
@Test
public void testAgg4() {
SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
//指定分組字段
TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team");
//指定聚合函數是求平均數據
AvgAggregationBuilder avgAgg = AggregationBuilders.avg("avg_age").field("age");
//指定另一個聚合函數是求和
SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary");
//分組的聚合器關聯了兩個聚合函數
builder.addAggregation(termsAgg.subAggregation(avgAgg).subAggregation(sumAgg));
SearchResponse response = builder.execute().actionGet();
Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
//按分組的名字取出數據
StringTerms teams = (StringTerms) aggMap.get("team_name");
for (Terms.Bucket teamBucket : teams.getBuckets()) {
//獲取球隊名字
String team = (String) teamBucket.getKey();
Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
//根據別名取出平均年齡
InternalAvg avgAge = (InternalAvg)subAggMap.get("avg_age");
//根據別名取出薪水總和
InternalSum totalSalary = (InternalSum)subAggMap.get("total_salary");
double avgAgeValue = avgAge.getValue();
double totalSalaryValue = totalSalary.getValue();
System.out.println(team + " " + avgAgeValue + " " + totalSalaryValue);
}
}
/**
* select team, sum(salary) as total_salary from player group by team order by total_salary desc;
*/
@Test
public void testAgg5() {
SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
//按team進行分組,而後指定排序規則
TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team").order(Terms.Order.aggregation("total_salary ", true));
SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary");
builder.addAggregation(termsAgg.subAggregation(sumAgg));
SearchResponse response = builder.execute().actionGet();
Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
StringTerms teams = (StringTerms) aggMap.get("team_name");
for (Terms.Bucket teamBucket : teams.getBuckets()) {
String team = (String) teamBucket.getKey();
Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
InternalSum totalSalary = (InternalSum)subAggMap.get("total_salary");
double totalSalaryValue = totalSalary.getValue();
System.out.println(team + " " + totalSalaryValue);
}
}
}
package cn.edu360.es;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.InetAddress;
import java.util.HashMap;
/**
* Created by zx on 2017/9/6.
*/
public class AdminAPI {
private TransportClient client = null;
//在全部的測試方法以前執行
@Before
public void init() throws Exception {
//設置集羣名稱
Settings settings = Settings.builder().put("cluster.name", "my-es").build();
//建立client
client = new PreBuiltTransportClient(settings).addTransportAddresses(
new InetSocketTransportAddress(InetAddress.getByName("192.168.100.211"), 9300),
new InetSocketTransportAddress(InetAddress.getByName("192.168.100.212"), 9300),
new InetSocketTransportAddress(InetAddress.getByName("192.168.100.213"), 9300));
}
//建立索引,並配置一些參數
@Test
public void createIndexWithSettings() {
//獲取Admin的API
AdminClient admin = client.admin();
//使用Admin API對索引進行操做
IndicesAdminClient indices = admin.indices();
//準備建立索引
indices.prepareCreate("gamelog")
//配置索引參數
.setSettings(
//參數配置器
Settings.builder()//指定索引分區的數量
.put("index.number_of_shards", 4)
//指定索引副本的數量(注意:不包括自己,若是設置數據存儲副本爲2,實際上數據存儲了3份)
.put("index.number_of_replicas", 2)
)
//真正執行
.get();
}
//跟索引添加mapping信息(給表添加schema信息)
@Test
public void putMapping() {
//建立索引
client.admin().indices().prepareCreate("twitter")
//建立一個type,並指定type中屬性的名字和類型
.addMapping("tweet",
"{\n" +
" \"tweet\": {\n" +
" \"properties\": {\n" +
" \"message\": {\n" +
" \"type\": \"string\"\n" +
" }\n" +
" }\n" +
" }\n" +
" }")
.get();
}
/**
* 你能夠經過dynamic設置來控制這一行爲,它可以接受如下的選項:
* true:默認值。動態添加字段
* false:忽略新字段
* strict:若是碰到陌生字段,拋出異常
* @throws IOException
*/
@Test
public void testSettingsMappings() throws IOException {
//1:settings
HashMap<String, Object> settings_map = new HashMap<String, Object>(2);
settings_map.put("number_of_shards", 3);
settings_map.put("number_of_replicas", 2);
//2:mappings(映射、schema)
XContentBuilder builder = XContentFactory.jsonBuilder()
.startObject()
.field("dynamic", "true")
//設置type中的屬性
.startObject("properties")
//id屬性
.startObject("num")
//類型是integer
.field("type", "integer")
//不分詞,可是建索引
.field("index", "not_analyzed")
//在文檔中存儲
.field("store", "yes")
.endObject()
//name屬性
.startObject("name")
//string類型
.field("type", "string")
//在文檔中存儲
.field("store", "yes")
//創建索引
.field("index", "analyzed")
//使用ik_smart進行分詞
.field("analyzer", "ik_smart")
.endObject()
.endObject()
.endObject();
CreateIndexRequestBuilder prepareCreate = client.admin().indices().prepareCreate("user_info");
//管理索引(user_info)而後關聯type(user)
prepareCreate.setSettings(settings_map).addMapping("user", builder).get();
}
/**
* XContentBuilder mapping = jsonBuilder()
.startObject()
.startObject("productIndex")
.startObject("properties")
.startObject("title").field("type", "string").field("store", "yes").endObject()
.startObject("description").field("type", "string").field("index", "not_analyzed").endObject()
.startObject("price").field("type", "double").endObject()
.startObject("onSale").field("type", "boolean").endObject()
.startObject("type").field("type", "integer").endObject()
.startObject("createDate").field("type", "date").endObject()
.endObject()
.endObject()
.endObject();
PutMappingRequest mappingRequest = Requests.putMappingRequest("productIndex").type("productIndex").source(mapping);
client.admin().indices().putMapping(mappingRequest).actionGet();
*/
/**
* index這個屬性,no表明不建索引
* not_analyzed,建索引不分詞
* analyzed 即分詞,又創建索引
* expected [no], [not_analyzed] or [analyzed]
* @throws IOException
*/
@Test
public void testSettingsPlayerMappings() throws IOException {
//1:settings
HashMap<String, Object> settings_map = new HashMap<String, Object>(2);
settings_map.put("number_of_shards", 3);
settings_map.put("number_of_replicas", 1);
//2:mappings
XContentBuilder builder = XContentFactory.jsonBuilder()
.startObject()//
.field("dynamic", "true")
.startObject("properties")
.startObject("id")
.field("type", "integer")
.field("store", "yes")
.endObject()
.startObject("name")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject("age")
.field("type", "integer")
.endObject()
.startObject("salary")
.field("type", "integer")
.endObject()
.startObject("team")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject("position")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject("description")
.field("type", "string")
.field("store", "no")
.field("index", "analyzed")
.field("analyzer", "ik_smart")
.endObject()
.startObject("addr")
.field("type", "string")
.field("store", "yes")
.field("index", "analyzed")
.field("analyzer", "ik_smart")
.endObject()
.endObject()
.endObject();
CreateIndexRequestBuilder prepareCreate = client.admin().indices().prepareCreate("player_info");
prepareCreate.setSettings(settings_map).addMapping("player", builder).get();
}}