Elasticsearch Java API 操作(一)(Java Low Level Rest Client)

1、說明:

  1、Elasticsearch提供了兩個JAVA REST Client版本:

  一、java low level rest client:

  低級別的rest客戶端,通過http與集羣交互,用戶需自己編組請求JSON串,及解析響應JSON串。兼容所有Elasticsearch版本。

  特點:通過maven引入

  使用介紹: https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-low.html

  二、java high level rest client:

  高級別的REST客戶端,基於低級別的REST客戶端,增加了編組請求JSON串、解析響應JSON串等相關API,使用的版本需要保持和ES服務一致的版本,否則會有版本問題。

  從6.0.0開始加入的,目的是以java面向對象的方式進行請求、響應處理。

  每一個API支持 同步、異步 兩種方式,同步方法直接返回一個結果對象。異步的方法以async爲後綴,通過listener參數來通知結果。高級java rest客戶端依賴Elasticsearch core project。

  兼容性說明:

  依賴jdk1.8和Elasticsearch core project

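2、Java Low Level Rest Client的使用

(注意:下面的示例代碼實際使用的是TransportClient(transport依賴,經由9300傳輸端口連接),而不是基於HTTP(默認9200端口)的Low Level REST Client。)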

版本:

Elasticsearch 6.3.1

pom文件:

 <!-- Elasticsearch transport client: supplies the TransportClient /
      PreBuiltTransportClient classes imported by the ClientUtil class. -->
 <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>6.3.1</version>
        </dependency>

        <!-- Elasticsearch core library, pinned to the same 6.3.1 version as the transport client. -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>6.3.1</version>
        </dependency>

        <!-- Log4j 2 API and implementation.
             NOTE(review): 2.7 predates the Log4Shell (CVE-2021-44228) fixes; consider a patched
             2.x release after confirming compatibility with the Elasticsearch 6.3.1 client. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.7</version>
        </dependency>

        <!-- json-lib: supplies net.sf.json.JSONObject, which the test class imports. -->
        <dependency>
            <groupId>net.sf.json-lib</groupId>
            <artifactId>json-lib</artifactId>
            <version>0.9</version>
 </dependency>

 

1、構建elasticsearch client工具類

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.net.InetAddress;

/**
 * Factory for the Elasticsearch {@link TransportClient} used by the examples.
 *
 * <p>Every call to {@link #CreateClient()} builds a new client; the caller owns
 * the returned instance and is responsible for closing it.
 *
 * @author xiaolaotou
 * @since 2019/4/19
 */
public class ClientUtil {

    /** Value sent as the {@code cluster.name} setting. */
    private static final String CLUSTER_NAME = "elasticsearch1";

    /** Address of the Elasticsearch node to connect to. */
    private static final String NODE_HOST = "192.168.200.100";

    /** Transport port of the node. */
    private static final int NODE_TRANSPORT_PORT = 9300;

    /**
     * Builds a transport client pointed at the configured node.
     *
     * @return a newly created client; close it when it is no longer needed
     * @throws Exception if the node address cannot be resolved or the client
     *                   cannot be created
     */
    public TransportClient CreateClient() throws Exception {
        // Resolve the address before the client exists, so a resolution failure
        // cannot leave a half-initialised client (and its threads) behind.
        TransportAddress nodeAddress =
                new TransportAddress(InetAddress.getByName(NODE_HOST), NODE_TRANSPORT_PORT);

        Settings settings = Settings.builder()
                .put("cluster.name", CLUSTER_NAME)
                // Connect even when the node reports a different cluster name.
                .put("client.transport.ignore_cluster_name", true)
                .build();

        return new PreBuiltTransportClient(settings).addTransportAddress(nodeAddress);
    }
}

2、測試類

import net.sf.json.JSONObject;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHits;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;


/**
 * @Author: xiaolaotou
 * @Date: 2019/4/19
 * ElasticSearch 6.3.1
 */
public class Test {

    private static TransportClient client;

    static {
        try {
            client = new ClientUtil().CreateClient();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {


        //建立索引
//        createEmployee();
        //根據inde,type,id查詢一個document的data
//        FindIndex();
//        CreateJsonIndex();
        //批量導入
//        BulkCreateIndex();

        //批量導出
//        OutData();
        //建立帶ik分詞的index
//        CreateIndexIkTest();

        //更新索引
//        UpdateIndex();
//        createIndex2();
//        Search();
          get();
    }

    /**
     * 建立索引,普通格式
     *
     * @throws Exception
     */
    public static void createEmployee() throws Exception {
        IndexResponse response = client.prepareIndex("student", "doc", "1")
                .setSource(jsonBuilder()
                        .startObject()
                        .field("name", "jack")
                        .field("age", 27)
                        .field("position", "technique")
                        .field("country", "china")
                        .field("join_date", "2017-01-01")
                        .field("salary", 10000)
                        .endObject())
                .get();
        System.out.println("建立成功!");
    }

 

/**
     * 根據 index ,type,id查詢
     *
     * @throws Exception
     */
    public static void FindIndex() throws Exception {
        GetResponse getResponse = client.prepareGet("student", "doc", "1").get();
        System.out.println(getResponse.getSourceAsString());
    }
/**
     * 建立索引,JSON
     *
     * @throws IOException
     */
    public static void CreateJsonIndex() throws IOException {
        JSONObject json = new JSONObject();
        json.put("user", "小明");
        json.put("title", "Java Engineer");
        json.put("desc", "web 開發");
        IndexResponse response = client.prepareIndex("studentjson", "doc", "1")
                .setSource(json, XContentType.JSON)
                .get();
        String _index = response.getIndex();
        System.out.println(_index);
    }
/**
     * elasticsearch批量導入
     */
    public static void BulkCreateIndex() {
        BulkRequestBuilder builder = client.prepareBulk();
        for (int i = 0; i < 100000; i++) {
            HashMap<String, Object> map = new HashMap<>();
            map.put("recordtime", "11");
            map.put("area", "22");
            map.put("usertype", "33");
            map.put("count", 44);
            builder.add(client.prepareIndex("bulktest", "1").setSource(map));
            //每10000條提交一次
            if (i % 10000 == 0) {
                builder.execute().actionGet();
                builder = client.prepareBulk();
            }
        }
    }
/**
     * 批量導出
     */
    public static void OutData() throws IOException {
        SearchResponse response = client.prepareSearch("bulktest").setTypes("1")
                .setQuery(QueryBuilders.matchAllQuery())
                .setSize(10000).setScroll(new TimeValue(600000))
                .setSearchType(SearchType.DEFAULT).execute().actionGet();
        // setScroll(new TimeValue(600000)) 設置滾動的時間
        String scrollid = response.getScrollId();
        //把導出的結果以JSON的格式寫到文件裏

        //每次返回數據10000條。一直循環查詢知道全部的數據都被查詢出來
        while (true) {
            SearchResponse response2 = client.prepareSearchScroll(scrollid).setScroll(new TimeValue(1000000))
                    .execute().actionGet();
            SearchHits searchHit = response2.getHits();
            //再次查詢不到數據時跳出循環
            if (searchHit.getHits().length == 0) {
                break;
            }
            System.out.println("查詢數量 :" + searchHit.getHits().length);
            for (int i = 0; i < searchHit.getHits().length; i++) {
                String json = searchHit.getHits()[i].getSourceAsString();
                putData(json);
            }
            System.out.println("查詢結束");
        }
    }
public static void putData(String json) throws IOException {
        String str = json + "\n";
        //寫入本地文件
        String fileTxt = "D:\\data.txt";
        File file = new File(fileTxt);
        if (!file.getParentFile().exists()) {
            file.getParentFile().mkdirs();
        }
        if (!file.exists()) {
            file.createNewFile();
            FileWriter fw = new FileWriter(file, true);
            BufferedWriter bw = new BufferedWriter(fw);
            System.out.println("寫入完成啦啊");
            bw.write(String.valueOf(str));
            bw.flush();
            bw.close();
            fw.close();
        } else {
            FileWriter fw = new FileWriter(file, true);
            BufferedWriter bw = new BufferedWriter(fw);
            System.out.println("追加寫入完成啦啦");
            bw.write(String.valueOf(str));
            bw.flush();
            bw.close();
            fw.close();
        }
    }


    /**
     * 建立索引,並給某些字段指定ik分詞器,之後向該索引中查詢時,就會用ik分詞
     */
    public static void CreateIndexIkTest() throws Exception {
        //建立映射
        XContentBuilder mapping = XContentFactory.jsonBuilder()
                .startObject()
                .startObject("properties")
                //title:字段名,  type:文本類型       analyzer :分詞器類型
                .startObject("title").field("type", "text").field("analyzer", "ik_smart").endObject()   //該字段添加的內容,查詢時將會使用ik_smart分詞
                .startObject("content").field("type", "text").field("analyzer", "ik_max_word").endObject()
                .endObject()
                .endObject();

        //index:索引名   type:類型名(能夠本身定義)
        PutMappingRequest putmap = Requests.putMappingRequest("index").type("type").source(mapping);
        //建立索引
        client.admin().indices().prepareCreate("index").execute().actionGet();
        //爲索引添加映射
        client.admin().indices().putMapping(putmap).actionGet();

        //調用下面的方法爲建立的索引添加內容
        CreateIndex1();
    }

    //這個方法是爲上一步建立的索引中添加內容,包括id,id不能重複
    public static void CreateIndex1() throws IOException {
        IndexResponse response = client.prepareIndex("index", "type", "1") //索引,類型,id
                .setSource(jsonBuilder()
                        .startObject()
                        .field("title", "title")   //字段,值
                        .field("content", "content")
                        .endObject()
                ).get();
    }

 

/**
     * 更新索引
     */
    //更新索引,更新剛纔建立的索引,若是id相同將會覆蓋掉剛纔的內容
    public static void UpdateIndex() throws Exception {
        //每次添加id應該不一樣,至關於數據表中的主鍵,相同的話將會進行覆蓋
        UpdateResponse response=client.update(new UpdateRequest("index","type","1")
        .doc(XContentFactory.jsonBuilder()
            .startObject()
                .field("title","中華人民共和國國歌,國歌是最好聽的歌")
                .field("content","中華人民共和國國歌,國歌是最好聽的歌")
                .endObject()
        )).get();
    }

    //再插入一條數據
    public static void createIndex2() throws IOException {
        IndexResponse response = client.prepareIndex("index", "type", "2")
                .setSource(jsonBuilder()
                        .startObject()
                        .field("title", "中華民族是偉大的民族")
                        .field("content", "中華民族是偉大的民族")
                        .endObject()
                ).get();
    }

    /**
     * 下面使用index索引下的2個document進行查詢
     */
    public static  void  Search(){
        SearchResponse response1 = client.prepareSearch( "index")  //指定多個索引
                .setTypes("type")  //指定類型
                .setSearchType(SearchType.QUERY_THEN_FETCH)
                .setQuery(QueryBuilders.matchQuery("title", "中華人民共和國國歌"))  // Query
//                .setPostFilter(QueryBuilders.rangeQuery("age").from(12).to(18))     // Filter
                .setFrom(0).setSize(60).setExplain(true)
                .get();
        long totalHits1= response1.getHits().totalHits;  //命中個數
        System.out.println("response1======="+totalHits1);

        SearchResponse response2 = client.prepareSearch( "index")  //指定多個索引
                .setTypes("type")  //指定類型
                .setSearchType(SearchType.QUERY_THEN_FETCH)
                .setQuery(QueryBuilders.matchQuery("content", "中華人民共和國國歌"))  // Query
//                .setPostFilter(QueryBuilders.rangeQuery("age").from(12).to(18))     // Filter
                .setFrom(0).setSize(60).setExplain(true)
                .get();
        long totalHits2 = response2.getHits().totalHits;  //命中個數
        System.out.println("response2========="+totalHits2);
    }

    /**
     * GET操做
     */
    public static void get() {
        GetResponse response = client.prepareGet("index", "type", "2").get();
        Map<String, Object> source = response.getSource();
        Set<String> strings = source.keySet();
        Iterator<String> iterator = strings.iterator();
        while (iterator.hasNext()) {
            System.out.println(source.get(iterator.next()));
        }
    }
}
相關文章
相關標籤/搜索