在scala中使用spark sql解決特定需求(2)

接着上篇文章,本篇來看下如何在scala中完成使用spark sql將不一樣日期的數據導入不一樣的es索引裏面。node

首下看下用到的依賴包有哪些:sql

elasticsearch-spark-20_2.11   5.3.2
elasticsearch                 2.3.4
spark-sql_2.11                2.1.0
spark-hive_2.11               2.1.0
spark-core_2.11               2.1.0
hadoop-client                 2.7.3
scala-library                 2.11.8

下面看相關的代碼,代碼可直接在跑在win上的idea中,使用的是local模式,數據是模擬造的:apache

import org.apache.spark.sql.types.{DataTypes, StructField}
import org.apache.spark.sql.{Row, SparkSession}//導入Row對象

/**
  * spark sql to es 本地測試例子
  */
object SparkGroupES {


  def main(args: Array[String]): Unit = {

    //構建spark session
    val spark = SparkSession
      .builder().master("local[1]")
      .appName("Spark SQL basic example")
      .config("es.nodes","192.168.10.125").config("es.port","9200")
      .getOrCreate()

    //導入es-spark的包
    import org.elasticsearch.spark.sql._
    import spark.implicits._


    //使用Seq造數據,四列數據
    val df = spark.sparkContext.parallelize(Seq(
      (0,"p1",30.9,"2017-03-04"),
      (0,"u",22.1,"2017-03-05"),
      (1,"r",19.6,"2017-03-04"),
      (2,"cat40",20.7,"2017-03-05"),
      (3,"cat187",27.9,"2017-03-04"),
      (4,"cat183",11.3,"2017-03-06"),
      (5,"cat8",35.6,"2017-03-08"))

     ).toDF("id", "name", "price","dt")//轉化df的四列數據s
    //建立代表爲pro
    df.createTempView("pro")

    import spark.sql //導入sql函數

    //按照id分組,統計每組數量,統計每組裏面最小的價格,而後收集每組裏面的數據
    val ds=sql("select dt, count(*) as c ,collect_list(struct(id,name, price)) as res  from pro   group by dt ")
    //須要屢次查詢的數據,能夠緩存起來
    ds.cache()
    //獲取查詢的結果,遍歷獲取結果集
    ds.select("dt","c","res").collect().foreach(line=>{
      val dt=line.getAs[String]("dt") //獲取日期
      val count=line.getAs[Long]("c")//獲取數量
      val value=line.getAs[Seq[Row]]("res")//獲取每組內的數據集合,注意是一個Row實體
      println("日期:"+dt+" 銷售數量: "+count)

      //建立一個schema針對struct結構
      val schema = DataTypes
        .createStructType( Array[StructField](
          DataTypes.createStructField("id", DataTypes.IntegerType, false), //不容許爲null
          DataTypes.createStructField("name", DataTypes.StringType, true),
          DataTypes.createStructField("price", DataTypes.DoubleType, true)
        ))
        //將value轉化成rdd
        val rdd=spark.sparkContext.makeRDD(value)
        //將rdd註冊成DataFrame
        val df =spark.createDataFrame(rdd,schema)
        //保存每個分組的數據到es索引裏面
        EsSparkSQL.saveToEs(df,"spark"+dt+"/spark",Map("es.mapping.id" -> "id"))
//      value.foreach(row=>{//遍歷組內數據集合,而後打印
//        println(row.getAs[String]("name")+" "+row.getAs[Double]("price"))
//      })

    })
    println("索引成功")
    spark.stop()
  }

}

分析下,代碼執行過程:緩存

(1)首先建立了一個SparkSession對象,注意這是新版本的寫法,而後加入了es相關配置session

(2)導入了隱式轉化的es相關的包app

(3)經過Seq+Tuple建立了一個DataFrame對象,並註冊成一個表elasticsearch

(4)導入spark sql後,執行了一個sql分組查詢ide

(5)獲取每一組的數據函數

(6)處理組內的Struct結構oop

(7)將組內的Seq[Row]轉換爲rdd,最終轉化爲df

(8)執行導入es的方法,按天插入不一樣的索引裏面

(9)結束

須要注意的是必須在執行collect方法後,才能在循環內使用sparkContext,不然會報錯的,在服務端是不能使用sparkContext的,只有在Driver端才能夠。

相關文章
相關標籤/搜索