週末的任務是更新Learning Spark系列第三篇,覺得本身寫不完了,但爲了改正拖延症,仍是得完成給本身定的任務啊 = =。這三章主要講Spark的運行過程(本地+集羣),性能調優以及Spark SQL相關的知識,若是對Spark不熟的同窗能夠先看看以前總結的兩篇文章:html
【原】Learning Spark (Python版) 學習筆記(一)----RDD 基本概念與命令node
【原】Learning Spark (Python版) 學習筆記(二)----鍵值對、數據讀取與保存、共享特性python
########################################我是正文分割線######################################sql
第七章主要講了Spark的運行架構以及在集羣上的配置,這部分文字比較多,可能會比較枯燥,主要是講整個過程是怎麼運行的。首先咱們來了解一下Spark在分佈式環境中的架構,如圖1 所示shell
圖1 Spark分佈式結構圖數據庫
如上圖所示,在Spark集羣中有一個節點負責中央協調,調度各個分佈式工做節點。這個中央協調點叫「驅動器節點(Driver)」,與之對應的工做節點叫「執行器節點(executor)」。驅動器節點和全部的執行器節點被稱爲一個Spark應用(Application)。Spark應用經過一個「集羣管理器(Cluster Manager)」的外部服務在集羣中的機器上啓動,其中它自帶的集羣管理器叫「獨立集羣管理器」。json
驅動器節點:緩存
做用網絡
- 執行程序中的main()方法的進程,一旦終止,Spark應用也終止了。
職責架構
- 把用戶程序轉化爲任務
- 用戶輸入數據,建立了一系列RDD,再使用Transformation操做生成新的RDD,最後啓動Action操做存儲RDD中的數據,由此構成了一個有向無環圖(DAG)。當Drive啓動時,Spark會執行這些命令,並轉爲一系列stage(步驟)來操做。在這些步驟中,包含了多個task(任務),這些task被打包送到集羣中,就能夠進行分佈式的運算了,是否是像流水線上的工人呢~
- 爲執行器節點調度任務:
- Driver啓動後,必須在各執行器進程間協調各個任務。執行器進程啓動後會在Driver上註冊本身的節點,這樣Driver就有全部執行器節點的完整記錄了。每一個執行器節點表明一個可以處理任務和存儲RDD數據的進程。Spark會根據當前任務的執行器節點集合,嘗試把全部的任務基於數據所在的位置分配給合適的執行器進程。當咱們的任務執行時,執行器進程會把緩存數據存儲起來,而驅動器進程一樣也會跟蹤這些緩存數據的任務,並利用這些位置信息來調度之後的任務,以儘可能減小數據的網絡傳輸。
執行器節點:
做用:
- 負責在Spark做業中運行任務,各個任務間相互獨立。Spark啓動應用時,執行器節點就被同時啓動,並一直持續到Spark應用結束。
職責:
- 負責運行組成Spark應用的任務,並將結果返回給驅動器程序。
- 經過自身的塊管理器(Block Manager)爲用戶程序中要求緩存的RDD提供內存式存儲。RDD是直接緩存在執行器進程裏的,因此能夠在運行時充分利用緩存數據提升運算速度。
集羣管理器:
在圖一中咱們看到,Spark依賴於集羣管理器來啓動執行器節點,而在某些特殊狀況下,也會依賴集羣管理器來啓動驅動器節點。Spark有自帶的獨立集羣管理器,也能夠運行在其餘外部集羣管理器上,如YARN和Mesos等。下面講一下兩種比較常見的外部集羣管理器:
獨立集羣管理器:
1.啓動獨立集羣管理器
2.提交應用:spark-submit --master spark://masternode:7077 yourapp
支持兩種部署模式:客戶端模式和集羣模式
3.配置資源用量:在多個應用間共享Spark集羣時,經過如下兩個設置來對執行器進程分配資源:
3.1 執行器進程內存:能夠經過spark-submit中的 --executor-memory 參數來配置。每一個應用在每一個工做節點上最多擁有一個執行器進程。所以這個這個可以控制 執行器節點佔用工做節點多少內存。默認值是1G。
3.2 佔用核心總數的最大值:能夠經過spark-submit中的 --total -executorcores 參數來設置。
Hadoop YARN:
1.提交應用:設置指向你的Hadoop配置目錄的環境變量,而後使用spark-submit 向一個特殊的主節點URL提交做業便可。
2.配置資源用量:
- --num -executors :設置執行器節點,默認值爲2
- --executor -memory: 設置每一個執行器的內存用量
- --executor -cores: 設置每一個執行器進程從YARN中佔用的核心數目
- --queue:設置隊列名稱,YARN能夠將應用調度到多個隊列中。
Apache Mesos:
1.提交應用:spark-submit --master mesos://masternode:5050 your app
2.Mesos調度模式:兩種:
- 細粒度模式:默認模式。一臺運行了多個執行器進程的機器能夠動態共享CPU資源
- 粗粒度模式:Spark爲每一個執行器分配固定數量的CPU數目,而且在應用結束前不會釋放該資源,即便執行器進程當前沒有運行任務(多浪費啊 = =)。能夠經過spark-submit 傳遞 --conf spark.mesos.coarse=true 來打開粗粒度模式
3.部署模式:僅支持以客戶端的部署模式運行應用,即驅動器程序必須運行提交應用的那臺機器上。
4.配置資源用量:
- --executor -memory:設置每一個執行器進程的內存
- --total -executor -cores :設置應用佔用的核心數(全部執行器節點佔用的總數)的最大值。若是不設置該值,Mesos可能會使用急羣衆全部可用的核心。
選擇合適的集羣管理器:
1.通常狀況下,能夠直接選擇獨立集羣模式,功能全,並且簡單。
2.若是要在使用Spark的同時使用其餘應用,能夠選擇YARN或Mesos。並且大多數版本的Hadoop中已經預裝好YARN了,很是方便。
3.對於多用戶同事運行交互式shell時,能夠選擇Mesos(選擇細粒度模式),這種模式能夠將Spark-shell這樣的交互式應用中的不一樣命令分配到不一樣的CPU上。
4.任什麼時候候,最好把Spark運行在運行HDFS的節點上,能夠快速訪問存儲。
提交應用:
使用spark-submit腳本提交應用,能夠根據不一樣的狀況設置成在本地運行和在集羣運行等:
- 本地模式:bin/spark-submit (--local) my_script.py
(lcoal能夠省略)
- 集羣模式:bin/spark-submit --master spark://host:7077 --executor-memory 10g my_script.py
(--master標記要鏈接的集羣的URL)
總結一下Spark在集羣上的運行過程:
#########################################我是看累了休息會兒的分割線##############################
前面已經講完了Spark的運行過程,包括本地和集羣上的。如今咱們來說講Spark的調優與調試。
咱們知道,Spark執行一個應用時,由做業、任務和步驟組成。先回顧一下:
任務:Spark的最小工做單位
步驟:由多個任務組成
做業:由一個或多個做業組成
在第一篇中咱們也講過,當咱們建立轉化(Transformation)RDD時,是執行"Lazy"(惰性)計算的,只有當出現Action操做時纔會觸發真正的計算。而Action操做是如何調用Transformation計算的呢?實際上,Spark調度器會建立出用於計算Action操做的RDD物理執行計劃,當它從最終被調用Action操做的RDD時,向上回溯全部必需計算的RDD。調度器會訪問RDD的父節點、父節點的父節點,以此類推,遞歸向上生成計算全部必要的祖先RDD的物理計劃。
然而,當調度器圖與執行步驟的對應關係並不必定是一對一的。當RDD不須要混洗數據就能夠從父節點計算出來,RDD不須要混洗數據就能夠從父節點計算出來,或把多個RDD合併到一個步驟中時,調度器就會自動進行進行"流水線執行"(pipeline)。例以下圖中,儘管有不少級父RDD,但從縮進來看,只有兩個步驟,說明物理執行只須要兩個步驟。由於這個執行序列中有幾個連續的篩選和映射操做,因此纔會出現流水線執行。
當步驟圖肯定下來後,任務就會被建立出來併發給內部的調度器,這些步驟會以特定的順序執行。一個物理步驟會啓動不少任務,每一個任務都是在不一樣的數據分區上作一樣的事情,任務內部的流程是同樣的,以下所示:
1.從數據存儲(輸入RDD)或已有RDD(已緩存的RDD)或數據混洗的輸出中獲取輸入數據
2.執行必要的操做來計算RDD。
3.把輸出寫到一個數據混洗文件中,寫入外部存儲,或是發揮驅動器程序。
總結一下,Spark執行的流程:
- 用戶定義RDD的有向無環圖(DAG):RDD上的操做會建立出新的RDD,並引用它們的父節點,這樣就建立出了一個圖。
- Action操做把有向無環圖強制轉譯爲執行計劃:Spark調度器提交一個做業來計算所必要的RD,這個做業包含一個或多個步驟,每一個步驟就是一些並行執行的計算任務。一個步驟對應有向無環圖中的一個或多個RDD(其中對應多個RDD是在"流水線執行"中發生的)
- 在集羣中調度並執行任務:步驟是按順序處理的,任務則獨立啓動來計算RDD的一部分。看成業的最後一個步驟結束時,一個Action操做也執行完了。
Spark調優
到這裏咱們已經基本瞭解Spark的內部工做原理了,那麼在哪些地方能夠進行調優呢?有如下四個方面:
並行度
- 影響性能的兩個方面
- a.並行度太低時,會出現資源限制的狀況。此時能夠提升並行度來充分利用更多的計算core。
- b.並行度太高時,每一個分區產生的間接開銷累計起來會更大。評價並行度是否太高能夠看你的任務是否是在瞬間(毫秒級)完成的,或者任務是否是沒有讀寫任何數據。
- 調優方法
- 在數據混洗操做時,對混洗後的RDD設定參數制定並行度
- 對於任何已有的RDD進行從新分區來獲取更多/更少的分區數。從新分區:repartition();減小分區:coalesce(),比repartition()更高效。
序列化格式
當Spark須要經過網絡傳輸數據,或者將數據溢出寫到磁盤上時(默認存儲方式是內存存儲),Spark須要數據序列化爲二進制格式。默認狀況下,使用Java內建的序列化庫。固然,也支持使用第三方序列化庫Kryo,比Java序列化時間更短,而且有更高壓縮比的二進制表示。但有一點須要注意:Kryo不能序列化所有類型的對象。
內存管理
- RDD存儲(60%)
- 調用persisit()或cahe()方法時,RDD的分區會被存儲到緩存區中。Spark會根據spark.storage.memoryFraction限制用來緩存的內存佔整個JVM堆空間的比例大小。超出限制的話,舊的分區會被移出內存。
- 數據混洗與聚合的緩存區(20%)
- 當數據進行數據混洗時,Spark會創造一些中間緩存區來存儲數據混洗的輸出數據。根據spark.shuffle.memoryFraction限定這種緩存區佔總內存的比例。
- 用戶的代碼(20%)
- spark能夠執行任意代碼,因此用戶的代碼能夠申請大量內存,它能夠訪問JVM堆空間中除了分配給RDD存儲和數據混洗存儲之外的所有空間。20%是默認狀況下的分配比例。不過用戶能夠自行調節這個比例來提升性能。
固然,除了調整內存比例,也能夠改變內存的存儲順序。咱們知道,Spark默認的cache()操做是以Memory_ONLY的存儲等級持久化數據的,也就是說內存優先。若是RDD分區時的空間不夠,舊的分區會直接刪除。(妹的刪數據也不帶打聲招呼的 = =!)當用到這些分區時,又會從新進行計算。因此,若是咱們用Memory_AND_DISK的存儲等級調用persist()方法效果會更好。由於當內存滿的時候,放不下的舊分區會被寫入磁盤,再用的時候就從磁盤裏讀取回來,這樣比從新計算各分區的消耗要小得多,性能也更穩定(不會動不動報Memory Error了,哈哈)。特別是當RDD從數據庫中讀取數據的話,最好選擇內存+磁盤的存儲等級吧。
硬件供給
影響集羣規模的主要這幾個方面:分配給每一個執行器節點的內存大小、每一個執行器節點佔用的核心數、執行器節點總數、以及用來存儲臨時數據的本地磁盤數量(在數據混洗使用Memory_AND_DISK的存儲等級時,更大的磁盤能夠提高Spark的性能哦~)。
##################################我是文章快結束的分割線######################################
最後咱們來說講Spark SQL,上一篇中咱們已經總結了如何使用Spark讀取和保存文件,涉及到了這部份內容,因此這一篇中只會簡要的說明一下:
導入Spark SQL與簡單的查詢示例
1 #初始化Spark SQL
2 #導入Spark SQL
3 from pyspark.sql import HiveContext,Row 4 #當不能引入Hive依賴時
5 from pyspark.sql import SQLContext,Row 6 #建立SQL上下文環境
7 hiveCtx = HiveContext(sc) 8 #基本查詢示例
9 input = hiveCtx.jsonFile(inputFile) 10 #註冊輸入的SchemaRDD(SchemaRDD在Spark 1.3版本後已經改成DataFrame)
11 input.registerTempTable("tweets") 12 #依據retweetCount(轉發計數)選出推文
13 topTweets = hiveCtx.sql("SELECT text,retweetCount FROM tweets ORDER BY retweetCount LIMIT 10")
緩存
以一種列式存儲格式在內存中存儲數據。這些緩存下來的表只會在Driver的生命週期內保留在內存中,退出的話就沒有了。能夠經過cache() 和 uncache()命令來緩存表或者刪除已緩存的表。
讀取和存儲數據
Apache Hive
1 #使用Python從Hive中讀取
2 from pyspark.sql import HiveContext 3
4 hiveCtx = HiveContext(sc) 5 rows = hiveCtx.sql("SELECT key,value FROM mytable") 6 keys = rows.map(lambda: row,row[0])
Parquet
1 #Python中的Parquet數據讀取
2 #從一個有name和favoriteAnimal字段的Parquet文件中讀取數據
3 rows = hiveCtx.parquetFile(parquetFile) 4 names = rows.map(lambda row: row.name) 5 print "Everyone"
6 print names.collect() 7
8 #Python中的Parquet數據查詢
9 #這裏把Parquet文件註冊爲Spark SQL的臨時表來查詢數據
10 #尋找熊貓愛好者
11 tbl = rows.registerTempTable("people") 12 pandaFriends = hiveCtx.sql("SELECT name FROM people WHERE favouriteAnimal = \"panda\"") 13 print "Panda friends"
14 print pandaFriends.map(lambda row:row.name).collect() 15
16 #使用saveAsParquetFile()保存文件
17 pandaFriends.saveAsParqueFile("hdfs://")
JSON
1 #在python中讀取JSON數據 2 input= hiveCtx.jsonFile(inputFile)
使用BeeLine
建立、列舉、查詢Hive表
用戶自定義函數(UDF)
1 #Python版本的字符串長度UDF
2 hiveCtx.registerFuction("strLenPython",lambda x :len(x),IntegerType()) 3 LengthSchemaRDD = hiveCtx.sql("SELECT strLenPython('text') FROM tweets LIMIT 10")
Spark SQL性能
Spark SQL在緩存數據時,使用的是內存式的列式存儲,即Parquet格式,不只節約了緩存時間,並且儘量的減小了後續查詢中針對某幾個字段時的數據讀取。
性能調優選項
選項 | 默認值 | 用途 |
spark.sql.codegen | false | 設爲True時,Spark SQL會把每條查詢語句在運行時編譯爲Java二進制代碼。這能夠提升大型查詢的性能,但在小規模查詢時會變慢 |
spark.sql.inMemoryColumnarStorage.compressed | false | 自動對內存中的列式存儲進行壓縮 |
spark.sql.inMemoryColumnarStorage.batchSize | 1000 | 列式緩存時的每一個批處理的大小。把這個值調大可能會致使內存不夠的異常 |
spark.sql.parquet.compression.codec | snappy | 選擇不一樣的壓縮編碼器。可選項包括uncompressed/snappy/gzip/lzo |
到這裏,第七章-第九章的內容就所有總結完了,看完以後會對Spark的運行過程,性能調優以及存儲格式等有一個更清晰的概念。下一篇是最後一篇,5.15更新,主要講Spark Streaming和Spark MLlib機器學習的內容。順便也能夠和PySpark作機器學習作一下對比:D