1、說明:html
1、Elasticsearch提供了兩個JAVA REST Client版本:java
一、java low level rest client:web
低級別的rest客戶端,經過http與集羣交互,用戶需本身編組請求JSON串,及解析響應JSON串。兼容全部Elasticsearch版本。apache
特色:maven引入json
使用介紹: https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-low.htmlapi
二、java high rest client:app
高級別的REST客戶端,基於低級別的REST客戶端,增長了編組請求JSON串、解析響應JSON串等相關API,使用的版本須要保存和ES服務一致的版本,不然會有版本問題。異步
從6.0.0開始加入的,目的是以java面向對象的方式進行請求、響應處理。async
每一個API支持 同步、異步 兩種方式,同步方法之間返回一個結果對象。異步的方法以async爲後綴,經過listener參數來通知結果。高級java resy客戶端依賴Elasticsearch core pprojectelasticsearch
兼容性說明:
依賴jdk1.8和Elasticsearch core project
2、Java Low Level Rest Client的使用
版本:
Elasticsearch 6.3.1
pom文件:
<dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>transport</artifactId> <version>6.3.1</version> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>6.3.1</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <version>2.7</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.7</version> </dependency> <dependency> <groupId>net.sf.json-lib</groupId> <artifactId>json-lib</artifactId> <version>0.9</version> </dependency>
1、構建elasicsearch client工具類
import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.transport.client.PreBuiltTransportClient; import java.net.InetAddress; /** * @Author: xiaolaotou * @Date: 2019/4/19 */ /** * 構建elasticsrarch client */ public class ClientUtil { private static TransportClient client; public TransportClient CreateClient() throws Exception { // 先構建client System.out.println("11111111111"); Settings settings=Settings.builder() .put("cluster.name","elasticsearch1") .put("client.transport.ignore_cluster_name", true) //若是集羣名不對,也能鏈接 .build(); //建立Client TransportClient client = new PreBuiltTransportClient(settings) .addTransportAddress( new TransportAddress( InetAddress.getByName( "192.168.200.100"), 9300)); return client; } }
2、測試類
import net.sf.json.JSONObject; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Requests; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHits; import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Set; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; /** * @Author: xiaolaotou * @Date: 2019/4/19 * ElasticSearch 6.3.1 */ public class Test { private static TransportClient client; static { try { client = new ClientUtil().CreateClient(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { //建立索引 // createEmployee(); //根據inde,type,id查詢一個document的data // FindIndex(); // CreateJsonIndex(); //批量導入 // BulkCreateIndex(); //批量導出 // OutData(); //建立帶ik分詞的index // CreateIndexIkTest(); //更新索引 // UpdateIndex(); // createIndex2(); // Search(); get(); } /** * 建立索引,普通格式 * * @throws Exception */ public static void createEmployee() throws Exception { IndexResponse response = client.prepareIndex("student", "doc", "1") .setSource(jsonBuilder() .startObject() .field("name", "jack") .field("age", 27) .field("position", "technique") .field("country", "china") .field("join_date", "2017-01-01") .field("salary", 10000) .endObject()) .get(); System.out.println("建立成功!"); }
/** * 根據 index ,type,id查詢 * * @throws Exception */ public static void FindIndex() throws Exception { GetResponse getResponse = client.prepareGet("student", "doc", "1").get(); System.out.println(getResponse.getSourceAsString()); }
/** * 建立索引,JSON * * @throws IOException */ public static void CreateJsonIndex() throws IOException { JSONObject json = new JSONObject(); json.put("user", "小明"); json.put("title", "Java Engineer"); json.put("desc", "web 開發"); IndexResponse response = client.prepareIndex("studentjson", "doc", "1") .setSource(json, XContentType.JSON) .get(); String _index = response.getIndex(); System.out.println(_index); }
/** * elasticsearch批量導入 */ public static void BulkCreateIndex() { BulkRequestBuilder builder = client.prepareBulk(); for (int i = 0; i < 100000; i++) { HashMap<String, Object> map = new HashMap<>(); map.put("recordtime", "11"); map.put("area", "22"); map.put("usertype", "33"); map.put("count", 44); builder.add(client.prepareIndex("bulktest", "1").setSource(map)); //每10000條提交一次 if (i % 10000 == 0) { builder.execute().actionGet(); builder = client.prepareBulk(); } } }
/** * 批量導出 */ public static void OutData() throws IOException { SearchResponse response = client.prepareSearch("bulktest").setTypes("1") .setQuery(QueryBuilders.matchAllQuery()) .setSize(10000).setScroll(new TimeValue(600000)) .setSearchType(SearchType.DEFAULT).execute().actionGet(); // setScroll(new TimeValue(600000)) 設置滾動的時間 String scrollid = response.getScrollId(); //把導出的結果以JSON的格式寫到文件裏 //每次返回數據10000條。一直循環查詢知道全部的數據都被查詢出來 while (true) { SearchResponse response2 = client.prepareSearchScroll(scrollid).setScroll(new TimeValue(1000000)) .execute().actionGet(); SearchHits searchHit = response2.getHits(); //再次查詢不到數據時跳出循環 if (searchHit.getHits().length == 0) { break; } System.out.println("查詢數量 :" + searchHit.getHits().length); for (int i = 0; i < searchHit.getHits().length; i++) { String json = searchHit.getHits()[i].getSourceAsString(); putData(json); } System.out.println("查詢結束"); } }
public static void putData(String json) throws IOException { String str = json + "\n"; //寫入本地文件 String fileTxt = "D:\\data.txt"; File file = new File(fileTxt); if (!file.getParentFile().exists()) { file.getParentFile().mkdirs(); } if (!file.exists()) { file.createNewFile(); FileWriter fw = new FileWriter(file, true); BufferedWriter bw = new BufferedWriter(fw); System.out.println("寫入完成啦啊"); bw.write(String.valueOf(str)); bw.flush(); bw.close(); fw.close(); } else { FileWriter fw = new FileWriter(file, true); BufferedWriter bw = new BufferedWriter(fw); System.out.println("追加寫入完成啦啦"); bw.write(String.valueOf(str)); bw.flush(); bw.close(); fw.close(); } } /** * 建立索引,並給某些字段指定ik分詞器,之後向該索引中查詢時,就會用ik分詞 */ public static void CreateIndexIkTest() throws Exception { //建立映射 XContentBuilder mapping = XContentFactory.jsonBuilder() .startObject() .startObject("properties") //title:字段名, type:文本類型 analyzer :分詞器類型 .startObject("title").field("type", "text").field("analyzer", "ik_smart").endObject() //該字段添加的內容,查詢時將會使用ik_smart分詞 .startObject("content").field("type", "text").field("analyzer", "ik_max_word").endObject() .endObject() .endObject(); //index:索引名 type:類型名(能夠本身定義) PutMappingRequest putmap = Requests.putMappingRequest("index").type("type").source(mapping); //建立索引 client.admin().indices().prepareCreate("index").execute().actionGet(); //爲索引添加映射 client.admin().indices().putMapping(putmap).actionGet(); //調用下面的方法爲建立的索引添加內容 CreateIndex1(); } //這個方法是爲上一步建立的索引中添加內容,包括id,id不能重複 public static void CreateIndex1() throws IOException { IndexResponse response = client.prepareIndex("index", "type", "1") //索引,類型,id .setSource(jsonBuilder() .startObject() .field("title", "title") //字段,值 .field("content", "content") .endObject() ).get(); }
/** * 更新索引 */ //更新索引,更新剛纔建立的索引,若是id相同將會覆蓋掉剛纔的內容 public static void UpdateIndex() throws Exception { //每次添加id應該不一樣,至關於數據表中的主鍵,相同的話將會進行覆蓋 UpdateResponse response=client.update(new UpdateRequest("index","type","1") .doc(XContentFactory.jsonBuilder() .startObject() .field("title","中華人民共和國國歌,國歌是最好聽的歌") .field("content","中華人民共和國國歌,國歌是最好聽的歌") .endObject() )).get(); } //再插入一條數據 public static void createIndex2() throws IOException { IndexResponse response = client.prepareIndex("index", "type", "2") .setSource(jsonBuilder() .startObject() .field("title", "中華民族是偉大的民族") .field("content", "中華民族是偉大的民族") .endObject() ).get(); } /** * 下面使用index索引下的2個document進行查詢 */ public static void Search(){ SearchResponse response1 = client.prepareSearch( "index") //指定多個索引 .setTypes("type") //指定類型 .setSearchType(SearchType.QUERY_THEN_FETCH) .setQuery(QueryBuilders.matchQuery("title", "中華人民共和國國歌")) // Query // .setPostFilter(QueryBuilders.rangeQuery("age").from(12).to(18)) // Filter .setFrom(0).setSize(60).setExplain(true) .get(); long totalHits1= response1.getHits().totalHits; //命中個數 System.out.println("response1======="+totalHits1); SearchResponse response2 = client.prepareSearch( "index") //指定多個索引 .setTypes("type") //指定類型 .setSearchType(SearchType.QUERY_THEN_FETCH) .setQuery(QueryBuilders.matchQuery("content", "中華人民共和國國歌")) // Query // .setPostFilter(QueryBuilders.rangeQuery("age").from(12).to(18)) // Filter .setFrom(0).setSize(60).setExplain(true) .get(); long totalHits2 = response2.getHits().totalHits; //命中個數 System.out.println("response2========="+totalHits2); } /** * GET操做 */ public static void get() { GetResponse response = client.prepareGet("index", "type", "2").get(); Map<String, Object> source = response.getSource(); Set<String> strings = source.keySet(); Iterator<String> iterator = strings.iterator(); while (iterator.hasNext()) { System.out.println(source.get(iterator.next())); } } }