Spark SQL Programming: DataFrames
Author: 尹正傑 (yinzhengjie)
Copyright notice: This is an original work. Reproduction is not permitted; violators will be held legally liable.
一. Creating a DataFrame
In Spark SQL, SparkSession is the entry point for creating DataFrames and executing SQL. There are three ways to create a DataFrame:
(1) create it from a Spark data source;
(2) convert it from an existing RDD;
(3) return it from a Hive table query.
1>. Creating from a Spark data source
[root@hadoop101.yinzhengjie.org.cn ~]# vim /tmp/user.json
[root@hadoop101.yinzhengjie.org.cn ~]# 
[root@hadoop101.yinzhengjie.org.cn ~]# cat /tmp/user.json 
{"name":"yinzhengjie","passwd":"2020"}
{"name":"Jason","passwd":"666666"}
{"name":"Liming","passwd":"123"}
{"name":"Jenny","passwd":"456"}
{"name":"Danny","passwd":"789"}
[root@hadoop101.yinzhengjie.org.cn ~]# 
[root@hadoop101.yinzhengjie.org.cn ~]# spark-shell
20/07/13 03:03:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hadoop101.yinzhengjie.org.cn:4040
Spark context available as 'sc' (master = local[*], app id = local-1594580701441).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.6
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_201)
Type in expressions to have them evaluated.
Type :help for more information.

scala> sc                                            # The sc variable built into spark-shell
res2: org.apache.spark.SparkContext = org.apache.spark.SparkContext@40cd02fc

scala> spark                                         # The spark variable built into spark-shell
res3: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@16ef07fa

scala> val df = spark.read.json("/tmp/user.json")    # Reading a local JSON file returns a DataFrame object, which we name df.
df: org.apache.spark.sql.DataFrame = [name: string, passwd: string]

scala> df.show                                       # Display what was read
+-----------+------+
|       name|passwd|
+-----------+------+
|yinzhengjie|  2020|
|      Jason|666666|
|     Liming|   123|
|      Jenny|   456|
|      Danny|   789|
+-----------+------+

scala> 
2>. Converting from an RDD
Recommended reading from the author (a minimal sketch follows below):
https://www.cnblogs.com/yinzhengjie2020/p/13185272.html
https://www.cnblogs.com/yinzhengjie2020/p/13200300.html
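As a quick preview of this route, here is a minimal sketch of a reflection-based conversion, assuming the built-in spark-shell variables sc and spark and a hypothetical User case class mirroring the (name, passwd) shape of /tmp/user.json; section 四 below walks through the tuple-based toDF approach in detail.

import spark.implicits._                              // required for rdd.toDF / rdd.toDS

case class User(name: String, passwd: String)         // hypothetical case class for illustration

// Build a plain RDD of tuples, map each tuple to the case class, then convert;
// the DataFrame column names are taken from the case class field names.
val userRDD = sc.makeRDD(List(("yinzhengjie", "2020"), ("Jason", "666666")))
val userDF  = userRDD.map { case (name, passwd) => User(name, passwd) }.toDF()

userDF.show()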
3>. Querying from a Hive table
Recommended reading from the author (a minimal sketch follows below):
https://www.cnblogs.com/yinzhengjie2020/p/13211015.html
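For reference, a minimal sketch of what this path looks like, assuming Spark was built with Hive support and that a hypothetical Hive table mydb.user already exists:

import org.apache.spark.sql.SparkSession

// Build a SparkSession with Hive support enabled (in spark-shell the built-in
// spark session already has this when Spark is compiled with Hive).
val spark = SparkSession.builder()
  .appName("HiveToDataFrame")
  .enableHiveSupport()
  .getOrCreate()

// Querying a Hive table directly returns a DataFrame.
val df = spark.sql("select * from mydb.user")         // mydb.user is a hypothetical table
df.show()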
二. SQL-style syntax
A temporary view is scoped to the Session; once the Session ends, the view is gone. If you need it to be visible across the whole application, use a global view.
Note that a global view must be accessed by its full path, e.g. "global_temp.user2".
1>. Creating a temporary view
scala> val df = spark.read.json("/tmp/user.json") #讀取本地的json文件會返回一個DataFrame對象,咱們命名爲df。 df: org.apache.spark.sql.DataFrame = [name: string, passwd: string] scala> df.createTempView("user") #建立臨時視圖 scala> spark.sql("select * from user").show #使用Spark SQL來查詢數據 +-----------+------+ | name|passwd| +-----------+------+ |yinzhengjie| 2020| | Jason|666666| | Liming| 123| | Jenny| 456| | Danny| 789| +-----------+------+ scala> spark.sql("select * from user where passwd=2020").show #固然,咱們也能夠進行過濾操做。 +-----------+------+ | name|passwd| +-----------+------+ |yinzhengjie| 2020| +-----------+------+ scala>
2>. Creating a global view
scala> df.createGlobalTempView("user2") #建立一個全局視圖 scala> spark.sql("select * from global_temp.user2").show #默認使用當前的session查詢全局視圖數據 +-----------+------+ | name|passwd| +-----------+------+ |yinzhengjie| 2020| | Jason|666666| | Liming| 123| | Jenny| 456| | Danny| 789| +-----------+------+ scala> spark.sql("select * from global_temp.user2 user where passwd=2020").show +-----------+------+ | name|passwd| +-----------+------+ |yinzhengjie| 2020| +-----------+------+ scala> spark.newSession().sql("select * from global_temp.user2").show #使用一個新session來查詢全局視圖數據 +-----------+------+ | name|passwd| +-----------+------+ |yinzhengjie| 2020| | Jason|666666| | Liming| 123| | Jenny| 456| | Danny| 789| +-----------+------+ scala> spark.newSession().sql("select * from global_temp.user2 user where passwd=2020").show +-----------+------+ | name|passwd| +-----------+------+ |yinzhengjie| 2020| +-----------+------+ scala>
三. DSL-style syntax
1>. Viewing the DataFrame's schema
scala> val df = spark.read.json("/tmp/user.json") #建立一個DataFrame df: org.apache.spark.sql.DataFrame = [name: string, passwd: string] scala> df.printSchema #查看DataFrame的Schema信息 root |-- name: string (nullable = true) |-- passwd: string (nullable = true) scala>
2>.只查看"name"列數據
scala> df.select("name").show() +-----------+ | name| +-----------+ |yinzhengjie| | Jason| | Liming| | Jenny| | Danny| +-----------+ scala>
3>. Viewing the "name" column together with "passwd + 10"
scala> df.select($"name", $"passwd" + 10).show() +-----------+-------------+ | name|(passwd + 10)| +-----------+-------------+ |yinzhengjie| 2030.0| | Jason| 666676.0| | Liming| 133.0| | Jenny| 466.0| | Danny| 799.0| +-----------+-------------+ scala>
4>. Viewing rows where "passwd" is greater than 2020
scala> df.filter($"passwd" > 2020).show() +-----+------+ | name|passwd| +-----+------+ |Jason|666666| +-----+------+ scala>
5>. Grouping by "passwd" and counting rows
scala> df.groupBy("passwd").count().show() +------+-----+ |passwd|count| +------+-----+ | 2020| 1| | 789| 1| |666666| 1| | 456| 1| | 123| 1| +------+-----+ scala>
四. Converting an RDD to a DataFrame
Friendly reminder: any operation between an RDD and a DF or DS requires "import spark.implicits._" (here "spark" is not a package name but the name of the SparkSession object). A concrete example follows.
scala> import spark.implicits._                      # Import the implicit conversions
import spark.implicits._

scala> val listRDD = sc.makeRDD(List((1,"YinZhengjie",18),(2,"Jason Yin",20),(3,"Danny",28)))    # Create an RDD
listRDD: org.apache.spark.rdd.RDD[(Int, String, Int)] = ParallelCollectionRDD[84] at makeRDD at <console>:27

scala> val df = listRDD.toDF("Id","Name","Age")      # Convert the RDD to a DataFrame
df: org.apache.spark.sql.DataFrame = [Id: int, Name: string ... 1 more field]

scala> df.show                                       # View the data after converting the RDD to a DataFrame
+---+-----------+---+
| Id|       Name|Age|
+---+-----------+---+
|  1|YinZhengjie| 18|
|  2|  Jason Yin| 20|
|  3|      Danny| 28|
+---+-----------+---+

scala> 
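Besides toDF, an RDD can also be turned into a DataFrame with an explicitly defined schema. A minimal sketch, reusing listRDD from the transcript above (the column names and types are illustrative):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Describe the columns programmatically instead of relying on toDF column names.
val schema = StructType(Seq(
  StructField("Id",   IntegerType, nullable = false),
  StructField("Name", StringType,  nullable = true),
  StructField("Age",  IntegerType, nullable = false)
))

// Wrap each tuple into a Row so it matches the schema above.
val rowRDD = listRDD.map { case (id, name, age) => Row(id, name, age) }

val df2 = spark.createDataFrame(rowRDD, schema)
df2.show()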
五. Converting a DataFrame to an RDD
scala> df                                            # Note that df is still a DataFrame at this point
res33: org.apache.spark.sql.DataFrame = [Id: int, Name: string ... 1 more field]

scala> df.show
+---+-----------+---+
| Id|       Name|Age|
+---+-----------+---+
|  1|YinZhengjie| 18|
|  2|  Jason Yin| 20|
|  3|      Danny| 28|
+---+-----------+---+

scala> df.rdd                                        # Simply call the rdd method to convert the DataFrame to an RDD
res35: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[97] at rdd at <console>:29

scala> res35.collect                                 # View the data after the conversion (note: res35 is the return value of the previous command)
res36: Array[org.apache.spark.sql.Row] = Array([1,YinZhengjie,18], [2,Jason Yin,20], [3,Danny,28])

scala> 
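Since df.rdd yields an RDD[Row], the individual fields usually still need to be pulled back out. A minimal sketch of two common ways to do that, continuing with the df from the transcript above:

// Access fields by position (with an explicit getter per type) ...
val byIndex = df.rdd.map(row => (row.getInt(0), row.getString(1), row.getInt(2)))

// ... or by column name via getAs[T].
val byName = df.rdd.map(row => (row.getAs[Int]("Id"), row.getAs[String]("Name"), row.getAs[Int]("Age")))

byName.collect.foreach(println)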