這裏小編先將需要的pom.xml依賴提供給大家(請根據自己的版本進行修改):
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <!-- Elasticsearch 6.x client libraries and Spark 2.3.x both require Java 8 -->
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
  <spark.version>2.3.2</spark.version>
</properties>

<dependencies>
  <dependency><!-- dependency management: inherited selectively -->
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.10</version>
  </dependency>
  <dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.2.0</version>
  </dependency>
  <dependency>
    <!-- Must be binary compatible with the _2.11 suffix of the Spark artifacts below;
         Spark 2.3.2 is built against Scala 2.11.8. A 2.10.x library here would override
         Spark's transitive scala-library and break at runtime. -->
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.8</version>
  </dependency>
  <dependency>
    <groupId>org.json</groupId>
    <artifactId>json</artifactId>
    <version>20180813</version>
  </dependency>
  <dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>6.2.4</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
  </dependency>
</dependencies>
主要是提供一個Utils,通過讀取配置文件來創建ES的編程入口。
#elasticsearch.conf
# Host name or IP address of an Elasticsearch node
elastic.host=192.168.130.131
# Port the TransportClient connects to (the curl examples use the separate HTTP port 9200)
elastic.port=9300
# Must match the cluster.name of the target cluster, otherwise the client finds no nodes
elastic.cluster.name=zzy-application
#Constants
/**
 * Property keys used to look up the Elasticsearch connection settings.
 */
public interface Constants {

    /** Key of the Elasticsearch host entry. */
    String ELASTIC_HOST = "elastic.host";

    /** Key of the Elasticsearch port entry. */
    String ELASTIC_PORT = "elastic.port";

    /** Key of the Elasticsearch cluster-name entry. */
    String ELASTIC_CLUSTER_NAME = "elastic.cluster.name";
}
#ElasticSearchUtil
import com.zy.es.constant.Constants; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.transport.client.PreBuiltTransportClient; import java.io.IOException; import java.io.InputStream; import java.net.InetAddress; import java.util.Properties; /** * 通常狀況下的工具類都是單例 * 裏面若干方法通常都是static * 若是在鏈接集羣的時候,集羣的名稱對應不上: * NoNodeAvailableException[None of the configured nodes are available: */ public class ElasticSearchUtil { private static TransportClient client; private static Properties ps; static { try { InputStream resourceAsStream = ElasticSearchUtil.class.getClassLoader().getResourceAsStream("elasticsearch.conf"); ps =new Properties(); ps.load(resourceAsStream); String host=ps.getProperty(Constants.ELASTIC_HOST); int port = Integer.parseInt(ps.getProperty(Constants.ELASTIC_PORT)); String clusterName=ps.getProperty(Constants.ELASTIC_CLUSTER_NAME); Settings settings =Settings.builder() .put("cluster.name",clusterName) .build(); client=new PreBuiltTransportClient(settings); //這裏能夠有多個,集羣模式 TransportAddress ta=new TransportAddress( InetAddress.getByName(host), port ); //addTransportAddresses(TransportAddress... transportAddress),參數爲一個可變參數 client.addTransportAddresses(ta); } catch (IOException e) { e.printStackTrace(); } } public static TransportClient getTransportClient(){ return client; } public static void close(TransportClient client){ if(client!=null){ client.close(); } } }
小編這裏提供了json、map、javabean、XContentBuilder四種創建方式。
import java.util

import com.zy.es.pojo.Book
import com.zy.es.utils.ElasticSearchUtil
import org.elasticsearch.action.index.IndexResponse
import org.elasticsearch.cluster.metadata.MetaData.XContentContext
import org.elasticsearch.common.xcontent.{XContentBuilder, XContentType}
import org.elasticsearch.common.xcontent.json.JsonXContent
import org.json.JSONObject

/**
 * Demonstrates four ways of indexing a document into `library/books`:
 * from a JSON string, a `java.util.Map`, a Java bean and an `XContentBuilder`.
 */
object createIndex {
  private val index = "library"
  private val `type` = "books"
  private val client = ElasticSearchUtil.getTransportClient()

  def main(args: Array[String]): Unit = {
    try {
      createIndexByJson()
      //createIndexByMap()
      //createIndexByBean()
      //createIndexByXContentBuilder()
    } finally {
      // Release the ES connection even when the indexing call above throws.
      ElasticSearchUtil.close(client)
    }
  }

  /**
   * 1. Index a document from a JSON string.
   *
   * Passing only the string fails with
   * `java.lang.IllegalArgumentException: The number of object passed must be even but was [1]`.
   * From ES 5.x on, the content type must be given explicitly as the second argument:
   * `setSource(json.toString(), XContentType.JSON)`.
   */
  def createIndexByJson(): Unit = {
    val json = new JSONObject
    json.put("name", "我愛你中國")
    json.put("author", "周迅")
    json.put("date", "2018-6-6")
    // The response describes the document that was just written.
    val response: IndexResponse = client.prepareIndex(index, `type`, "9")
      .setSource(json.toString, XContentType.JSON)
      .get()
    // Print the document version.
    println(response.getVersion)
  }

  /**
   * 2. Index a document from a map.
   */
  def createIndexByMap(): Unit = {
    val sourceMap = new util.HashMap[String, String]()
    sourceMap.put("name", "朝花夕拾")
    sourceMap.put("author", "魯迅")
    sourceMap.put("date", "2009-4-5")
    val response: IndexResponse = client.prepareIndex(index, `type`, "2")
      .setSource(sourceMap)
      .get()
    // Print the document version.
    println(response.getVersion)
  }

  /**
   * 3. Index a plain Java bean, serialised through `JSONObject`.
   */
  def createIndexByBean(): Unit = {
    val book: Book = new Book("鬥破蒼穹", "天蠶土豆", "2012-2-6")
    val json = new JSONObject(book)
    // The response describes the document that was just written.
    val response: IndexResponse = client.prepareIndex(index, `type`, "3")
      .setSource(json.toString, XContentType.JSON)
      .get()
    // Print the document version.
    println(response.getVersion)
  }

  /**
   * 4. Index a document assembled with an `XContentBuilder`.
   */
  def createIndexByXContentBuilder(): Unit = {
    val builder: XContentBuilder = JsonXContent.contentBuilder()
    builder.startObject()
      .field("name", "西遊記")
      .field("author", "吳承恩")
      .field("version", "1.0")
      .endObject()
    val response: IndexResponse = client.prepareIndex(index, `type`, "4")
      .setSource(builder)
      .get()
    println(response.getVersion)
  }
}
小編這裏提供了刪除數據、更新數據、批量操作。
import java.util

import com.zy.es.utils.ElasticSearchUtil
import org.elasticsearch.action.bulk.BulkResponse
import org.elasticsearch.action.delete.DeleteResponse
import org.elasticsearch.action.update.{UpdateRequestBuilder, UpdateResponse}
import org.elasticsearch.common.xcontent.{XContentBuilder, XContentType}
import org.elasticsearch.common.xcontent.json.JsonXContent
import org.json.JSONObject

/**
 * Demonstrates delete, partial update and bulk indexing against `library/books`.
 */
object ElasticsearchCRUD {
  private val index = "library"
  private val `type` = "books"
  private val client = ElasticSearchUtil.getTransportClient()

  def main(args: Array[String]): Unit = {
    try {
      // Delete a document
      testDelete()
      // Update a document
      //testUpdate()
      // Bulk operations
      //testBulk()
    } finally {
      // Release the connection even when the operation above throws.
      ElasticSearchUtil.close(client)
    }
  }

  /** Deletes document "2" and prints the version reported by the response. */
  def testDelete(): Unit = {
    val response: DeleteResponse = client.prepareDelete(index, `type`, "2").get()
    println("version:" + response.getVersion)
  }

  /** Partially updates document "4", setting its `version` field to "3.0". */
  def testUpdate(): Unit = {
    val builder: XContentBuilder = JsonXContent.contentBuilder()
    builder.startObject()
      .field("version", "3.0")
      .endObject()
    val response: UpdateResponse = client.prepareUpdate(index, `type`, "4")
      .setDoc(builder)
      .get()
    println("version:" + response.getVersion)
  }

  /**
   * Indexes documents "7" (from a map) and "8" (from a JSON string) in one bulk request
   * and prints the version of every item.
   */
  def testBulk(): Unit = {
    val map = new util.HashMap[String, String]()
    map.put("name", "無雙")
    map.put("author", "周潤發")
    map.put("version", "2")

    val json = new JSONObject
    json.put("name", "紅樓夢")
    json.put("author", "曹雪芹")
    json.put("version", "1.0")

    val responses: BulkResponse = client.prepareBulk()
      .add(client.prepareIndex(index, `type`, "7").setSource(map))
      .add(client.prepareIndex(index, `type`, "8").setSource(json.toString(), XContentType.JSON))
      .get()

    // A bulk request can succeed as a whole while individual items fail; report those.
    if (responses.hasFailures) {
      System.err.println(responses.buildFailureMessage())
    }
    for (response <- responses.getItems) {
      print(response.getVersion)
    }
  }
}
import java.util

import com.zy.es.utils.ElasticSearchUtil
import org.elasticsearch.action.search.{SearchResponse, SearchType}
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder
import org.elasticsearch.search.{SearchHit, SearchHits}
import org.json.JSONObject

import scala.collection.JavaConversions
import scala.collection.JavaConverters._

/**
 * Demonstrates full-text, paged and highlighted searches against the `library` index.
 */
object testSearch {
  private val index = "library"
  private val `type` = "books"
  private val client = ElasticSearchUtil.getTransportClient()

  def main(args: Array[String]): Unit = {
    try {
      // Full-text search
      //fullTextSearch()
      // Paged search
      //pagingSearch()
      // Highlighted search
      highlightSearch()
    } finally {
      // Release the connection, as the other example objects do.
      ElasticSearchUtil.close(client)
    }
  }

  /**
   * Full-text search: match query on `author`, printing every hit as a JSON object with
   * the keys `_index`, `_id`, `_type`, `_score` and `_source`.
   */
  def fullTextSearch(): Unit = {
    val response = client.prepareSearch(index)
      .setSearchType(SearchType.DEFAULT) // search type
      .setQuery(QueryBuilders.matchQuery("author", "天蠶土豆")) // the query to run
      .get()
    val hits = response.getHits // search result
    println("totals:" + hits.getTotalHits) // number of matching documents
    println("maxScore:" + hits.getMaxScore) // highest score among the hits
    // The individual hits
    for (hit <- hits.getHits) {
      val json = new JSONObject()
      json.put("_index", hit.getIndex)
      json.put("_id", hit.getId)
      json.put("_type", hit.getType)
      json.put("_score", hit.getScore)
      json.put("_source", new JSONObject(hit.getSourceAsString))
      println(json.toString())
    }
  }

  /**
   * Paged search: match query on `name`.
   *
   * To fetch page `num` with `count` rows per page, pass `from = count * (num - 1)` and
   * `size = count`.
   *
   * @param from offset of the first hit to return
   * @param size maximum number of hits to return
   */
  def pagingSearch(from: Int = 0, size: Int = 10): Unit = {
    val response: SearchResponse = client.prepareSearch(index)
      .setSearchType(SearchType.QUERY_THEN_FETCH)
      .setQuery(QueryBuilders.matchQuery("name", "西遊記"))
      .setFrom(from)
      .setSize(size)
      .get()
    val searchHits: SearchHits = response.getHits
    val total = searchHits.totalHits
    println("zzy爲您查詢出" + total + "記錄:")
    val hits: Array[SearchHit] = searchHits.getHits
    for (hit <- hits) {
      val source: util.Map[String, AnyRef] = hit.getSourceAsMap
      val author = source.get("author")
      val name = source.get("name")
      val version = source.get("version")
      print(
        s"""
           |author:${author}
           |name:${name}
           |version:${version}
           |""".stripMargin)
    }
  }

  /**
   * Highlighted search: match query on `author`, with the matched text wrapped in a
   * `<font>` tag; prints the concatenated highlight fragments of every hit.
   */
  def highlightSearch(): Unit = {
    val response = client.prepareSearch(index)
      .setSearchType(SearchType.DEFAULT)
      .setQuery(QueryBuilders.matchQuery("author", "周潤發"))
      .highlighter(new HighlightBuilder()
        .field("author") // field whose matches are tagged
        .preTags("<font color='red' size='20px'>") // opening tag
        .postTags("</font>")) // closing tag
      .get()
    val searchHits = response.getHits
    val total = searchHits.totalHits
    println("zzy爲您查詢出" + total + "記錄:")
    val hits: Array[SearchHit] = searchHits.getHits
    for (hit <- hits) {
      // Highlighted text is only available through getHighlightFields, not through the source.
      // The map is keyed by the highlighted field name (here: author); each value carries
      // the fragments of that field, highlight tags included.
      for (highlight <- hit.getHighlightFields.values.asScala) {
        print(highlight.getFragments.map(_.toString).mkString)
      }
    }
  }
}
首先我們先在自己的ES集羣中添加一些數據:
# Create the index
curl -H "Content-Type: application/json" -XPUT 'http://192.168.130.131:9200/chinese'

# Add the documents
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/1 -d'{"content":"美國留給伊拉克的是個爛攤子嗎"}'
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/2 -d'{"content":"公安部:各地校車將享最高路權"}'
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/3 -d'{"content":"中韓漁警衝突調查:韓警平均天天扣1艘中國漁船"}'
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/4 -d'{"content":"中國駐洛杉磯領事館遭亞裔男子槍擊 嫌犯已自首"}'
#然後使用不同的查詢看看效果:
import com.zy.es.utils.ElasticSearchUtil
import org.elasticsearch.action.search.{SearchResponse, SearchType}
import org.elasticsearch.index.query.QueryBuilders

/**
 * Runs a match query for "中國" on the `content` field of the `chinese` index and prints
 * the source of every hit, to show the effect of the analyzer configured for that field.
 */
object ChineseParticipleSearch {
  private val index = "chinese"
  private val `type` = "fulltext"
  private val client = ElasticSearchUtil.getTransportClient()

  def main(args: Array[String]): Unit = {
    try {
      val response: SearchResponse = client.prepareSearch(index)
        .setSearchType(SearchType.QUERY_THEN_FETCH)
        .setQuery(QueryBuilders.matchQuery("content", "中國"))
        .get()
      for (hit <- response.getHits.getHits) {
        println(hit.getSourceAsString)
      }
    } finally {
      // Release the connection even when the search throws.
      ElasticSearchUtil.close(client)
    }
  }
}
注意:我們這裏使用match查詢,查詢的是「中國」
看看運行結果:
這裏爲何美國也會被查詢出來?
這是因爲:默認的分詞器會將「中國」這兩個字拆開之後再分別進行檢索,只要內容中含有「中」或「國」其中一個字就會被匹配(例如「美國」),所以會出現上圖中的查詢錯誤的情況。
那我們該怎麼辦呢?我只想查詢出有關中國的內容啊。沒關係,中文分詞幫你解決。
常見的中文分詞插件:IK,庖丁解牛中文分詞等等。這裏咱們使用IK分詞。
① 下載: https://github.com/medcl/elasticsearch-analysis-ik 版本對應
② 使用maven對源代碼進行編譯(在IK_HOME下):(mvn clean install -DskipTests)
③ 把編譯後的target/releases下的zip文件拷貝到 ES_HOME/plugins/analysis-ik目錄下面,而後解壓將其中的plugin-descriptor.properties 和plugin-security.policy文件中的ES的版本改成本身使用的版本
④ 修改ES_HOME/config/elasticsearch.yml文件,添加(ES6.x以上版本無需此操做)index.analysis.analyzer.default.type: ik
⑤ 重啓es服務
這裏小編就有些粗暴了:
#ps -aux|grep elasticsearch
#kill -9 pid
#/ES_HOME/bin/elasticsearch -d 啓動
第一步: 將之前的數據刪除
# Delete the four documents indexed earlier. A document is addressed as
# /{index}/{type}/{id}, so the type segment (fulltext) is required.
curl -XDELETE 'http://192.168.130.131:9200/chinese/fulltext/1'
curl -XDELETE 'http://192.168.130.131:9200/chinese/fulltext/2'
curl -XDELETE 'http://192.168.130.131:9200/chinese/fulltext/3'
curl -XDELETE 'http://192.168.130.131:9200/chinese/fulltext/4'
# NOTE: deleting documents does not remove the mapping already created for `content`.
# If the _mapping request in the next step is rejected because of a mapping conflict,
# drop and recreate the index instead, then apply the mapping:
#   curl -XDELETE 'http://192.168.130.131:9200/chinese'
#   curl -H "Content-Type: application/json" -XPUT 'http://192.168.130.131:9200/chinese'
第二步: 重新加載數據,並設置爲IK分詞
# Configure the IK analyzer for the `content` field of chinese/fulltext:
# the field is mapped as text and uses ik_max_word both at index time (analyzer)
# and at query time (search_analyzer).
curl -XPOST http://192.168.130.131:9200/chinese/fulltext/_mapping -H 'Content-Type:application/json' -d'
{
"properties": {
"content": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_max_word"
}
}
}'
# Add the documents
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/1 -d'{"content":"美國留給伊拉克的是個爛攤子嗎"}'
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/2 -d'{"content":"公安部:各地校車將享最高路權"}'
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/3 -d'{"content":"中韓漁警衝突調查:韓警平均天天扣1艘中國漁船"}'
curl -H "Content-Type: application/json" -XPOST http://192.168.130.131:9200/chinese/fulltext/4 -d'{"content":"中國駐洛杉磯領事館遭亞裔男子槍擊 嫌犯已自首"}'
第三步:
重新執行剛剛上面的代碼,這裏我們看看結果:
整合條件:
ES官網:
https://www.elastic.co/guide/en/elasticsearch/hadoop/current/install.html
maven依賴:https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-hadoop/6.2.4
<!-- Elasticsearch connector for Hadoop and Spark -->
<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-hadoop</artifactId>
  <version>6.2.4</version>
</dependency>
// Reading Elasticsearch data from Spark requires the implicit conversions
// brought in by `import org.elasticsearch.spark._`.
import java.util.Date

import com.zy.es.utils.ElasticSearchUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.cluster.metadata.MetaData.XContentContext
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.spark._

/**
 * Spark / Elasticsearch integration example.
 *
 * Reads the documents of `library/books` through Spark and writes each of them into
 * `es-spark/book`.
 */
object EsOnSpark {
  private val client = ElasticSearchUtil.getTransportClient()

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("EsOnSpark")
      .setMaster("local[2]")
      .set("es.index.auto.create", "true") // create the target index on write if it is missing
      .set("es.nodes", "192.168.130.131") // Elasticsearch node(s) to connect to
      .set("es.port", "9200") // Elasticsearch port
    val sc = new SparkContext(conf)
    try {
      // Resource is given as index/type; each element is (document id, JSON source).
      val esRdd: RDD[(String, String)] = sc.esJsonRDD("library/books")
      val index = "es-spark"
      val `type` = "book"
      esRdd.foreach { case (id, json) =>
        // Let Elasticsearch generate the document id. A millisecond timestamp is not unique:
        // documents written within the same millisecond (easy with two local threads) would
        // silently overwrite each other.
        client.prepareIndex(index, `type`)
          .setSource(json, XContentType.JSON)
          .get()
        println(id + "" + json)
      }
    } finally {
      // Release Spark and the ES connection even when the job fails.
      try sc.stop()
      finally ElasticSearchUtil.close(client)
    }
  }
}
這裏小編只是介紹了一些常見的API操作。大家知道ES最大的優勢在於它的查詢,後期小編會進一步補充關於ElasticSearch強大的查詢功能的API。