Elasticsearch provides a support library that makes it easy to work with ES from Spark. First, add the ES dependency to your Maven POM:
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>6.2.0</version>
    <exclusions>
        <exclusion>
            <artifactId>log4j-over-slf4j</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
    </exclusions>
</dependency>
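The artifact suffix encodes the targets: elasticsearch-spark-20_2.11 is built for Spark 2.x and Scala 2.11. If your project builds with sbt rather than Maven, the equivalent coordinates look roughly like this (a sketch; keep the suffix in sync with your Spark and Scala versions):

libraryDependencies += "org.elasticsearch" % "elasticsearch-spark-20_2.11" % "6.2.0" exclude("org.slf4j", "log4j-over-slf4j")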
Then, in the Spark program, configure a SparkConf with the ES connection properties:
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

val sparkconf = new SparkConf().setAppName("sevs_spark3")
  // prefer the connector's classes over any older copies on the cluster
  .set("spark.driver.userClassPathFirst", "true")
  .set("spark.executor.userClassPathFirst", "true")
  .set("HADOOP_USER_NAME", getProp("hbase.hadoop.username"))
  .set("HADOOP_GROUP_NAME", getProp("hbase.hadoop.groupname"))
  // create the target index automatically if it does not exist
  .set("es.index.auto.create", "true")
  // Elasticsearch coordinates
  .set("es.nodes", "127.0.0.1")
  .set("es.port", "9200")
  .setMaster("local")
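With the configuration in place, build the SparkContext that the helper functions below receive (standard Spark API; setMaster("local") is only appropriate for local testing):

val sc = new SparkContext(sparkconf)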
Finally, read and write ES through esRDD and saveToEs; it could hardly be more convenient:
def read_es(sc: SparkContext) {
  // read every document in the test/login index as (docId, field map) pairs
  val rdd = sc.esRDD("test/login")
  rdd.foreach(x => {
    println("######", x._1, x._2)
  })
}
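If you only need a subset of an index, esRDD also accepts a query string that is pushed down to Elasticsearch, so only matching documents are shipped to Spark. A minimal sketch, assuming the documents have a hostIp field (the field name and wildcard are illustrative):

def read_es_filtered(sc: SparkContext) {
  // the second argument is a URI query executed by Elasticsearch itself
  val rdd = sc.esRDD("test/login", "?q=hostIp:abc*")
  rdd.foreach(x => println("######", x._1, x._2))
}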
def save_es(sc: SparkContext) {
  // each Map becomes one document in the snprime_login/login index
  sc.parallelize(Seq("abc", "def")).map(x => {
    val map = Map("hostIp" -> x, "remoteIp" -> x.concat("#"))
    map
  }).saveToEs("snprime_login/login")
}
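saveToEs also accepts case classes and per-call settings; for instance, the es.mapping.id setting makes a document field serve as the _id, so re-running the job overwrites documents instead of duplicating them. A sketch, where the Login class is made up for illustration:

case class Login(hostIp: String, remoteIp: String)

def save_es_with_id(sc: SparkContext) {
  // es.mapping.id: use the hostIp field as the Elasticsearch document _id
  val logins = Seq(Login("10.0.0.1", "10.0.0.1#"), Login("10.0.0.2", "10.0.0.2#"))
  sc.makeRDD(logins).saveToEs("snprime_login/login", Map("es.mapping.id" -> "hostIp"))
}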
Working with ES from Spark really is that simple. Give it a try.