Spark SQL Programming: DataFrames


                                     Author: 尹正傑

Copyright notice: This is an original work. Reproduction without permission is prohibited; violations will be pursued legally.

 

 

一. Creating a DataFrame

  In Spark SQL, SparkSession is the entry point for creating DataFrames and executing SQL. A DataFrame can be created in three ways:
    (1) from a Spark data source;
    (2) by converting an existing RDD;
    (3) by querying a Hive table.

1>. Creating from a Spark data source

[root@hadoop101.yinzhengjie.org.cn ~]# vim /tmp/user.json
[root@hadoop101.yinzhengjie.org.cn ~]# 
[root@hadoop101.yinzhengjie.org.cn ~]# cat /tmp/user.json
{"name":"yinzhengjie","passwd":"2020"}
{"name":"Jason","passwd":"666666"}
{"name":"Liming","passwd":"123"}
{"name":"Jenny","passwd":"456"}
{"name":"Danny","passwd":"789"}
[root@hadoop101.yinzhengjie.org.cn ~]# 
[root@hadoop101.yinzhengjie.org.cn ~]# vim /tmp/user.json        #Create the test data
[root@hadoop101.yinzhengjie.org.cn ~]# spark-shell 
20/07/13 03:03:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hadoop101.yinzhengjie.org.cn:4040
Spark context available as 'sc' (master = local[*], app id = local-1594580701441).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.6
      /_/
         
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_201)
Type in expressions to have them evaluated.
Type :help for more information.

scala> sc                            #The sc variable built into spark-shell
res2: org.apache.spark.SparkContext = org.apache.spark.SparkContext@40cd02fc

scala> spark                          #The spark variable built into spark-shell
res3: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@16ef07fa

scala> val df = spark.read.json("/tmp/user.json")     #Reading a local JSON file returns a DataFrame object, which we name df.
df: org.apache.spark.sql.DataFrame = [name: string, passwd: string]

scala> df.show                         #Display the loaded data
+-----------+------+
|       name|passwd|
+-----------+------+
|yinzhengjie|  2020|
|      Jason|666666|
|     Liming|   123|
|      Jenny|   456|
|      Danny|   789|
+-----------+------+


scala> 

2>. Converting from an RDD

  Recommended reading:
    https://www.cnblogs.com/yinzhengjie2020/p/13185272.html
    https://www.cnblogs.com/yinzhengjie2020/p/13200300.html
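  The linked posts cover this in depth. As a quick, minimal sketch (assuming the same spark-shell session as above, so spark and sc already exist, and a made-up User case class matching user.json), the reflection-based conversion looks roughly like this:

// Minimal sketch, not taken from the linked posts: convert an RDD to a DataFrame
// via a case class; spark-shell already provides the spark and sc variables.
import spark.implicits._                      // enables the rdd.toDF conversion

case class User(name: String, passwd: String) // hypothetical schema matching user.json

val userRDD = sc.makeRDD(List(User("yinzhengjie", "2020"), User("Jason", "666666")))
val userDF  = userRDD.toDF()                  // column names are inferred from the case class
userDF.show()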

3>. Querying a Hive table

  Recommended reading:
    https://www.cnblogs.com/yinzhengjie2020/p/13211015.html
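  The linked post covers the required configuration. As a rough sketch only (it assumes Spark is configured with Hive support and that a Hive table exists; the table name default.user below is hypothetical):

// Rough sketch, assuming Hive support is configured and a table default.user exists.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("HiveTableToDataFrame")
  .enableHiveSupport()                              // required to read Hive metastore tables
  .getOrCreate()

val df = spark.sql("SELECT * FROM default.user")    // querying a Hive table returns a DataFrame
df.show()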

 

二. SQL-style syntax

  Temporary views are scoped to the current Session; once that Session exits, the view is gone. If you need a view that remains valid across the whole application, use a global view.

  Note that a global view must be referenced by its fully qualified name, e.g. "global_temp.user2".

1>. Creating a temporary view

scala> val df = spark.read.json("/tmp/user.json")     #Reading a local JSON file returns a DataFrame object, which we name df.
df: org.apache.spark.sql.DataFrame = [name: string, passwd: string]

scala> df.createTempView("user")              #Create a temporary view

scala> spark.sql("select * from user").show        #Query the data with Spark SQL
+-----------+------+
|       name|passwd|
+-----------+------+
|yinzhengjie|  2020|
|      Jason|666666|
|     Liming|   123|
|      Jenny|   456|
|      Danny|   789|
+-----------+------+


scala> spark.sql("select * from user where passwd=2020").show      #Of course, we can also filter the data.
+-----------+------+
|       name|passwd|
+-----------+------+
|yinzhengjie|  2020|
+-----------+------+


scala> 

2>. Creating a global view

scala> df.createGlobalTempView("user2")                    #Create a global view

scala> spark.sql("select * from global_temp.user2").show          #Query the global view from the current session (the default)
+-----------+------+
|       name|passwd|
+-----------+------+
|yinzhengjie|  2020|
|      Jason|666666|
|     Liming|   123|
|      Jenny|   456|
|      Danny|   789|
+-----------+------+


scala> spark.sql("select * from global_temp.user2 user where passwd=2020").show
+-----------+------+
|       name|passwd|
+-----------+------+
|yinzhengjie|  2020|
+-----------+------+


scala> spark.newSession().sql("select * from global_temp.user2").show      #Query the global view from a new session
+-----------+------+
|       name|passwd|
+-----------+------+
|yinzhengjie|  2020|
|      Jason|666666|
|     Liming|   123|
|      Jenny|   456|
|      Danny|   789|
+-----------+------+


scala> spark.newSession().sql("select * from global_temp.user2 user where passwd=2020").show
+-----------+------+
|       name|passwd|
+-----------+------+
|yinzhengjie|  2020|
+-----------+------+


scala> 

 

三. DSL-style syntax

1>. Viewing the DataFrame's schema

scala> val df = spark.read.json("/tmp/user.json")        #Create a DataFrame
df: org.apache.spark.sql.DataFrame = [name: string, passwd: string]

scala> df.printSchema                        #View the DataFrame's schema
root
 |-- name: string (nullable = true)
 |-- passwd: string (nullable = true)


scala> 

2>. Selecting only the "name" column

scala> df.select("name").show()
+-----------+
|       name|
+-----------+
|yinzhengjie|
|      Jason|
|     Liming|
|      Jenny|
|      Danny|
+-----------+


scala> 

3>. Selecting the "name" column together with "passwd + 10"

scala> df.select($"name", $"passwd" + 10).show()
+-----------+-------------+
|       name|(passwd + 10)|
+-----------+-------------+
|yinzhengjie|       2030.0|
|      Jason|     666676.0|
|     Liming|        133.0|
|      Jenny|        466.0|
|      Danny|        799.0|
+-----------+-------------+


scala> 

4>. Filtering rows where "passwd" is greater than 2020

scala> df.filter($"passwd" > 2020).show()
+-----+------+
| name|passwd|
+-----+------+
|Jason|666666|
+-----+------+


scala> 

5>. Grouping by "passwd" and counting rows

scala> df.groupBy("passwd").count().show()
+------+-----+
|passwd|count|
+------+-----+
|  2020|    1|
|   789|    1|
|666666|    1|
|   456|    1|
|   123|    1|
+------+-----+


scala> 
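  The DSL calls above can also be chained into a single expression. Below is a small sketch against the same df (passwd was read from JSON as a string, so the cast makes the numeric comparison explicit):

// Sketch: chain filter, groupBy and an aggregation in the DSL style.
import org.apache.spark.sql.functions._        // for count()

df.filter($"passwd".cast("long") > 100)        // keep rows whose passwd is numerically greater than 100
  .groupBy("passwd")                           // group the remaining rows by passwd
  .agg(count("name").alias("cnt"))             // count the names in each group
  .show()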

 

四. Converting an RDD to a DataFrame

  A friendly reminder:
    To convert between an RDD and a DataFrame or Dataset, you must add "import spark.implicits._" (here "spark" is not a package name but the name of the SparkSession object). A concrete example follows.


scala> import spark.implicits._                    #Import the implicit conversions
import spark.implicits._

scala> val listRDD = sc.makeRDD(List((1,"YinZhengjie",18),(2,"Jason Yin",20),(3,"Danny",28)))      #Create an RDD
listRDD: org.apache.spark.rdd.RDD[(Int, String, Int)] = ParallelCollectionRDD[84] at makeRDD at <console>:27

scala> val df = listRDD.toDF("Id","Name","Age")      #Convert the RDD into a DataFrame
df: org.apache.spark.sql.DataFrame = [Id: int, Name: string ... 1 more field]

scala> df.show                         #View the data after the RDD has been converted to a DataFrame
+---+-----------+---+
| Id|       Name|Age|
+---+-----------+---+
|  1|YinZhengjie| 18|
|  2|  Jason Yin| 20|
|  3|      Danny| 28|
+---+-----------+---+


scala>
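  In spark-shell the import simply works because the shell has already created a SparkSession named spark. In a standalone application the implicits must be imported from your own SparkSession value; a minimal sketch (the object and application names are made up for illustration):

// Minimal standalone sketch: import the implicits from *your* SparkSession instance,
// so the import has to appear after that instance (here the val named spark) is created.
import org.apache.spark.sql.SparkSession

object RddToDataFrameApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("RddToDataFrameApp")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._                   // "spark" is the val above, not a package

    val listRDD = spark.sparkContext.makeRDD(List((1, "YinZhengjie", 18), (2, "Jason Yin", 20)))
    val df = listRDD.toDF("Id", "Name", "Age")
    df.show()

    spark.stop()
  }
}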

 

五. Converting a DataFrame to an RDD

scala> df          #Note that df is a DataFrame at this point
res33: org.apache.spark.sql.DataFrame = [Id: int, Name: string ... 1 more field]

scala> df.show
+---+-----------+---+
| Id|       Name|Age|
+---+-----------+---+
|  1|YinZhengjie| 18|
|  2|  Jason Yin| 20|
|  3|      Danny| 28|
+---+-----------+---+


scala> df.rdd       #Calling the rdd method converts the DataFrame into an RDD
res35: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[97] at rdd at <console>:29

scala> res35.collect   #View the data after converting the DataFrame to an RDD (note: res35 here is the value returned by the previous command)
res36: Array[org.apache.spark.sql.Row] = Array([1,YinZhengjie,18], [2,Jason Yin,20], [3,Danny,28])

scala> 
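  The RDD obtained this way holds org.apache.spark.sql.Row objects, so individual fields are read back by position or by column name. A small sketch continuing from the df above:

// Sketch: extract fields from the Rows of the converted RDD.
val rowRDD = df.rdd                                        // RDD[org.apache.spark.sql.Row]
val names  = rowRDD.map(row => row.getAs[String]("Name"))  // read a field by column name
val ids    = rowRDD.map(row => row.getInt(0))              // read a field by position (Id is column 0)
names.collect().foreach(println)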