Maven 依賴
<dependencies>
    <!-- JUnit 4: only needed to run the unit-test style examples -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>

    <!-- Elasticsearch TransportClient; the examples target a 6.5.x cluster -->
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>transport</artifactId>
        <version>6.5.2</version>
    </dependency>

    <!-- Lombok is a compile-time annotation processor, so it is declared
         "provided" to keep it out of the packaged artifact -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.8</version>
        <scope>provided</scope>
    </dependency>

    <!-- org.json: used for bean-to-JSON conversion -->
    <dependency>
        <groupId>org.json</groupId>
        <artifactId>json</artifactId>
        <version>20180813</version>
    </dependency>
</dependencies>
由於方法較多,這裏採用 JUnit 單元測試。
目錄結構
Product.java
package com.daniel.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Simple bean describing a product document: its name, author and version.
 *
 * <p>Constructors and accessors are generated by Lombok from the annotations
 * below, so the Lombok plugin needs to be installed in IDEA for the IDE to
 * resolve them.
 *
 * @author Daniel
 */
@NoArgsConstructor
@AllArgsConstructor
@Data
public class Product {

    /** Product name. */
    private String name;

    /** Author or vendor of the product. */
    private String author;

    /** Version string of the product. */
    private String version;
}
ElasticSearchUtil.java
package com.daniel.util; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.transport.client.PreBuiltTransportClient; import java.io.IOException; import java.net.InetAddress; import java.util.LinkedList; import java.util.Properties; /** * @Author Daniel * @Description 鏈接到ElasticSearch集羣 **/ public class ElasticSearchUtil { // 鏈接池 private static LinkedList<TransportClient> pool = new LinkedList<>(); static { String CLUSTER_NAME = "cluster.name"; String CLUSTER_HOSTS_PORT = "cluster.hosts.port"; Properties properties = new Properties(); try { // 加載配置文件 properties.load(ElasticSearchUtil.class.getClassLoader().getResourceAsStream("elastic.properties")); Settings setting = Settings.builder() // 若是集羣的cluster.name和elasticsearch不一樣,須要手動指定 .put(CLUSTER_NAME, properties.getProperty(CLUSTER_NAME)) .build(); // 入口 TransportClient client; for (int i = 0; i < 5; i++) { client = new PreBuiltTransportClient(setting); // 指定es集羣的地址 String[] hostAndPorts = properties.getProperty(CLUSTER_HOSTS_PORT).split(","); for (String hostAndPort : hostAndPorts) { String host = hostAndPort.split(":")[0]; int port = Integer.valueOf(hostAndPort.split(":")[1]); TransportAddress trans = new TransportAddress(InetAddress.getByName(host), port); client.addTransportAddress(trans); } pool.push(client); } } catch (IOException e) { e.printStackTrace(); } } public static TransportClient getClient() { while (pool.isEmpty()) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } return pool.poll(); } public static void release(TransportClient client) { pool.push(client); } }
elastic.properties
# Name of the Elasticsearch cluster; ElasticSearchUtil copies this value into
# the client settings under the same key.
cluster.name=bde-es
# Comma-separated host:port entries; ElasticSearchUtil splits the list on ","
# and each entry on ":" and adds every pair as a transport address.
cluster.hosts.port=hadoop01:9300,hadoop02:9300,hadoop03:9300
CURD.java
package com.daniel.api; import com.daniel.entity.Product; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.transport.client.PreBuiltTransportClient; import org.json.JSONObject; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.beans.PropertyDescriptor; import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map; /** * @Author Daniel * @Description 增刪改查 **/ public class CURD { // 入口:TransportClient private TransportClient client; @Before // 創建鏈接 public void setUp() throws UnknownHostException { Settings setting = Settings.builder() .put("cluster.name", "bde-es") .build(); client = new PreBuiltTransportClient(setting); // 指定es集羣的地址 TransportAddress[] trans = { // 這裏不是9200 new TransportAddress(InetAddress.getByName("hadoop01"), 9300), new TransportAddress(InetAddress.getByName("hadoop02"), 9300), new TransportAddress(InetAddress.getByName("hadoop03"), 9300) }; client.addTransportAddresses(trans); /* // 獲取集羣配置 for(String key : client.settings().keySet()) { System.out.println(key + ": " + client.settings().get(key)); }*/ } final String INDEX = "product"; final String TYPE = "hadoop"; @Test public void get() { // 這裏至關於curl -XGET http://hadoop01:9200/{index}/{type}/{id} 
GetResponse response = client.prepareGet(INDEX, TYPE, "1") .get(); Map<String, Object> map = response.getSource(); map.forEach((k, v) -> System.out.println(k + "--->" + v)); System.out.println("create json version: " + response.getVersion()); } @Test public void createJSON() { String json = "{\"name\": \"azkaban\", \"author\": \"apache\", \"version\": \"1.4.2\"}"; IndexResponse response = client.prepareIndex(INDEX, TYPE, "1") // 傳入XContentType.JSON是爲了不json數組只能是偶數的報錯 .setSource(json, XContentType.JSON) .get(); System.out.println("create json version: " + response.getVersion()); } @Test public void createMap() { Map<String, String> map = new HashMap<>(); map.put("name", "spark"); map.put("author", "apache"); map.put("version", "2.7.6"); IndexResponse response = client.prepareIndex(INDEX, TYPE, "3") .setSource(map) .get(); System.out.println("create map version: " + response.getVersion()); } @Test public void createBean() throws Exception { Product bp = new Product(); bp.setName("hbase"); bp.setAuthor("apache"); bp.setVersion("1.2.1"); // 一般作法講bean轉換成map或者json字符串 // Bean to json // JSONObject jsonObj = new JSONObject(bp); // Bean to map Map<String, Object> beanMap = bean2Map(bp); IndexResponse response = client.prepareIndex(INDEX, TYPE, "4") // .setSource(jsonObj.toString(), XContentType.JSON) .setSource(beanMap) .get(); System.out.println("create bean version: " + response.getVersion()); } // 基本類型XContentBuilder @Test public void createXContentBuilder() throws IOException { XContentBuilder builder = JsonXContent.contentBuilder(); builder.startObject() .field("name", "elasticsearch") .field("author", "elastic") .field("version", "6.5.2") .endObject(); IndexResponse response = client.prepareIndex(INDEX, TYPE, "5") .setSource(builder) .get(); System.out.println("create bean version: " + response.getVersion()); } // 將對象轉換成Map public <T> Map<String, Object> bean2Map(T t) throws Exception { Map<String, Object> map = new HashMap<>(); Class<?> clazz = t.getClass(); Field[] 
fields = clazz.getDeclaredFields(); PropertyDescriptor bp = null; for (Field field : fields) { String name = field.getName(); bp = new PropertyDescriptor(name, clazz); Method getterMethod = bp.getReadMethod(); Object value = getterMethod.invoke(t); map.put(name, value); } return map; } @Test // 局部修改 public void update() { // 添加或修改一個url字段 String json = "{\"url\": \"http://www.elastic.io\"}"; UpdateResponse response = client.prepareUpdate(INDEX, TYPE, "6") .setDoc(json, XContentType.JSON) .get(); System.out.println("update index version: " + response.getVersion()); } @Test public void Delete() { DeleteResponse response = client.prepareDelete(INDEX, TYPE, "7") .get(); System.out.println("delete index version: " + response.getVersion()); } @Test // 批量操做 public void bulk() throws Exception { XContentBuilder builder = JsonXContent.contentBuilder(); // 新建一個json builder.startObject() .field("name", "elasticsearch") .field("author", "elastic") .field("version", "6.5.2") .endObject(); String json = "{\"url\": \"http://www.elastic.io\"}"; BulkResponse itemResponses = client.prepareBulk() // 增 .add(client.prepareIndex(INDEX, TYPE, "7").setSource(builder)) // 刪 .add(client.prepareDelete(INDEX, TYPE, "6")) // 改 .add(client.prepareUpdate(INDEX, TYPE, "5").setDoc(json, XContentType.JSON)) // 查 .get(); // 拿到這個操做的一些信息 for (BulkItemResponse response : itemResponses) { String id = response.getId(); long version = response.getVersion(); System.out.println(id + "--->version: " + version); } } @After public void cleanUp() { client.close(); } }
全文索引:使用 prepareSearch,需要指定檢索的類型 SearchType:
以上兩種就比最上面兩種多了一個 DFS 的過程(D:distributed,F:frequency,S:scatter),稱爲分佈式詞頻率和文檔頻率散發。
常見的查詢方式:
FullTextIndex.java
package com.daniel.api; import com.daniel.util.ElasticSearchUtil; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.text.Text; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.subphase.highlight.HighlightField; import org.elasticsearch.search.sort.SortOrder; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.Map; /** * @Author Daniel * @Description 全文索引 **/ public class FullTextIndex { private TransportClient client; @Before public void setUp() { client = ElasticSearchUtil.getClient(); } String[] indices = {"account"}; @Test public void search() { // 指定索引庫 SearchResponse response = client.prepareSearch(indices) // 指定檢索類型 .setSearchType(SearchType.QUERY_THEN_FETCH) // 選擇一種查詢方式 // .setQuery(QueryBuilders.matchAllQuery()) // .setQuery(QueryBuilders.matchQuery("address", "Avenue")) .setQuery(QueryBuilders.matchPhraseQuery("address", "Clarkson Avenue")) // 查什麼內容 // 有啥要求(排序,聚合等等) .get(); // 搜索結果 SearchHits searchHits = response.getHits(); long totalHits = searchHits.totalHits; System.out.println("Daniel爲您找到相關結果約" + totalHits + "個"); // es會對數據進行評分 float maxScore = searchHits.getMaxScore(); System.out.println("最大得分:" + maxScore); SearchHit[] hits = searchHits.getHits(); for (SearchHit hit : hits) { System.out.println("-------------------------------------"); String index = hit.getIndex(); String type = hit.getType(); String id = hit.getId(); float score = hit.getScore(); String source = hit.getSourceAsString(); System.out.printf("index:\t%s\n", index); System.out.printf("type:\t: %s\n", type); System.out.printf("id:\t%s\n", id); System.out.printf("score:\t%f\n", score); System.out.printf("source:\t%s\n", 
source); } } @Test public void search2() { SearchResponse response = client.prepareSearch(indices) .setSearchType(SearchType.QUERY_THEN_FETCH) .setQuery(QueryBuilders.matchQuery("address", "Avenue")) // 分頁算法:from=(page - 1) * pageSize // 第五頁 .setFrom(20) .setSize(5) // 排序(二次排序) .addSort("age", SortOrder.DESC) .addSort("balance", SortOrder.DESC) //高亮操做 .highlighter( SearchSourceBuilder.highlight() // 前置標籤 .preTags("<font style=\"font-size: 20px;color:blue\">") .field("address") // 後置標籤 .postTags("</font>") ) .get(); SearchHits searchHits = response.getHits(); long totalHits = searchHits.totalHits; System.out.println("Daniel爲您找到相關結果約" + totalHits + "個"); float maxScore = searchHits.getMaxScore(); System.out.println("最大得分:" + maxScore); SearchHit[] hits = searchHits.getHits(); for (SearchHit hit : hits) { System.out.println("-------------------------------------"); String index = hit.getIndex(); String type = hit.getType(); String id = hit.getId(); float score = hit.getScore(); Map<String, HighlightField> highlightFields = hit.getHighlightFields(); //key --> 高亮字段對應的值 //value --> 高亮字段對應的高亮內容 String hl = ""; for (Map.Entry<String, HighlightField> hf : highlightFields.entrySet()) { HighlightField highlightField = hf.getValue(); Text[] texts = highlightField.fragments(); for (Text text : texts) { hl += text.toString(); } } System.out.printf("index:\t%s\n", index); System.out.printf("type:\t: %s\n", type); System.out.printf("id:\t%s\n", id); System.out.printf("score:\t%f\n", score); // 將最後的結果輸入在html頁面上便可以看到效果 System.out.println("高亮內容:" + hl); } } @After public void cleanUp() { ElasticSearchUtil.release(client); } }