/**html
* 系統環境: vm12 下的centos 7.2java
* 當前安裝版本: elasticsearch-2.4.0.tar.gznode
*/git
默認進行了elasticsearch安裝和ik安裝, 超時配置, 分頁壓力配置等github
添加maven依賴spring
<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>2.4.0</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.1.3</version> </dependency>
注: 由於spring-data-elasticsearch目前只支持到2.4.0, 因此, 暫不使用5.2.0版本編程
關於版本控制可見: https://github.com/spring-projects/spring-data-elasticsearchjson
第一部分, 使用java操做elasticsearchcentos
建立客戶端的兩種方式可見spring-data-elasticsearch的源碼, 官方, 須要合起來才能使用, 浪費我一下午的時間....api
org.springframework.data.elasticsearch.client 包下, 2種建立客戶端的方式
1, 建立一個node節點, 加入集羣中, 經過這個node獲取cilent (不建議使用)
這種方式至關於建立了一個節點, 不存儲數據, 可經過claspath下的elasticsearch.yml設置, 也可經過編程的方式配置,
Settings settings = ImmutableSettings.settingsBuilder().put("client.transport.ping_timeout", 1000) .put("discovery.zen.ping.multicast.enabled", "false").put("timeout", 1) .putArray("discovery.zen.ping.unicast.hosts", "l-flightdev18.f.dev.cn0.qunar.com:9300", "l-flightdev17.f.dev.cn0.qunar.com:9300") .build(); Node node = NodeBuilder.nodeBuilder().clusterName("flight_fuwu_order_index").client(true).settings(settings).node(); Client client = node.client();
獲取了全部node節點的client, 發送請求時遍歷可用的client,
Elasticsearch爲Java用戶提供了兩種內置客戶端: 節點客戶端(node client): 節點客戶端以無數據節點(none data node)身份加入集羣,換言之,它本身不存儲任何數據,可是它知道數據在集羣中的具體位置,而且可以直接轉發請求到對應的節點上。 傳輸客戶端(Transport client): 這個更輕量的傳輸客戶端可以發送請求到遠程集羣。它本身不加入集羣,只是簡單轉發請求給集羣中的節點。 兩個Java客戶端都經過9300端口與集羣交互,使用Elasticsearch傳輸協議(Elasticsearch Transport Protocol)。集羣中的節點之間也經過9300端口進行通訊。若是此端口未開放,你的節點將不能組成集羣。+ TIP Java客戶端所在的Elasticsearch版本必須與集羣中其餘節點一致,不然,它們可能互相沒法識別。
https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.4/index.html
下面介紹第二種鏈接方式:
2, 經過transportClient來鏈接集羣
package com.wenbronk.javaes; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkProcessor.Listener; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.MultiGetItemResponse; import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.script.Script; import org.junit.Before; import org.junit.Test; import com.alibaba.fastjson.JSONObject; /** * 使用java API操做elasticSearch * * @author 231 * */ public class JavaESTest { private TransportClient client; private IndexRequest source; /** * 獲取鏈接, 第一種方式 * @throws Exception */ // @Before public void before() throws Exception { Map<String, String> map = new HashMap<String, String>(); map.put("cluster.name", "elasticsearch_wenbronk"); Settings.Builder settings = Settings.builder().put(map); client = TransportClient.builder().settings(settings).build() .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), Integer.parseInt("9300"))); } /** * 獲取鏈接, 第二種方式 * @throws Exception */ @Before public void before11() throws Exception { // 建立客戶端, 使用的默認集羣名, "elasticSearch" // client = TransportClient.builder().build() // .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), 9300)); // 經過setting對象指定集羣配置信息, 配置的集羣名 Settings settings = Settings.settingsBuilder().put("cluster.name", "elasticsearch_wenbronk") // 設置集羣名 // .put("client.transport.sniff", true) // 開啓嗅探 , 開啓後會一直鏈接不上, 緣由未知 // .put("network.host", "192.168.50.37") .put("client.transport.ignore_cluster_name", true) // 忽略集羣名字驗證, 打開後集羣名字不對也能鏈接上 // .put("client.transport.nodes_sampler_interval", 5) //報錯, // .put("client.transport.ping_timeout", 5) // 報錯, ping等待時間, .build(); client = TransportClient.builder().settings(settings).build() .addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress("192.168.50.37", 9300))); // 默認5s // 多久打開鏈接, 默認5s System.out.println("success connect"); } /** * 查看集羣信息 */ @Test public void testInfo() { List<DiscoveryNode> nodes = client.connectedNodes(); for (DiscoveryNode node : nodes) { System.out.println(node.getHostAddress()); } } /** * 組織json串, 方式1,直接拼接 */ public String createJson1() { String json = "{" + "\"user\":\"kimchy\"," + "\"postDate\":\"2013-01-30\"," + "\"message\":\"trying out Elasticsearch\"" + "}"; return json; } /** * 使用map建立json */ public Map<String, Object> createJson2() { Map<String,Object> json = new HashMap<String, Object>(); json.put("user", "kimchy"); json.put("postDate", new Date()); json.put("message", "trying out elasticsearch"); return json; } /** * 使用fastjson建立 */ public JSONObject createJson3() { JSONObject json = new JSONObject(); json.put("user", "kimchy"); json.put("postDate", new Date()); json.put("message", "trying out elasticsearch"); return json; } /** * 使用es的幫助類 */ public XContentBuilder createJson4() throws Exception { // 建立json對象, 其中一個建立json的方式 XContentBuilder source = XContentFactory.jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying to out ElasticSearch") .endObject(); return source; } /** * 進行鏈接測試 * @throws Exception */ @Test public void test1() throws Exception { XContentBuilder source = createJson4(); // 存json入索引中 IndexResponse response = client.prepareIndex("twitter", "tweet", "1").setSource(source).get(); // // 結果獲取 String index = response.getIndex(); String type = response.getType(); String id = response.getId(); long version = response.getVersion(); boolean created = response.isCreated(); System.out.println(index + " : " + type + ": " + id + ": " + version + ": " + created); } /** * get API 獲取指定文檔信息 */ @Test public void testGet() { // GetResponse response = client.prepareGet("twitter", "tweet", "1") // .get(); GetResponse response = client.prepareGet("twitter", "tweet", "1") .setOperationThreaded(false) // 線程安全 .get(); System.out.println(response.getSourceAsString()); } /** * 測試 delete api */ @Test public void testDelete() { DeleteResponse response = client.prepareDelete("twitter", "tweet", "1") .get(); String index = response.getIndex(); String type = response.getType(); String id = response.getId(); long version = response.getVersion(); System.out.println(index + " : " + type + ": " + id + ": " + version); } /** * 測試更新 update API * 使用 updateRequest 對象 * @throws Exception */ @Test public void testUpdate() throws Exception { UpdateRequest updateRequest = new UpdateRequest(); updateRequest.index("twitter"); updateRequest.type("tweet"); updateRequest.id("1"); updateRequest.doc(XContentFactory.jsonBuilder() .startObject() // 對沒有的字段添加, 對已有的字段替換 .field("gender", "male") .field("message", "hello") .endObject()); UpdateResponse response = client.update(updateRequest).get(); // 打印 String index = response.getIndex(); String type = response.getType(); String id = response.getId(); long version = response.getVersion(); System.out.println(index + " : " + type + ": " + id + ": " + version); } /** * 測試update api, 使用client * @throws Exception */ @Test public void testUpdate2() throws Exception { // 使用Script對象進行更新 // UpdateResponse response = client.prepareUpdate("twitter", "tweet", "1") // .setScript(new Script("hits._source.gender = \"male\"")) // .get(); // 使用XContFactory.jsonBuilder() 進行更新 // UpdateResponse response = client.prepareUpdate("twitter", "tweet", "1") // .setDoc(XContentFactory.jsonBuilder() // .startObject() // .field("gender", "malelelele") // .endObject()).get(); // 使用updateRequest對象及script // UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1") // .script(new Script("ctx._source.gender=\"male\"")); // UpdateResponse response = client.update(updateRequest).get(); // 使用updateRequest對象及documents進行更新 UpdateResponse response = client.update(new UpdateRequest("twitter", "tweet", "1") .doc(XContentFactory.jsonBuilder() .startObject() .field("gender", "male") .endObject() )).get(); System.out.println(response.getIndex()); } /** * 測試update * 使用updateRequest * @throws Exception * @throws InterruptedException */ @Test public void testUpdate3() throws InterruptedException, Exception { UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1") .script(new Script("ctx._source.gender=\"male\"")); UpdateResponse response = client.update(updateRequest).get(); } /** * 測試upsert方法 * @throws Exception * */ @Test public void testUpsert() throws Exception { // 設置查詢條件, 查找不到則添加生效 IndexRequest indexRequest = new IndexRequest("twitter", "tweet", "1") .source(XContentFactory.jsonBuilder() .startObject() .field("name", "qergef") .field("gender", "malfdsae") .endObject()); // 設置更新, 查找到更新下面的設置 UpdateRequest upsert = new UpdateRequest("twitter", "tweet", "1") .doc(XContentFactory.jsonBuilder() .startObject() .field("user", "wenbronk") .endObject()) .upsert(indexRequest); client.update(upsert).get(); } /** * 測試multi get api * 從不一樣的index, type, 和id中獲取 */ @Test public void testMultiGet() { MultiGetResponse multiGetResponse = client.prepareMultiGet() .add("twitter", "tweet", "1") .add("twitter", "tweet", "2", "3", "4") .add("anothoer", "type", "foo") .get(); for (MultiGetItemResponse itemResponse : multiGetResponse) { GetResponse response = itemResponse.getResponse(); if (response.isExists()) { String sourceAsString = response.getSourceAsString(); System.out.println(sourceAsString); } } } /** * bulk 批量執行 * 一次查詢能夠update 或 delete多個document */ @Test public void testBulk() throws Exception { BulkRequestBuilder bulkRequest = client.prepareBulk(); bulkRequest.add(client.prepareIndex("twitter", "tweet", "1") .setSource(XContentFactory.jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject())); bulkRequest.add(client.prepareIndex("twitter", "tweet", "2") .setSource(XContentFactory.jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "another post") .endObject())); BulkResponse response = bulkRequest.get(); System.out.println(response.getHeaders()); } /** * 使用bulk processor * @throws Exception */ @Test public void testBulkProcessor() throws Exception { // 建立BulkPorcessor對象 BulkProcessor bulkProcessor = BulkProcessor.builder(client, new Listener() { public void beforeBulk(long paramLong, BulkRequest paramBulkRequest) { // TODO Auto-generated method stub } // 執行出錯時執行 public void afterBulk(long paramLong, BulkRequest paramBulkRequest, Throwable paramThrowable) { // TODO Auto-generated method stub } public void afterBulk(long paramLong, BulkRequest paramBulkRequest, BulkResponse paramBulkResponse) { // TODO Auto-generated method stub } }) // 1w次請求執行一次bulk .setBulkActions(10000) // 1gb的數據刷新一次bulk .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) // 固定5s必須刷新一次 .setFlushInterval(TimeValue.timeValueSeconds(5)) // 併發請求數量, 0不併發, 1併發容許執行 .setConcurrentRequests(1) // 設置退避, 100ms後執行, 最大請求3次 .setBackoffPolicy( BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) .build(); // 添加單次請求 bulkProcessor.add(new IndexRequest("twitter", "tweet", "1")); bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2")); // 關閉 bulkProcessor.awaitClose(10, TimeUnit.MINUTES); // 或者 bulkProcessor.close(); } }