Working with Elasticsearch from Spark

This test uses a local single-node Elasticsearch together with a local Spark installation.

Setup: Spark 2.2.0, Elasticsearch 1.7.2 (cluster) or Elasticsearch 6.6.1 (single node), Scala 2.11.1.

POM dependencies:

<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

<!-- elasticsearch-hadoop -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>6.1.1</version>
</dependency>

Note: keep the build environment consistent; Scala versions are not backward compatible.

Example 1: reading from Elasticsearch with an RDD

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.elasticsearch.spark._ // brings the esRDD implicit into scope

val conf = new SparkConf().setMaster("local").setAppName("ScalaSparkElasticSearch")
/**
  * According to the ES documentation, the integration needs:
  * es.index.auto.create ---> true
  * To connect to the ES cluster we also have to point at it (host, port).
  */
conf.set("es.index.auto.create", "true")
conf.set("es.nodes", "master")
// ----> must be set when connecting to a remote ES node
conf.set("es.port", "9200")
val sc = new SparkContext(conf)
// read the whole index
val esrdd: RDD[(String, collection.Map[String, AnyRef])] = sc.esRDD("kc22k2_detail/cz")
esrdd.collect().foreach(s => print(s._2))
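esRDD also accepts a query as a second argument, so the filtering happens on the Elasticsearch side instead of after collect(). A minimal sketch; the field name and value below are only illustrative:

// Sketch: pass a URI query (or a JSON query body) to esRDD so that only
// matching documents are shipped back to Spark. "name_depa:sales" is hypothetical.
val filtered: RDD[(String, collection.Map[String, AnyRef])] =
  sc.esRDD("kc22k2_detail/cz", "?q=name_depa:sales")
filtered.take(5).foreach(println)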

Example 2: reading with a DataFrame

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").appName("spark_es").getOrCreate()
val options = Map("es.index.auto.create" -> "true", "pushdown" -> "true", "es.nodes" -> "10.111.121.115","es.port" -> "9200")
val sparkDF = spark.sqlContext.read.format("org.elasticsearch.spark.sql").options(options).load("kc22k2_detail/cz")
sparkDF.show(10)
sparkDF.createOrReplaceTempView("t_sn_gongshi")
spark.sql("select count(distinct name_depa) from t_sn_gongshi").show()
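Because "pushdown" is enabled in the options, filters and projections applied to the DataFrame are translated into the Elasticsearch query where the connector supports it, rather than evaluated in Spark. A small sketch, assuming the index has a name_depa field:

// Sketch: with "pushdown" -> "true", this filter and projection are pushed down
// to ES, so only matching documents and the selected field are transferred.
import spark.implicits._
sparkDF.filter($"name_depa".isNotNull)
  .select("name_depa")
  .show(10)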

Reading Elasticsearch through Spark SQL is convenient because the data can be queried with plain SQL, but the extra round trips cost a lot of time. For everyday queries it is usually better to call the ES API directly; use Spark for the interactive processing when ES is combined with Hadoop.

Example 3: reading with a query filter to reduce data volume and network transfer time

Query DSL reference: https://www.yiibai.com/elasticsearch/elasticsearch_query_dsl.html

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql.EsSparkSQL

val conf = new SparkConf()
conf.set("es.index.auto.create", "true")
conf.set("pushdown", "true")
conf.set("es.nodes", "192.168.x.xxx")
conf.set("es.port", "9200")
val spark = SparkSession.builder().master("local[*]").appName("spark_es").config(conf).getOrCreate()
// query DSL: match documents whose "name" is 北京 or 成都
val query: String =
  s"""{
     |  "query": {
     |    "terms": {
     |      "name": ["北京", "成都"]
     |    }
     |  }
     |}""".stripMargin
val df = EsSparkSQL.esDF(spark.sqlContext, "area_map_0302_3/cz", query)
df.show()
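The same query can also be supplied through the es.query setting when reading with the DataFrame API. A sketch reusing the index and query body defined above:

// Sketch: es.query accepts a URI query or a query DSL body; here the JSON body
// built above is passed as a read option instead of through EsSparkSQL.esDF.
val dfWithQuery = spark.sqlContext.read
  .format("org.elasticsearch.spark.sql")
  .option("es.query", query)
  .load("area_map_0302_3/cz")
dfWithQuery.show()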

Example 4: processing data with Spark and writing it to Elasticsearch

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.elasticsearch.spark.rdd.EsSpark
import org.elasticsearch.spark.sql.EsSparkSQL

// Trip is assumed to be a simple case class with three String fields, e.g.
// case class Trip(col1: String, col2: String, col3: String)

def sparkWriteEs(): Unit = {
    val conf = new SparkConf()
    conf.set("es.index.auto.create", "true")
    conf.set("pushdown", "true")
    conf.set("es.nodes", "10.111.121.115")
    conf.set("es.port", "9200")
    // several configs can be combined on the SparkSession builder
    val spark = SparkSession.builder()
      .appName("Spark-ES")
      .master("local[*]")
      .config(conf)
      .enableHiveSupport()
      .getOrCreate()
    /**
      * Write RDD data
      */
    val data: RDD[String] = spark.sparkContext.textFile("file:///C:\\Users\\91BGJK2\\Desktop\\es測試數據.txt")
    val rdds: RDD[Trip] = data.map { line =>
      val s: Array[String] = line.split("\t")
      Trip(s(0), s(1), s(2))
    }
    EsSpark.saveToEs(rdds, "kc22k2_test/cz")

    //val sqlContext = new SQLContext(sc)
    /**
      * Read from a JDBC data source and write the DataFrame
      */
    val gongshi: DataFrame = spark.read.format("jdbc")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("url", "jdbc:mysql://10.111.121.111:3306/test?useUnicode=true&characterEncoding=UTF-8")
      .option("dbtable", "t_test_es")
      .option("user", "root")
      .option("password", "root")
      .load()
    // DataFrames are written with EsSparkSQL; RDDs are written with EsSpark
    EsSparkSQL.saveToEs(gongshi, "kc22k2_detail/cz")
  }
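If documents should keep a stable _id instead of getting an auto-generated one, es-hadoop can take the id from a field via es.mapping.id. A one-line sketch that would replace the last saveToEs call inside the function above, assuming the MySQL table has an id column (hypothetical):

// Sketch: map the DataFrame's "id" column (hypothetical) to the Elasticsearch _id,
// so re-running the job updates documents instead of duplicating them.
EsSparkSQL.saveToEs(gongshi, "kc22k2_detail/cz", Map("es.mapping.id" -> "id"))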