The main Maven dependency
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>5.6.3</version>
</dependency>
* I've been bitten by ES jar packages before: a bad dependency kept failing to resolve, but the configuration above works. The 5.6.3 above is the version of ES I installed.
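If your project builds with sbt instead of Maven, the equivalent dependency line would presumably look like the following (a sketch; the artifact name already encodes the Scala version, so a single % is used, and the version should stay in step with your ES install):

libraryDependencies += "org.elasticsearch" % "elasticsearch-spark-20_2.11" % "5.6.3"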
The ES03 file (standalone, single-node version)
package SparkES

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.rdd.EsSpark

object ES03 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("DecisionTree1").setMaster("local")
    sparkConf.set("cluster.name", "es")
    sparkConf.set("es.index.auto.create", "true")
    sparkConf.set("es.nodes", "192.168.1.1*5")
    sparkConf.set("es.port", "9200")
    sparkConf.set("es.index.read.missing.as.empty", "true")
    sparkConf.set("es.net.http.auth.user", "elastic")  // ES username
    sparkConf.set("es.net.http.auth.pass", "changeme") // ES password
    sparkConf.set("es.nodes.wan.only", "true")
    val sc = new SparkContext(sparkConf)
    //write2Es(sc) // write
    read4Es(sc)    // read
  }

  // Write two Maps as documents into the book/no index
  def write2Es(sc: SparkContext) = {
    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("OTP" -> "Otopeni", "SFO" -> "San Fran")
    val rdd = sc.makeRDD(Seq(numbers, airports))
    EsSpark.saveToEs(rdd, "book/no")
    println("--------------------End-----------------")
  }

  // Read documents from forum/article matching the query ?q=this*
  def read4Es(sc: SparkContext): Unit = {
    val rdd = EsSpark.esRDD(sc, "forum/article", "?q=this*")
    println("------------------rdd.count():" + rdd.count())
    rdd.foreach(line => {
      val key = line._1   // document id
      val value = line._2 // document fields as a Map
      println("------------------key:" + key)
      for ((key1, value1) <- value) {
        println("------------------key1:" + key1)
        println("------------------value1:" + value1)
      }
    })
  }
}
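Besides calling EsSpark directly, elasticsearch-hadoop also ships implicit conversions that put saveToEs straight onto an RDD. A minimal sketch of the same write, assuming the same masked host and credentials as above (ES03Implicit is a hypothetical object name):

package SparkES

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._ // brings saveToEs onto RDDs

object ES03Implicit {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ES03Implicit").setMaster("local")
    sparkConf.set("es.nodes", "192.168.1.1*5") // same masked host as above
    sparkConf.set("es.port", "9200")
    sparkConf.set("es.net.http.auth.user", "elastic")
    sparkConf.set("es.net.http.auth.pass", "changeme")
    sparkConf.set("es.nodes.wan.only", "true")
    val sc = new SparkContext(sparkConf)

    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("OTP" -> "Otopeni", "SFO" -> "San Fran")
    // Equivalent to EsSpark.saveToEs(rdd, "book/no"), just via the implicit API
    sc.makeRDD(Seq(numbers, airports)).saveToEs("book/no")
    sc.stop()
  }
}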
Run output (screenshot)
Below is another implementation of the read, but it throws errors and is not as good as the one above.
package SparkES

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.rdd.EsSpark

object E2SExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("S2EExample")
    // conf.setMaster("spark://master:7077")
    conf.setMaster("local")
    conf.set("es.nodes", "192.168.1.1*5:9200")
    val sc = new SparkContext(conf)

    // RDD read, same as in ES03
    val rdd = EsSpark.esRDD(sc, "forum/article", "?q=this*")
    println("rdd count: " + rdd.count())
    rdd.collect().foreach(record => {
      print(record._1 + ":")
      for ((k, v) <- record._2) {
        print(k + ":" + v)
      }
      println()
    })

    // DataFrame read through the es-hadoop SQL data source
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.format("org.elasticsearch.spark.sql").load("forum/article")
    df.printSchema()
    df.collect().foreach(println)

    sc.stop()
  }
}
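With the Spark 2.x artifact used here (elasticsearch-spark-20), the SQLContext part can also be written against SparkSession, passing the connection settings as per-read options instead of SparkConf entries. A hedged sketch, reusing the masked host and credentials from the examples above (E2SDataFrame is a hypothetical object name):

package SparkES

import org.apache.spark.sql.SparkSession

object E2SDataFrame {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("E2SDataFrame")
      .master("local")
      .getOrCreate()

    // Connection settings supplied as options on the read itself
    val df = spark.read
      .format("org.elasticsearch.spark.sql")
      .option("es.nodes", "192.168.1.1*5")
      .option("es.port", "9200")
      .option("es.net.http.auth.user", "elastic")
      .option("es.net.http.auth.pass", "changeme")
      .option("es.nodes.wan.only", "true")
      .load("forum/article")

    df.printSchema()
    df.show()
    spark.stop()
  }
}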