本文始發於我的公衆號:TechFlow,原創不易,求個關注java
今天是spark專題的第五篇,咱們來看看DataFrame。程序員
用過Python作過機器學習的同窗對Python當中pandas當中的DataFrame應該不陌生,若是沒作過也沒有關係,咱們簡單來介紹一下。DataFrame翻譯過來的意思是數據幀,但其實它指的是一種特殊的數據結構,使得數據以相似關係型數據庫當中的表同樣存儲。使用DataFrame咱們能夠很是方便地對整張表進行一些相似SQL的一些複雜的處理。Apache Spark在升級到了1.3版本以後,也提供了相似功能的DataFrame,也就是大名鼎鼎的SparkSQL。web
關於SparkSQL的前世此生實際上是有一大段歷史的,這一段歷史除了能夠充當吹牛的談資以外,還能夠幫助咱們理清楚許多技術之間的內在關聯。sql
在程序開發這個行當,優化和重構註定是兩個沒法擺脫的問題。數據庫
當一個項目啓動的時候,因爲投入有限,可能招不到特別匹配的人才,或者是爲了快速知足業務的須要。每每會採起一些不是特別合理的設計來構建項目,這個應該很好理解,爲了圖快犧牲一些性能或者是拓展性。並且有時候因爲視野和能力的限制,早期的開發者可能也是沒法意識到設計中的不合理性的。可是俗話說得好,出來混遲早是要還的。前面挖了坑,後來遲早也會暴露出來。問題就在於暴露了以後咱們怎麼處理。編程
通常來講,不管是做爲公司也好,仍是做爲開發者我的也罷。想的確定都是怎麼樣以最小的代價解決問題,也就是儘可能優化,能不動核心代碼就不動。除了由於核心代碼過久沒有維護或者是文檔缺失以外,也涉及到成本問題。如今的項目日進斗金,天天都在運行,一旦要下決心把核心代碼翻新一遍,那麼會付出巨大的代價,可能整個項目組要暫停一段時間。並且在上層管理層眼中,每每也是看不到重構的必要性的。由於上層都是以業務爲導向的,技術作得好很差不重要,能賺錢纔是王道。json
但問題是優化並非無止境的,不少時候核心設計的不合理纔是大頭,邊邊角角的修補只能聊勝於無。這個時候考驗的每每都是技術負責人的擔當了,是當個糊裱匠混一年是一年,仍是壯士斷腕,敢叫日月換新天。通常來講糊裱起到的效果都是有限的,總會有撐不下去要重構的那天。session
SparkSQL早期的發展就很是好的印證了這點,SparkSQL誕生之初就是當作一個優化項目誕生的。目的是爲了優化Hive中在spark的效率。數據結構
這裏的Hive可能不少人不太熟悉,它是Hadoop家族結構化查詢的工具。將hadoop集羣中的數據以表結構的形式存儲,讓程序員能夠以類SQL語句來查詢數據。看起來和數據庫有些近似,但原理不太同樣。Hive底層是以MapReduce驅動的,也就是說會把咱們寫好的SQL轉化成MapReduce執行。因爲Hive易用性很好,使用的人不少,因此spark當中也支持Hive。架構
但其實那個時候spark興起,MapReduce時代已經逐漸走到了末期。那時的spark是基於前面介紹的RDD的結構處理數據的,性能比MapReduce好得多。但若是在spark上依然使用MapReduce的形式支持Hive,那麼就不能體現出spark計算性能的優越性。因此對於Hive on Spark的優化勢在必行。我我的以爲這有點搶市場的調調。
最好的辦法是對spark完全重構,重建出一套支持結構化數據查詢的計算框架。但估計那時候主負責人沒能狠下心,或者是爲了趕時間。因此只是對Hive進行了一些優化,大概就是把一些使用MapReduce的計算想辦法儘可能改爲使用RDD,從而提高總體的效率。這樣作固然是可以有提高的,可是核心的框架仍然是Hive的那一套機制,這樣的提高是有限的。大概過了三年左右的時間,基本上全部能壓榨出來的性能都被壓榨完了,開發組通過激烈的思想鬥爭以後,終於接受現實,完全拋棄本來的框架,構建出一套新的架構來。
這套新開發出的架構就是SparkSQL,也就是DataFrame。
咱們來簡單看下SparkSQL的架構,大概知道內部是怎麼運行的。
整個SparkSQL的模型大概分爲三層,最上面是編程模型層,中間是執行優化層,最後是任務執行引擎。
這些都是術語,咱們簡單介紹一下,編程模型層主要有兩塊一塊是SparkSQL一種是DataFrame,這二者只是語法不同,底層執行的邏輯是同樣的。主要作的是對咱們寫的一些語法進行解析以及一些基本的處理。執行計劃層是將SQL語句轉化成具體須要執行的邏輯執行計劃,根據一些策略進行優化以後輸出物理執行策略。最後一層是執行層,負責將物理計劃轉化成RDD或者是DAG進行執行。
咱們觀察一下這個架構,可能還有不少細節不是很清楚,可是至少整個執行的過程已經很明白了。進一步能夠發現,整個架構當中已經徹底沒有MapReduce的影子了,底層的執行單元就是RDD。也就是說SparkSQL實際上是進一步更高層次的封裝。
咱們來簡單看下DataFrame和RDD的差異,最大最直觀的差異就是DataFrame多了schema的概念。也就是多了數據格式的概念,咱們拿到DataFrame能夠很輕鬆地獲取它其中數據的結構信息。
咱們看下下圖作個對比,一樣一份數據在RDD和DataFrame的樣子:
不要小瞧這個schema,有了它以後,咱們就能夠作一些結構化數據才支持的操做了。好比groupby、where、sum等等。這些結構化數據操做的靈活度要比RDD的map、filter等操做大得多。
另一個好處就是效率,若是咱們本身寫RDD來操做數據的話,那麼Python是必定幹不過scala和java的。由於spark底層是依託Java實現的,spark的全部計算都執行在JVM當中。scala和java都是直接在JVM當中直接運行的語言,而Python不行,因此以前咱們使用Python調用RDD處理spark的速度也會慢不少。由於咱們須要通過多層中轉,咱們能夠看下下面這張圖。
當咱們執行pyspark當中的RDD時,spark context會經過Py4j啓動一個使用JavaSparkContext的JVM,全部的RDD的轉化操做都會被映射成Java中的PythonRDD對象。當咱們的任務被傳輸到Workder進行執行的時候,PythonRDD會啓動Python的子進程來傳輸代碼和執行的結果。
上面這段話提及來有點繞,簡單理解就是當pyspark調用RDD的時候,Python會轉化成Java調用spark集羣分發任務。每個任務具體在機器上執行的時候,仍是以Python程序的方式執行。執行結束以後,仍是經過Python拿回數據給spark中的JVM。JVM執行結束以後,再把結果包裝成Python的類型返回給調用端。
原本Python的執行效率就低,加上中間又通過了若干次轉換以及通訊開銷(佔大頭),這就致使了pyspark中的RDD操做效率更低。
而如今有了Catalyst優化器以後,會自動幫助咱們進行底層的計算優化。而且即便是非原生的Python語言,也可使用它,所以會帶來性能的極大提高。甚至通過官方的測量,使用pyspark寫DataFrame的效率已經和scala和java分庭抗禮了。
因此若是咱們要選擇Python做爲操做spark的語言,DataFrame必定是首選。不過Catalyst優化器也有短板,它沒法解決跨語言自己帶來的問題。好比咱們使用Python寫一些udf(user defined function),仍是會帶來性能的損耗。這個時候的總體效率仍是會比scala低一些。
寫了這麼多廢話,下面就讓咱們實際一點,看看究竟pyspark當中的DataFrame要如何使用吧。
和RDD同樣,DataFrame的建立方法有不少,咱們能夠基於內存當中的數據進行建立,也能夠從本地文件或者是HDFS等其餘雲存儲系統當中進行讀取。但怎麼讀取不重要,使用方法纔是關鍵,爲了方便演示,咱們先來看看如何從內存當中建立DataFrame。
前文當中曾經說過,DataFrame當中的數據以表結構的形式存儲。也就是說咱們讀入的通常都是結構化的數據,咱們常用的結構化的存儲結構就是json,因此咱們先來看看如何從json字符串當中建立DataFrame。
首先,咱們建立一個json類型的RDD。
jsonstr = sc.parallelize((""" {'name': 'xiaoming', 'age': 13, 'score': 100}""",
"""{'name': 'xiaohong', 'age': 15, 'score': 98}"""
))
接着,咱們用spark.read.json將它轉化成一個DataFrame。須要注意的是,若是數據量很大,這個執行會須要一點時間,可是它仍然是一個轉化操做。數據其實並無真正被咱們讀入,咱們讀入的只是它的schema而已,只有當咱們執行執行操做的時候,數據纔會真正讀入處理。
studentDf = spark.read.json(jsonstr)
執行完這一句以後,RDD轉DataFrame的工做就完成了。嚴格提及來這是讀取操做,並非真正的轉化操做。RDD轉DataFrame稍微複雜一些,咱們晚點再說。
若是咱們想要查看DataFrame當中的內容,咱們能夠執行show方法,這是一個行動操做。和pandas中的head相似,執行以後,會展現出DataFrame當中前20條數據。咱們也能夠傳入參數,指定咱們要求展現的數據條數。
咱們來運行一下,看看展現出來的結果:
咱們也collect一下本來的RDD做爲一下對比:
這下一對比咱們就發現了,json格式的字符串果真能夠被解析,而且RDD被轉化成了表格格式的DataFrame。
咱們再來看下DataFrame的簡單查詢功能,其實Dataframe當中的查詢功能不少。咱們今天先來看其中用得比較多的兩種。
先來看第一種,第一種是經過select接口查詢數據。這裏的select其實對應的是SQL語句當中的select,含義也基本相同,不一樣的是咱們是經過函數進行調用的而已。
咱們能夠在select當中傳入咱們想要查找的列名。
咱們能夠加上where或者filter函數進行條件判斷,where和filter函數是一個意思,二者的用法也徹底同樣。官方提供了兩個名字,爲了避免同習慣的人使用方便而已。咱們把下圖當中的函數換成filter結果也是同樣的。
另一種操做方式稍稍複雜一些,則是將DataFrame註冊成pyspark中的一張視圖。這裏的視圖和數據庫中的視圖基本上是一個概念,spark當中支持兩種不一樣的視圖。第一種是臨時視圖,第二種是全局視圖。二者的用法基本一致,不一樣的是做用範圍。臨時視圖的做用範圍是當前的session,若是當前的session關閉,或者是另外開啓了新的session,這個視圖就會做廢。而全局視圖則是跨session的,全部session均可以使用。
若是搞不清楚session的概念也沒有關係,在以後的文章當中咱們還會遇到的。咱們先有這麼個印象便可。
咱們調用createOrReplaceTempView方法建立一個臨時視圖,有了視圖以後,咱們就能夠經過SQL語句來查詢數據了。
studentDf.createOrReplaceTempView("student")
咱們經過spark.sql傳入一段SQL string便可完成數據的調用,須要注意的是,DataFrame也支持RDD的collect或者take等方法。若是這裏的結果咱們調用的是collect,那麼spark會將全部數據都返回。若是數據集很大的狀況下可能會出現問題,因此要注意show和collect的使用範圍和區別,在一些場景下搞錯了會很危險。
今天這篇文章咱們一塊兒來看了pyspark當中目前爲止最經常使用的數據處理工具——DataFrame,還簡單瞭解了一下它和RDD相比的性能優點以及它簡單的查詢語法的使用方法。
從上面的方法咱們也看得出來,相比以前RDD中介紹的那些方法,DataFrame中封裝的API提供了更多高級的功能,比寫RDD處理數據也要方便不少。再加上性能緣由,咱們在處理數據時必然首選使用DataFrame。相信你們經過本文對於DataFrame也應該有了一個最初的印象,後續還會有更多文章詳細地介紹DataFrame的使用以及內部機制的一些細節,敬請期待吧。
今天的文章就到這裏,原創不易,掃碼關注我,獲取更多精彩文章。