elasticsearch基本操作之--java基本操作 api

/**

 * 系統環境: vm12 下的centos 7.2

 * 當前安裝版本: elasticsearch-2.4.0.tar.gz

 */

默認已完成elasticsearch安裝和ik安裝, 超時配置, 分頁壓力配置等

添加maven依賴

<!-- Elasticsearch Java client; the version must match the cluster version (2.4.0) -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.1.3</version>
</dependency>
<!-- Required by the example class below (com.alibaba.fastjson.JSONObject) -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
<!-- Required by the example class below (org.junit.Before / org.junit.Test) -->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
</dependency>

  注: 由於spring-data-elasticsearch目前只支持到2.4.0, 因此, 暫不使用5.2.0版本

    關於版本對應關係可見: https://github.com/spring-projects/spring-data-elasticsearch

    

 

第一部分, 使用java操作elasticsearch

  創建客戶端的兩種方式可見spring-data-elasticsearch的源碼和官方文檔, 需要合起來看才能使用, 浪費我一下午的時間....

org.springframework.data.elasticsearch.client 包下, 有2種創建客戶端的方式

 

  1, 創建一個node節點, 加入集羣中, 通過這個node獲取client (不建議使用)

      這種方式相當於創建了一個節點, 不存儲數據, 可通過classpath下的elasticsearch.yml設置, 也可通過編程的方式配置: 

// Build the settings for a client-only node. Settings.settingsBuilder() is the same
// builder the TransportClient example in this article uses (ImmutableSettings is not
// among the classes that example imports).
// NOTE(review): an embedded node on ES 2.x may also need "path.home" to be set — confirm.
Settings settings = Settings.settingsBuilder()
        .put("client.transport.ping_timeout", 1000)
        .put("discovery.zen.ping.multicast.enabled", "false")
        .put("timeout", 1)
        // Unicast seed hosts used to discover the cluster
        .putArray("discovery.zen.ping.unicast.hosts",
                "l-flightdev18.f.dev.cn0.qunar.com:9300",
                "l-flightdev17.f.dev.cn0.qunar.com:9300")
        .build();
// client(true): join the cluster as a client node, then obtain a Client from it
Node node = NodeBuilder.nodeBuilder()
        .clusterName("flight_fuwu_order_index")
        .client(true)
        .settings(settings)
        .node();
Client client = node.client();

     這樣獲取了所有node節點的client, 發送請求時遍歷可用的client。

Elasticsearch爲Java用戶提供了兩種內置客戶端:
節點客戶端(node client):

節點客戶端以無數據節點(non-data node)身份加入集羣,換言之,它本身不存儲任何數據,但是它知道數據在集羣中的具體位置,並且能夠直接轉發請求到對應的節點上。
傳輸客戶端(Transport client):

這個更輕量的傳輸客戶端能夠發送請求到遠程集羣。它本身不加入集羣,只是簡單轉發請求給集羣中的節點。
兩個Java客戶端都通過9300端口與集羣交互,使用Elasticsearch傳輸協議(Elasticsearch Transport Protocol)。集羣中的節點之間也通過9300端口進行通信。如果此端口未開放,你的節點將不能組成集羣。

TIP

Java客戶端所使用的Elasticsearch版本必須與集羣中其他節點一致,否則,它們可能互相無法識別。

 https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.4/index.html

下面介紹第二種連接方式: 

 
  2, 通過transportClient來連接集羣

package com.wenbronk.javaes;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkProcessor.Listener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.script.Script;
import org.junit.Before;
import org.junit.Test;

import com.alibaba.fastjson.JSONObject;

/**
 * Walk-through of the Elasticsearch Java API using a {@link TransportClient}:
 * connecting, building JSON documents, and the index / get / delete / update /
 * upsert / multi-get / bulk / bulk-processor calls.
 *
 * <p>Every test talks to a live cluster at the hard-coded address below.
 *
 * <p>NOTE(review): the client opened in {@link #before11()} is never closed;
 * consider adding a JUnit teardown method that calls {@code client.close()}.
 *
 * @author 231
 */
public class JavaESTest {

    /** Cluster name the examples connect to. */
    private static final String CLUSTER_NAME = "elasticsearch_wenbronk";

    /** Transport port used by the Java client. */
    private static final int TRANSPORT_PORT = 9300;

    private TransportClient client;

    /**
     * Opens the connection, first variant: settings are supplied through a map
     * and the host is resolved by name. Not registered as a JUnit fixture;
     * {@link #before11()} is the active one.
     *
     * @throws Exception if the host name cannot be resolved
     */
//    @Before
    public void before() throws Exception {
        Map<String, String> map = new HashMap<String, String>();
        map.put("cluster.name", CLUSTER_NAME);
        Settings.Builder settings = Settings.builder().put(map);
        client = TransportClient.builder().settings(settings).build()
                .addTransportAddress(new InetSocketTransportAddress(
                        InetAddress.getByName("www.wenbronk.com"), TRANSPORT_PORT));
    }

    /**
     * Opens the connection, second variant: settings are built fluently and the
     * node is addressed by IP. Runs before every test.
     *
     * @throws Exception declared for parity with {@link #before()}
     */
    @Before
    public void before11() throws Exception {
        // Alternative: create the client with the default cluster name
//        client = TransportClient.builder().build()
//                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), 9300));

        // Cluster configuration passed through a Settings object
        Settings settings = Settings.settingsBuilder()
                .put("cluster.name", CLUSTER_NAME) // cluster name
//                .put("client.transport.sniff", true) // sniffing; with this on the client never connected, reason unknown
//                .put("network.host", "192.168.50.37")
                .put("client.transport.ignore_cluster_name", true) // skip cluster-name validation, so a wrong name still connects
//                .put("client.transport.nodes_sampler_interval", 5) // raised an error
//                .put("client.transport.ping_timeout", 5) // raised an error; ping wait time
                .build();
        client = TransportClient.builder().settings(settings).build()
                .addTransportAddress(new InetSocketTransportAddress(
                        new InetSocketAddress("192.168.50.37", TRANSPORT_PORT)));
        System.out.println("success connect");
    }

    /**
     * Prints the host address of every node the client is connected to.
     */
    @Test
    public void testInfo() {
        List<DiscoveryNode> nodes = client.connectedNodes();
        for (DiscoveryNode node : nodes) {
            System.out.println(node.getHostAddress());
        }
    }

    /**
     * Builds a JSON document, variant 1: plain string concatenation.
     *
     * @return the document as a JSON string
     */
    public String createJson1() {
        return "{" +
                "\"user\":\"kimchy\"," +
                "\"postDate\":\"2013-01-30\"," +
                "\"message\":\"trying out Elasticsearch\"" +
            "}";
    }

    /**
     * Builds a JSON document, variant 2: a plain map.
     *
     * @return the document as a field-name to value map
     */
    public Map<String, Object> createJson2() {
        Map<String, Object> json = new HashMap<String, Object>();
        json.put("user", "kimchy");
        json.put("postDate", new Date());
        json.put("message", "trying out elasticsearch");
        return json;
    }

    /**
     * Builds a JSON document, variant 3: a fastjson {@link JSONObject}.
     *
     * @return the document as a fastjson object
     */
    public JSONObject createJson3() {
        JSONObject json = new JSONObject();
        json.put("user", "kimchy");
        json.put("postDate", new Date());
        json.put("message", "trying out elasticsearch");
        return json;
    }

    /**
     * Builds a JSON document, variant 4: the Elasticsearch {@link XContentBuilder} helper.
     *
     * @return the builder holding the finished document
     * @throws Exception if the builder fails to write the content
     */
    public XContentBuilder createJson4() throws Exception {
        return XContentFactory.jsonBuilder()
            .startObject()
                .field("user", "kimchy")
                .field("postDate", new Date())
                .field("message", "trying to out ElasticSearch")
            .endObject();
    }

    /**
     * Indexes one document as twitter/tweet/1 and prints the response metadata.
     *
     * @throws Exception if the document cannot be built
     */
    @Test
    public void test1() throws Exception {
        XContentBuilder source = createJson4();
        // Store the JSON document in the index
        IndexResponse response = client.prepareIndex("twitter", "tweet", "1").setSource(source).get();
        // Read back the result
        String index = response.getIndex();
        String type = response.getType();
        String id = response.getId();
        long version = response.getVersion();
        boolean created = response.isCreated();
        System.out.println(index + " : " + type + ": " + id + ": " + version + ": " + created);
    }

    /**
     * Get API: fetches twitter/tweet/1 and prints its source.
     */
    @Test
    public void testGet() {
        GetResponse response = client.prepareGet("twitter", "tweet", "1")
                // NOTE(review): this toggles whether the operation runs on a separate
                // thread; it is not a thread-safety switch — confirm against the API docs
                .setOperationThreaded(false)
                .get();
        if (response.isExists()) {
            System.out.println(response.getSourceAsString());
        } else {
            System.out.println("document not found: " + response.getIndex() + "/"
                    + response.getType() + "/" + response.getId());
        }
    }

    /**
     * Delete API: deletes twitter/tweet/1 and prints the response metadata.
     */
    @Test
    public void testDelete() {
        DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")
                .get();
        String index = response.getIndex();
        String type = response.getType();
        String id = response.getId();
        long version = response.getVersion();
        System.out.println(index + " : " + type + ": " + id + ": " + version);
    }

    /**
     * Update API through an {@link UpdateRequest} populated step by step.
     *
     * @throws Exception if building the partial document or executing the request fails
     */
    @Test
    public void testUpdate() throws Exception {
        UpdateRequest updateRequest = new UpdateRequest();
        updateRequest.index("twitter");
        updateRequest.type("tweet");
        updateRequest.id("1");
        updateRequest.doc(XContentFactory.jsonBuilder()
                .startObject()
                    // Partial document: missing fields are added, existing ones replaced
                    .field("gender", "male")
                    .field("message", "hello")
                .endObject());
        UpdateResponse response = client.update(updateRequest).get();

        // Print the response metadata
        String index = response.getIndex();
        String type = response.getType();
        String id = response.getId();
        long version = response.getVersion();
        System.out.println(index + " : " + type + ": " + id + ": " + version);
    }

    /**
     * Update API, alternative call styles. Only the last variant is executed;
     * the others are kept as commented-out examples.
     *
     * @throws Exception if building the partial document or executing the request fails
     */
    @Test
    public void testUpdate2() throws Exception {
        // Variant A: prepareUpdate with a Script
        // (the update scripts elsewhere in this class address the document as ctx._source)
//        UpdateResponse response = client.prepareUpdate("twitter", "tweet", "1")
//                .setScript(new Script("ctx._source.gender = \"male\""))
//                .get();

        // Variant B: prepareUpdate with a partial document from XContentFactory.jsonBuilder()
//        UpdateResponse response = client.prepareUpdate("twitter", "tweet", "1")
//                .setDoc(XContentFactory.jsonBuilder()
//                        .startObject()
//                            .field("gender", "malelelele")
//                        .endObject()).get();

        // Variant C: UpdateRequest with a Script
//        UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1")
//                .script(new Script("ctx._source.gender=\"male\""));
//        UpdateResponse response = client.update(updateRequest).get();

        // Variant D (active): UpdateRequest with a partial document
        UpdateResponse response = client.update(new UpdateRequest("twitter", "tweet", "1")
                .doc(XContentFactory.jsonBuilder()
                        .startObject()
                            .field("gender", "male")
                        .endObject()
                    )).get();
        System.out.println(response.getIndex());
    }

    /**
     * Update API through an {@link UpdateRequest} carrying a script.
     *
     * <p>NOTE(review): presumably requires inline scripting to be enabled on the
     * cluster — confirm.
     *
     * @throws Exception if executing the request fails
     */
    @Test
    public void testUpdate3() throws Exception {
        UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1")
            .script(new Script("ctx._source.gender=\"male\""));
        client.update(updateRequest).get();
    }

    /**
     * Upsert: update twitter/tweet/1 when it exists, otherwise index a new document.
     *
     * @throws Exception if building the documents or executing the request fails
     */
    @Test
    public void testUpsert() throws Exception {
        // Document passed to upsert(); intended for the case where the id is not found
        IndexRequest indexRequest = new IndexRequest("twitter", "tweet", "1")
            .source(XContentFactory.jsonBuilder()
                .startObject()
                    .field("name", "qergef")
                    .field("gender", "malfdsae")
                .endObject());
        // Partial document passed to doc(); intended for the case where the id is found
        UpdateRequest upsert = new UpdateRequest("twitter", "tweet", "1")
            .doc(XContentFactory.jsonBuilder()
                    .startObject()
                        .field("user", "wenbronk")
                    .endObject())
            .upsert(indexRequest);

        client.update(upsert).get();
    }

    /**
     * Multi-get API: fetches documents across different indices, types and ids in
     * one call. Failed items (for example a missing index) are reported instead of
     * being dereferenced.
     */
    @Test
    public void testMultiGet() {
        MultiGetResponse multiGetResponse = client.prepareMultiGet()
            .add("twitter", "tweet", "1")
            .add("twitter", "tweet", "2", "3", "4")
            .add("anothoer", "type", "foo")
            .get();

        for (MultiGetItemResponse itemResponse : multiGetResponse) {
            // A failed item carries no GetResponse, so check before touching it
            if (itemResponse.isFailed()) {
                System.out.println(itemResponse.getIndex() + "/" + itemResponse.getType() + "/"
                        + itemResponse.getId() + " failed: " + itemResponse.getFailure().getMessage());
                continue;
            }
            GetResponse response = itemResponse.getResponse();
            if (response.isExists()) {
                System.out.println(response.getSourceAsString());
            }
        }
    }

    /**
     * Bulk API: sends several index operations in a single request and reports
     * per-item failures.
     *
     * @throws Exception if a document cannot be built
     */
    @Test
    public void testBulk() throws Exception {
        BulkRequestBuilder bulkRequest = client.prepareBulk();
        bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
                .setSource(XContentFactory.jsonBuilder()
                        .startObject()
                            .field("user", "kimchy")
                            .field("postDate", new Date())
                            .field("message", "trying out Elasticsearch")
                        .endObject()));
        bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
                .setSource(XContentFactory.jsonBuilder()
                        .startObject()
                            .field("user", "kimchy")
                            .field("postDate", new Date())
                            .field("message", "another post")
                        .endObject()));
        BulkResponse response = bulkRequest.get();
        // Individual items may fail even when the bulk call itself returns
        if (response.hasFailures()) {
            System.out.println(response.buildFailureMessage());
        }
        System.out.println(response.getHeaders());
    }

    /**
     * BulkProcessor: batches requests and flushes them according to the configured
     * thresholds below.
     *
     * @throws Exception if waiting for the processor to close is interrupted
     */
    @Test
    public void testBulkProcessor() throws Exception {
        BulkProcessor bulkProcessor = BulkProcessor.builder(client, new Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // nothing to prepare
            }

            // Called when the whole bulk execution failed
            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                System.out.println("bulk " + executionId + " failed: " + failure);
            }

            // Called when the bulk executed; single items may still have failed
            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (response.hasFailures()) {
                    System.out.println(response.buildFailureMessage());
                }
            }
        })
        // flush after 10,000 queued requests
        .setBulkActions(10000)
        // flush after 1 GB of queued data
        .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
        // flush at least every 5 seconds
        .setFlushInterval(TimeValue.timeValueSeconds(5))
        // concurrent bulk requests: 0 = none, 1 = one allowed to run concurrently
        .setConcurrentRequests(1)
        // back-off: start at 100 ms, at most 3 retries
        .setBackoffPolicy(
                BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
        .build();

        // Queue individual requests; the index request carries a source document
        bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(createJson1()));
        bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));

        // Flush what is queued and wait for in-flight bulks. bulkProcessor.close()
        // is the alternative to this call, so only one of the two is used here.
        boolean finished = bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
        if (!finished) {
            System.out.println("bulk processor did not finish within 10 minutes");
        }
    }
}
相關文章
相關標籤/搜索