Reading and writing ES data with Spark

The main Maven dependency:

  <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-spark-20_2.11</artifactId>
      <version>5.6.3</version>
  </dependency>

* I have been bitten by the ES jar before: the dependency I pulled in was wrong, so the import kept failing. With the configuration above it finally succeeded. The 5.6.3 above matches the version of ES I have installed.
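If the project is built with sbt instead of Maven, the equivalent coordinate is roughly the following (a sketch; %% appends the Scala version, which is 2.11 in the artifact name above):

// build.sbt: equivalent of the Maven dependency above (assuming scalaVersion := "2.11.x")
libraryDependencies += "org.elasticsearch" %% "elasticsearch-spark-20" % "5.6.3"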

ES03 (standalone, local-mode version)
package SparkES
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.rdd.EsSpark

object ES03 {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("DecisionTree1").setMaster("local")
    sparkConf.set("cluster.name", "es")
    sparkConf.set("es.index.auto.create", "true")
    sparkConf.set("es.nodes", "192.168.1.1*5")
    sparkConf.set("es.port", "9200")
    sparkConf.set("es.index.read.missing.as.empty","true")
    sparkConf.set("es.net.http.auth.user", "elastic") //訪問es的用戶名
    sparkConf.set("es.net.http.auth.pass", "changeme") //訪問es的密碼
    sparkConf.set("es.nodes.wan.only","true")
    val sc = new SparkContext(sparkConf)
    //write2Es(sc) // write
    read4Es(sc)    // read
  }

  def write2Es(sc: SparkContext) = {
    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("OTP" -> "Otopeni", "SFO" -> "San Fran")
    val rdd = sc.makeRDD(Seq(numbers, airports))
    EsSpark.saveToEs(rdd, "book/no")
    println("--------------------End-----------------")
  }

  def read4Es(sc: SparkContext): Unit = {
    val rdd = EsSpark.esRDD(sc, "forum/article","?q=this*")

    println("------------------rdd.count():" + rdd.count())
    rdd.foreach(line => {
      val key = line._1
      val value = line._2
      println("------------------key:" + key)
      for (tmp <- value) {
        val key1 = tmp._1
        val value1 = tmp._2
        println("------------------key1:" + key1)
        println("------------------value1:" + value1)
      }
    })
  }
 
}

Run result:
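As a complement to the Map-based write2Es above, here is a minimal sketch of writing case-class objects and letting ES take the document _id from a field via es.mapping.id. The Trip case class, the trip/docs index, and the id field are all hypothetical, and the connection settings mirror the ones above:

package SparkES

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.rdd.EsSpark

// Hypothetical document type; any serializable case class works with EsSpark.saveToEs
case class Trip(id: String, departure: String, arrival: String)

object ES03WriteCaseClass {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ES03WriteCaseClass").setMaster("local")
    sparkConf.set("es.index.auto.create", "true")
    sparkConf.set("es.nodes", "192.168.1.1*5") // same masked address as above
    sparkConf.set("es.port", "9200")
    sparkConf.set("es.nodes.wan.only", "true")
    val sc = new SparkContext(sparkConf)

    val trips = sc.makeRDD(Seq(Trip("1", "OTP", "SFO"), Trip("2", "MUC", "OTP")))
    // Use the "id" field of Trip as the ES document _id instead of an auto-generated one
    EsSpark.saveToEs(trips, "trip/docs", Map("es.mapping.id" -> "id"))

    sc.stop()
  }
}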

Below is another implementation of the read path; it reports errors, though, and is not as good as the one above.

package SparkES

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.rdd.EsSpark

object E2SExample {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setAppName("S2EExample")
//    conf.setMaster("spark://master:7077")
    conf.setMaster("local")
    conf.set("es.nodes", "192.168.1.1*5:9200")
    val sc = new SparkContext(conf)

    val rdd = EsSpark.esRDD(sc, "forum/article", "?q=this*")
    println("rdd count: " + rdd.count())

    rdd.collect().foreach(record => {
      print(record._1 + ":")
      for ((k, v) <- record._2) {
        print(k + ":" + v)
      }
      println()
    })

    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.format("org.elasticsearch.spark.sql").load("forum/article")
    df.printSchema()
    df.collect().foreach(println)

    sc.stop()
  }
}
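For the DataFrame part of the example above, newer Spark code would usually go through SparkSession rather than the older SQLContext. A minimal sketch (assuming the same forum/article index, the same masked address, and the default elastic/changeme credentials used earlier) looks like this:

package SparkES

import org.apache.spark.sql.SparkSession

object E2SExampleSparkSession {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("E2SExampleSparkSession")
      .master("local")
      .config("es.nodes", "192.168.1.1*5")
      .config("es.port", "9200")
      .config("es.nodes.wan.only", "true")
      .config("es.net.http.auth.user", "elastic")
      .config("es.net.http.auth.pass", "changeme")
      .getOrCreate()

    // Read the index as a DataFrame through the elasticsearch-spark SQL data source
    val df = spark.read.format("org.elasticsearch.spark.sql").load("forum/article")
    df.printSchema()
    df.show()

    spark.stop()
  }
}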