Building a SparkSQL Program in IDEA

Author: Yin Zhengjie

Copyright notice: original work, reproduction declined; violators will be held legally liable.

1. Creating a DataFrame

pom.xml content (add the dependency):

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
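If the project is built with sbt rather than Maven, the same artifact can be declared with the one-liner below. This is a minimal sketch that assumes the build's scalaVersion is set to 2.11.x, so that %% resolves to the _2.11 artifact above.

//build.sbt: sketch of the sbt equivalent of the Maven dependency; %% appends the Scala binary version
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.1"

The demo program that reads a JSON file into a DataFrame: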
package com.yinzhengjie.bigdata.spark.sql

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.SparkConf

object SparkSQLDemo {
  def main(args: Array[String]): Unit = {
    //Create the Spark configuration
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo")

    //Create the SparkSQL environment object, i.e. the SparkSession
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    //Read the JSON file and build a DataFrame
    val frame: DataFrame = spark.read.json("E:\\yinzhengjie\\bigdata\\input\\json\\user.json")

    //Show the data
    frame.show()

    //Release resources
    spark.close()
  }
}
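Note that spark.read.json expects one JSON object per line (JSON Lines), not a single pretty-printed array. A plausible user.json for this demo, assuming the same id/name/age fields as the User case class in section 3, could look like this:

{"id":1,"name":"YinZhengjie","age":18}
{"id":2,"name":"Jason Yin","age":28}
{"id":3,"name":"Danny","age":27}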

 

2. Accessing Data with SQL Syntax

package com.yinzhengjie.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

object SparkSQLDemo2 {
  def main(args: Array[String]): Unit = {
    //Create the Spark configuration
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo2")

    //Create the SparkSQL environment object, i.e. the SparkSession
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    //Read the JSON file and build a DataFrame
    val frame: DataFrame = spark.read.json("E:\\yinzhengjie\\bigdata\\input\\json\\user.json")

    //Create a temporary view
    frame.createTempView("user")

    //Show the data
//    frame.show()
    spark.sql("select * from user").show()  //Access the data with SQL syntax

    //Release resources
    spark.close()
  }
}
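A view registered with createTempView lives only as long as this SparkSession. For a view shared by all sessions in the same application, Spark provides global temporary views, which must be queried through the reserved global_temp database. A minimal sketch, reusing the frame from the demo above (the view name user_g is just an illustration):

    //Register a global temporary view (visible across sessions in this application)
    frame.createGlobalTempView("user_g")

    //Global temp views are qualified with the global_temp database
    spark.sql("select * from global_temp.user_g").show()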

 

3. Converting between RDD, DataFrame, and DataSet

package com.yinzhengjie.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
  *   Define a case class
  */
case class User(id: Int, name: String, age: Int)

object SparkSQLDemo3 {
  def main(args: Array[String]): Unit = {
    //Create the Spark configuration
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo3")

    //Create the SparkSQL environment object, i.e. the SparkSession
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    /**
      *   Note:
      *     Before performing any conversions, the implicit conversion rules must be imported.
      *     Here "spark" is not a package name but the name of the SparkSession object above.
      */
    import spark.implicits._

    //Create an RDD
    val listRDD: RDD[(Int, String, Int)] = spark.sparkContext.parallelize(List((1, "YinZhengjie", 18), (2, "Jason Yin", 28), (3, "Danny", 27)))

    //Convert the RDD to a DataFrame
    val df: DataFrame = listRDD.toDF("Id", "Name", "Age")

    //Convert the DataFrame to a Dataset (column names match the case class fields case-insensitively by default)
    val ds: Dataset[User] = df.as[User]

    //Convert the Dataset back to a DataFrame
    val df1: DataFrame = ds.toDF()

    //Convert the DataFrame to an RDD of Rows
    val rdd1: RDD[Row] = df1.rdd

    //Iterate over the RDD; the data in each Row can be accessed by column index
    rdd1.foreach(row => {
      println(row.getString(1))
    })

    //Manually attach the User type to listRDD
    val userRDD: RDD[User] = listRDD.map {
      case (id, name, age) => {
        User(id, name, age)
      }
    }
    //Convert the typed RDD directly to a Dataset
    val ds2: Dataset[User] = userRDD.toDS()

    //Convert the Dataset directly back to an RDD
    val rdd2: RDD[User] = ds2.rdd

    //Iterate over rdd2
    rdd2.foreach(println)

    //Release resources
    spark.close()
  }
}
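Positional access such as row.getString(1) breaks silently if the column order ever changes. As an alternative sketch (not part of the original demo), Row also supports name-based access via getAs, using the Id/Name/Age column names assigned by toDF above:

    //Access columns by name instead of by position
    rdd1.foreach(row => {
      val name = row.getAs[String]("Name")
      val age = row.getAs[Int]("Age")
      println(s"$name is $age years old")
    })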