ElasticSearch的API(java/scala)

這裏小編先將須要的pom.xml的依賴提供給你們:(根據本身的版本進行修改)html

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <spark.version>2.3.2</spark.version>
    </properties>

    <dependencies>
        <dependency><!-- 依賴管理,有選擇的繼承-->
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>6.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.10.3</version>
        </dependency>
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20180813</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-hadoop</artifactId>
            <version>6.2.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

1. 建立ES的編程入口

  主要是提供一個Utils,經過讀取配置文件進行建立ES的編程入口。
#elasticSearch.confjava

elastic.host=192.168.130.131
elastic.port=9300
elastic.cluster.name=zzy-application

#Constantsnode

public interface Constants {
    String ELASTIC_HOST = "elastic.host";
    String ELASTIC_PORT="elastic.port";
    String ELASTIC_CLUSTER_NAME = "elastic.cluster.name";
}

#ElasticSearchUtilgit

import com.zy.es.constant.Constants;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.util.Properties;

/**
 * 通常狀況下的工具類都是單例
 * 裏面若干方法通常都是static
 * 若是在鏈接集羣的時候,集羣的名稱對應不上:
 *  NoNodeAvailableException[None of the configured nodes are available:
 */
public class ElasticSearchUtil {
    private static TransportClient client;
    private static Properties ps;
    static {
        try {
            InputStream resourceAsStream = ElasticSearchUtil.class.getClassLoader().getResourceAsStream("elasticsearch.conf");
            ps =new Properties();
            ps.load(resourceAsStream);
            String host=ps.getProperty(Constants.ELASTIC_HOST);
            int port = Integer.parseInt(ps.getProperty(Constants.ELASTIC_PORT));
            String clusterName=ps.getProperty(Constants.ELASTIC_CLUSTER_NAME);
            Settings settings =Settings.builder()
                    .put("cluster.name",clusterName)
                    .build();
            client=new PreBuiltTransportClient(settings);
            //這裏能夠有多個,集羣模式
            TransportAddress ta=new TransportAddress(
                    InetAddress.getByName(host),
                    port
            );
            //addTransportAddresses(TransportAddress... transportAddress),參數爲一個可變參數
            client.addTransportAddresses(ta);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public static TransportClient getTransportClient(){
        return client;
    }
    public static void close(TransportClient client){
        if(client!=null){
            client.close();
        }
    }
}

2. 建立索引

  小編這裏提供了json、map、javabean、XContentBuilder四種建立方式。github

import java.util

import com.zy.es.pojo.Book
import com.zy.es.utils.ElasticSearchUtil
import org.elasticsearch.action.index.IndexResponse
import org.elasticsearch.cluster.metadata.MetaData.XContentContext
import org.elasticsearch.common.xcontent.{XContentBuilder, XContentType}
import org.elasticsearch.common.xcontent.json.JsonXContent
import org.json.JSONObject

object createIndex {
  private var index="library"
  private var `type`="books"
  private val client = ElasticSearchUtil.getTransportClient()
  def main(args: Array[String]): Unit = {
    createIndexByJson()
    //createIndexByMap()
   // createIndexByBean()
   // createIndexByXContentBuilder()
    //關閉es鏈接對象
    ElasticSearchUtil.close(client)
  }

  /**
    * 1.經過json方式建立
    * java.lang.IllegalArgumentException: The number of object passed must be even but was [1]
    * 在es5.x以上,使用XContentType.JSON來制定便可
    *setSource(json.toString(),XContentType.JSON)  必須指定第二個參數。
    */
  def createIndexByJson()={
    val json=new JSONObject
    json.put("name","我愛你中國")
    json.put("author","周迅")
    json.put("date","2018-6-6")
    //返回建立後的結果
    var response: IndexResponse = client.prepareIndex(index, `type`, "9")
      .setSource(json.toString, XContentType.JSON).get()
    //查看版本
    println(response.getVersion)
  }

  /**
    * 2.map方式
    */
  def createIndexByMap(): Unit ={
    val sourceMap=new util.HashMap[String,String]()
    sourceMap.put("name","朝花夕拾")
    sourceMap.put("author","魯迅")
    sourceMap.put("date","2009-4-5")

    var response: IndexResponse = client.prepareIndex(index, `type`, "2").setSource(sourceMap)
      .get()
    //查看版本
    println(response.getVersion)
  }

  /**
    * 3.使用普通的javabean
    */
  def createIndexByBean()={
    val book:Book=new Book("鬥破蒼穹","天蠶土豆","2012-2-6");
    val json=new JSONObject(book)
    //返回建立後的結果
    var response: IndexResponse = client.prepareIndex(index, `type`, "3")
      .setSource(json.toString, XContentType.JSON).get()
    //查看版本
    println(response.getVersion)
  }

  /**
    * 4.XContentBuilder方式
    */
  def createIndexByXContentBuilder()={
    var builder: XContentBuilder = JsonXContent.contentBuilder()
    builder.startObject()
      .field("name","西遊記")
      .field("author","吳承恩")
      .field("version","1.0")
      .endObject()
    var response: IndexResponse = client.prepareIndex(index, `type`,"4").setSource(builder)
      .get()
    println(response.getVersion)
  }
}

3.刪除數據 & 更新數據 &批量處理

  小編這裏提供了刪除數據,更新數據,批量操做。sql

import java.util

import com.zy.es.utils.ElasticSearchUtil
import org.elasticsearch.action.bulk.BulkResponse
import org.elasticsearch.action.delete.DeleteResponse
import org.elasticsearch.action.update.{UpdateRequestBuilder, UpdateResponse}
import org.elasticsearch.common.xcontent.{XContentBuilder, XContentType}
import org.elasticsearch.common.xcontent.json.JsonXContent
import org.json.JSONObject

object ElasticsearchCRUD {
  private var index="library"
  private var `type`="books"
  private val client = ElasticSearchUtil.getTransportClient()
  def main(args: Array[String]): Unit = {
    //刪除數據
    testDelete()
    //更新
    //testUpdate()
    //批量操做
    //testBulk()

    //關閉鏈接對象
    ElasticSearchUtil.close(client)
  }
  //刪除數據
  def testDelete()={
    var response: DeleteResponse = client.prepareDelete(index, `type`, "2").get()
    println("version:"+response.getVersion)
  }
  //更新
  def testUpdate()={
    var builder: XContentBuilder = JsonXContent.contentBuilder()
    builder.startObject()
      .field("version","3.0")
      .endObject()
    var response: UpdateResponse  = client.prepareUpdate(index, `type`, "4")
      .setDoc(builder).get()
    println("version:"+response.getVersion)
  }

  //批量操做
  def testBulk()={
    val map=new util.HashMap[String,String]()
    map.put("name","無雙")
    map.put("author","周潤發")
    map.put("version","2")
    val json=new JSONObject
    json.put("name","紅樓夢")
    json.put("author","曹雪芹")
    json.put("version","1.0")
    var responses: BulkResponse = client.prepareBulk().add(client.prepareIndex(index, `type`, "7")
      .setSource(map))
      .add(client.prepareIndex(index, `type`, "8").setSource(json.toString(),XContentType.JSON))
      .get()
    for(response <-responses.getItems){
      print(response.getVersion)
    }
  }
}

4.全文索引、分頁索引、高亮顯示

import java.util

import com.zy.es.utils.ElasticSearchUtil
import org.elasticsearch.action.search.{SearchResponse, SearchType}
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder
import org.elasticsearch.search.{SearchHit, SearchHits}
import org.json.JSONObject

import scala.collection.JavaConversions

object testSearch {
  private var index="library"
  private var `type`="books"
  private val client = ElasticSearchUtil.getTransportClient()
  def main(args: Array[String]): Unit = {
    //全文索引
    //fullTextSearch()
    //分頁索引
    //pagingSearch()
    //高亮索引
    highlightSearch()
  }
  //全文索引
  def fullTextSearch()={
    val json=new JSONObject()
    val response = client.prepareSearch(index) //設置檢索的類型
      .setSearchType(SearchType.DEFAULT) //設置檢索的類型
      .setQuery(QueryBuilders.matchQuery("author", "天蠶土豆")) //設置檢索方式
      .get()

    val hits = response.getHits  //獲取檢索結果
    println("totals:"+hits.getTotalHits)  //檢索出的數據的個數
    println("maxSource"+hits.getMaxScore) //最大的得分
    //查詢的具體的內容
    val myhits = hits.getHits
    for(hit <- myhits){
      val index = hit.getIndex
      val id = hit.getId
      val `type` = hit.getType
      val source =hit.getSourceAsString
      val score=hit.getScore
      json.put("_index",index)
      json.put("_id",id)
      json.put("_type",`type`)
      json.put("_score", score )
      json.put("_source",new JSONObject(source))
      println(json.toString())
    }
  }

  //分頁索引
  //分頁查詢:查詢第num頁,查count條   每一頁的長度*(num-1)+count
  def pagingSearch(from:Int=0,size:Int=10)={
    var response: SearchResponse = client.prepareSearch(index)
      .setSearchType(SearchType.QUERY_THEN_FETCH)
      .setQuery(QueryBuilders.matchQuery("name", "西遊記"))
      .setFrom(from)
      .setSize(size)
      .get()
    val myhits: SearchHits = response.getHits
    val total=myhits.totalHits
    println("zzy爲您查詢出"+total+"記錄:")
    val hits: Array[SearchHit] = myhits.getHits
    for (hit<-hits){
      val map: util.Map[String, AnyRef] = hit.getSourceAsMap
      val author=map.get("author")
      val name=map.get("name")
      val version=map.get("version")
      print(
        s"""
           |author:${author}
           |name:${name}
           |version:${version}
         """.stripMargin)
    }
  }

  //高亮索引
   def highlightSearch()={
     val response=client.prepareSearch(index)
       .setSearchType(SearchType.DEFAULT)
       .setQuery(QueryBuilders.matchQuery("author","周潤發"))
       .highlighter(new HighlightBuilder()
         .field("author")//給哪一個字段添加標籤
         .preTags("<font color='red' size='20px'>")//添加的前置標籤
         .postTags("</font>"))//添加的後置標籤
            .get()
     val myHits = response.getHits
     val total = myHits.totalHits
     println("zzy爲您查詢出" + total + "記錄:")
     val hits: Array[SearchHit] = myHits.getHits
     for(hit <-hits){
       //注意這裏若是想要獲取高亮的字段,必須使用高亮的方式獲取
       val HLfields = hit.getHighlightFields
       //這裏的field是設置高亮的字段名:author  highlight查詢的全部的字段值(含高亮的)
       for((field,highlight)<-JavaConversions.mapAsScalaMap(HLfields)){
         var date=""
         val fragments=highlight.getFragments
         for(fragment <-fragments){
           date+=fragment.toString
         }
         print(date)
       }
     }
   }
}

5. 中文分詞

(1)錯誤演示

首先咱們如今本身的ES集羣中添加一些數據:apache

#建立索引庫
curl -H "Content-Type: application/json" -XPUT 'http://192.168.130.131:9200/chinese'
#添加數據
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/1 -d'{"content":"美國留給伊拉克的是個爛攤子嗎"}'
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/2 -d'{"content":"公安部:各地校車將享最高路權"}'
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/3 -d'{"content":"中韓漁警衝突調查:韓警平均天天扣1艘中國漁船"}'
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/4 -d'{"content":"中國駐洛杉磯領事館遭亞裔男子槍擊 嫌犯已自首"}'

#而後使用不一樣的查詢看看效果:編程

import com.zy.es.utils.ElasticSearchUtil
import org.elasticsearch.action.search.{SearchResponse, SearchType}
import org.elasticsearch.index.query.QueryBuilders

object ChineseParticipleSearch {
  private var index="chinese"
  private var `type`="fulltext"
  private val client = ElasticSearchUtil.getTransportClient()
  def main(args: Array[String]): Unit = {
    val response: SearchResponse =client.prepareSearch(index)
      .setSearchType(SearchType.QUERY_THEN_FETCH)
      .setQuery(QueryBuilders.matchQuery("content","中國"))
      .get()
    val myHits = response.getHits.getHits
    for(hit <- myHits){
      println(hit.getSourceAsString)
    }
  }
}

注意:咱們這裏使用match查詢,查詢了是「中國」
看看運行結果:
ElasticSearch的API(java/scala)
這裏爲何美國也會被查詢出來?
這是由於:原生的查詢將‘中國’這個兩個字分開以後在進行檢索,索引會出現上圖中的查詢錯誤的狀況。
那咱們該怎麼辦呢,我只想查詢出來有關中國的內容啊,不要緊中文分詞幫你解決。json

(2)ES配置中文分詞

  常見的中文分詞插件:IK,庖丁解牛中文分詞等等。這裏咱們使用IK分詞。
① 下載: https://github.com/medcl/elasticsearch-analysis-ik 版本對應
ElasticSearch的API(java/scala)
② 使用maven對源代碼進行編譯(在IK_HOME下):(mvn clean install -DskipTests)
③ 把編譯後的target/releases下的zip文件拷貝到 ES_HOME/plugins/analysis-ik目錄下面,而後解壓將其中的plugin-descriptor.properties 和plugin-security.policy文件中的ES的版本改成本身使用的版本
④ 修改ES_HOME/config/elasticsearch.yml文件,添加(ES6.x以上版本無需此操做)index.analysis.analyzer.default.type: ik
⑤ 重啓es服務
這裏小編就有些粗暴了:
#ps -aux|grep elasticsearch
#kill -9 pid
#/ES_HOME/bin/elasticsearch -d 啓動app

(3)從新測試

第一步: 將以前數據進行刪除
curl -XDELETE 'http://192.168.130.131:9200/chinese/1'
curl -XDELETE 'http://192.168.130.131:9200/chinese/2'
curl -XDELETE 'http://192.168.130.131:9200/chinese/3'
curl -XDELETE 'http://192.168.130.131:9200/chinese/4'
第二步: 從新加載數據,並設置爲IK分詞
#設置爲ik分詞
curl -XPOST http://192.168.130.131:9200/chinese/fulltext/_mapping -H 'Content-Type:application/json' -d'
{
"properties": {
"content": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_max_word"
}
}
}'

#添加數據
    curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/1 -d'{"content":"美國留給伊拉克的是個爛攤子嗎"}'
    curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/2 -d'{"content":"公安部:各地校車將享最高路權"}'
    curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/3 -d'{"content":"中韓漁警衝突調查:韓警平均天天扣1艘中國漁船"}'
    curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/4 -d'{"content":"中國駐洛杉磯領事館遭亞裔男子槍擊 嫌犯已自首"}'

第三步
從新執行剛剛上面的代碼,這裏咱們看看結果:
ElasticSearch的API(java/scala)

6.Elasticsearch On Spark

整合條件:
ES官網:
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/install.html
maven依賴:https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-hadoop/6.2.4

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>6.2.4</version>
</dependency>
//若是使用spark中能夠讀到ES中的數據,須要導入隱式轉換
import java.util.Date

import com.zy.es.utils.ElasticSearchUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.cluster.metadata.MetaData.XContentContext
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.spark._

/**
  * spark整合ES
  * 經過spark去讀取es中的數據,同時將操做以後的結果落地到ES
  */

object EsOnSpark {

  private val client = ElasticSearchUtil.getTransportClient()

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("EsOnSpark")
      .setMaster("local[2]")
      .set("es.index.auto.create", "true") //寫數據的時候若是索引庫不存在,自動建立
      .set("es.nodes", "192.168.130.131") //設置ES集羣的節點
      .set("es.port", "9200") //設置ES集羣的端口

    val sc = new SparkContext(conf)
    var EsRDD: RDD[(String, String)] = sc.esJsonRDD("library/books") //指定index/type
    var index = "es-spark"
    var `type` = "book"
    EsRDD.foreach { case (id, json) => {
      client.prepareIndex(index, `type`, new Date().getTime.toString)
        .setSource(json, XContentType.JSON).get()
      println(id + "" + json)
    }
    }
    sc.stop()
  }
}

這裏只是小編介紹一些常見的API操做,你們知道ES最大的優點在於他的查詢,後期小編會進一步的補充關於ElasticSearch強大的查詢功能的API。

相關文章
相關標籤/搜索