【原】Learning Spark (Python版) 學習筆記(三)----工做原理、調優與Spark SQL

  週末的任務是更新Learning Spark系列第三篇,覺得本身寫不完了,但爲了改正拖延症,仍是得完成給本身定的任務啊 = =。這三章主要講Spark的運行過程(本地+集羣),性能調優以及Spark SQL相關的知識,若是對Spark不熟的同窗能夠先看看以前總結的兩篇文章:html

  【原】Learning Spark (Python版) 學習筆記(一)----RDD 基本概念與命令node

  【原】Learning Spark (Python版) 學習筆記(二)----鍵值對、數據讀取與保存、共享特性python

 

 

########################################我是正文分割線######################################sql

 

  第七章主要講了Spark的運行架構以及在集羣上的配置,這部分文字比較多,可能會比較枯燥,主要是講整個過程是怎麼運行的。首先咱們來了解一下Spark在分佈式環境中的架構,如圖1 所示shell

圖1 Spark分佈式結構圖數據庫

 

  如上圖所示,在Spark集羣中有一個節點負責中央協調,調度各個分佈式工做節點。這個中央協調點叫「驅動器節點(Driver)」,與之對應的工做節點叫「執行器節點(executor)」。驅動器節點和全部的執行器節點被稱爲一個Spark應用(Application)。Spark應用經過一個「集羣管理器(Cluster Manager)」的外部服務在集羣中的機器上啓動,其中它自帶的集羣管理器叫「獨立集羣管理器」。json

  驅動器節點:緩存

  做用網絡

  • 執行程序中的main()方法的進程,一旦終止,Spark應用也終止了。

  職責架構

  • 把用戶程序轉化爲任務
    • 用戶輸入數據,建立了一系列RDD,再使用Transformation操做生成新的RDD,最後啓動Action操做存儲RDD中的數據,由此構成了一個有向無環圖(DAG)。當Drive啓動時,Spark會執行這些命令,並轉爲一系列stage(步驟)來操做。在這些步驟中,包含了多個task(任務),這些task被打包送到集羣中,就能夠進行分佈式的運算了,是否是像流水線上的工人呢~
  • 爲執行器節點調度任務:
    • Driver啓動後,必須在各執行器進程間協調各個任務。執行器進程啓動後會在Driver上註冊本身的節點,這樣Driver就有全部執行器節點的完整記錄了。每一個執行器節點表明一個可以處理任務和存儲RDD數據的進程。Spark會根據當前任務的執行器節點集合,嘗試把全部的任務基於數據所在的位置分配給合適的執行器進程。當咱們的任務執行時,執行器進程會把緩存數據存儲起來,而驅動器進程一樣也會跟蹤這些緩存數據的任務,並利用這些位置信息來調度之後的任務,以儘可能減小數據的網絡傳輸。

  

  執行器節點:

  做用:

  • 負責在Spark做業中運行任務,各個任務間相互獨立。Spark啓動應用時,執行器節點就被同時啓動,並一直持續到Spark應用結束。

  職責:

  • 負責運行組成Spark應用的任務,並將結果返回給驅動器程序。
  • 經過自身的塊管理器(Block Manager)爲用戶程序中要求緩存的RDD提供內存式存儲。RDD是直接緩存在執行器進程裏的,因此能夠在運行時充分利用緩存數據提升運算速度。

 

  集羣管理器:

  在圖一中咱們看到,Spark依賴於集羣管理器來啓動執行器節點,而在某些特殊狀況下,也會依賴集羣管理器來啓動驅動器節點。Spark有自帶的獨立集羣管理器,也能夠運行在其餘外部集羣管理器上,如YARN和Mesos等。下面講一下兩種比較常見的外部集羣管理器:

  獨立集羣管理器:

1.啓動獨立集羣管理器

2.提交應用:spark-submit --master spark://masternode:7077 yourapp

  支持兩種部署模式:客戶端模式和集羣模式

3.配置資源用量:在多個應用間共享Spark集羣時,經過如下兩個設置來對執行器進程分配資源:

  3.1 執行器進程內存:能夠經過spark-submit中的 --executor-memory 參數來配置。每一個應用在每一個工做節點上最多擁有一個執行器進程。所以這個這個可以控制         執行器節點佔用工做節點多少內存。默認值是1G。

  3.2 佔用核心總數的最大值:能夠經過spark-submit中的 --total -executorcores 參數來設置。

  

  Hadoop YARN:

1.提交應用:設置指向你的Hadoop配置目錄的環境變量,而後使用spark-submit 向一個特殊的主節點URL提交做業便可。

2.配置資源用量:

  • --num -executors :設置執行器節點,默認值爲2
  • --executor -memory: 設置每一個執行器的內存用量
  • --executor -cores: 設置每一個執行器進程從YARN中佔用的核心數目
  • --queue:設置隊列名稱,YARN能夠將應用調度到多個隊列中。

 

   Apache Mesos:

1.提交應用:spark-submit --master mesos://masternode:5050 your app

2.Mesos調度模式:兩種:

  • 細粒度模式:默認模式。一臺運行了多個執行器進程的機器能夠動態共享CPU資源
  • 粗粒度模式:Spark爲每一個執行器分配固定數量的CPU數目,而且在應用結束前不會釋放該資源,即便執行器進程當前沒有運行任務(多浪費啊  = =)。能夠經過spark-submit 傳遞 --conf spark.mesos.coarse=true 來打開粗粒度模式

3.部署模式:僅支持以客戶端的部署模式運行應用,即驅動器程序必須運行提交應用的那臺機器上。

4.配置資源用量:

  • --executor -memory:設置每一個執行器進程的內存
  • --total -executor -cores :設置應用佔用的核心數(全部執行器節點佔用的總數)的最大值。若是不設置該值,Mesos可能會使用急羣衆全部可用的核心。

 

   選擇合適的集羣管理器:

1.通常狀況下,能夠直接選擇獨立集羣模式,功能全,並且簡單。

2.若是要在使用Spark的同時使用其餘應用,能夠選擇YARN或Mesos。並且大多數版本的Hadoop中已經預裝好YARN了,很是方便。

3.對於多用戶同事運行交互式shell時,能夠選擇Mesos(選擇細粒度模式),這種模式能夠將Spark-shell這樣的交互式應用中的不一樣命令分配到不一樣的CPU上。

4.任什麼時候候,最好把Spark運行在運行HDFS的節點上,能夠快速訪問存儲。

 

  提交應用:

  使用spark-submit腳本提交應用,能夠根據不一樣的狀況設置成在本地運行和在集羣運行等:

  • 本地模式:bin/spark-submit (--local) my_script.py

  (lcoal能夠省略)

  • 集羣模式:bin/spark-submit --master spark://host:7077 --executor-memory 10g my_script.py

  (--master標記要鏈接的集羣的URL)

 

  

  總結一下Spark在集羣上的運行過程:

 

 

 

#########################################我是看累了休息會兒的分割線##############################

 

 

  前面已經講完了Spark的運行過程,包括本地和集羣上的。如今咱們來說講Spark的調優與調試。

  咱們知道,Spark執行一個應用時,由做業、任務和步驟組成。先回顧一下:

任務:Spark的最小工做單位

步驟:由多個任務組成

做業:由一個或多個做業組成

  

  在第一篇中咱們也講過,當咱們建立轉化(Transformation)RDD時,是執行"Lazy"(惰性)計算的,只有當出現Action操做時纔會觸發真正的計算。而Action操做是如何調用Transformation計算的呢?實際上,Spark調度器會建立出用於計算Action操做的RDD物理執行計劃,當它從最終被調用Action操做的RDD時,向上回溯全部必需計算的RDD。調度器會訪問RDD的父節點、父節點的父節點,以此類推,遞歸向上生成計算全部必要的祖先RDD的物理計劃。

  然而,當調度器圖與執行步驟的對應關係並不必定是一對一的。當RDD不須要混洗數據就能夠從父節點計算出來,RDD不須要混洗數據就能夠從父節點計算出來,或把多個RDD合併到一個步驟中時,調度器就會自動進行進行"流水線執行"(pipeline)。例以下圖中,儘管有不少級父RDD,但從縮進來看,只有兩個步驟,說明物理執行只須要兩個步驟。由於這個執行序列中有幾個連續的篩選和映射操做,因此纔會出現流水線執行。

 

  

 

  當步驟圖肯定下來後,任務就會被建立出來併發給內部的調度器,這些步驟會以特定的順序執行。一個物理步驟會啓動不少任務,每一個任務都是在不一樣的數據分區上作一樣的事情,任務內部的流程是同樣的,以下所示:

1.從數據存儲(輸入RDD)或已有RDD(已緩存的RDD)或數據混洗的輸出中獲取輸入數據

2.執行必要的操做來計算RDD。

3.把輸出寫到一個數據混洗文件中,寫入外部存儲,或是發揮驅動器程序。

  

  總結一下Spark執行的流程

  • 用戶定義RDD的有向無環圖(DAG):RDD上的操做會建立出新的RDD,並引用它們的父節點,這樣就建立出了一個圖。
  • Action操做把有向無環圖強制轉譯爲執行計劃:Spark調度器提交一個做業來計算所必要的RD,這個做業包含一個或多個步驟,每一個步驟就是一些並行執行的計算任務。一個步驟對應有向無環圖中的一個或多個RDD(其中對應多個RDD是在"流水線執行"中發生的
  • 在集羣中調度並執行任務:步驟是按順序處理的,任務則獨立啓動來計算RDD的一部分。看成業的最後一個步驟結束時,一個Action操做也執行完了。

 

  Spark調優

  到這裏咱們已經基本瞭解Spark的內部工做原理了,那麼在哪些地方能夠進行調優呢?有如下四個方面:

  並行度

  • 影響性能的兩個方面
    • a.並行度太低時,會出現資源限制的狀況。此時能夠提升並行度來充分利用更多的計算core。
    • b.並行度太高時,每一個分區產生的間接開銷累計起來會更大。評價並行度是否太高能夠看你的任務是否是在瞬間(毫秒級)完成的,或者任務是否是沒有讀寫任何數據。
  • 調優方法
    • 在數據混洗操做時,對混洗後的RDD設定參數制定並行度
    • 對於任何已有的RDD進行從新分區來獲取更多/更少的分區數。從新分區:repartition();減小分區:coalesce(),比repartition()更高效。

  

  序列化格式

  當Spark須要經過網絡傳輸數據,或者將數據溢出寫到磁盤上時(默認存儲方式是內存存儲),Spark須要數據序列化爲二進制格式。默認狀況下,使用Java內建的序列化庫。固然,也支持使用第三方序列化庫Kryo,比Java序列化時間更短,而且有更高壓縮比的二進制表示。但有一點須要注意:Kryo不能序列化所有類型的對象。

  

  內存管理

  • RDD存儲(60%)
    • 調用persisit()或cahe()方法時,RDD的分區會被存儲到緩存區中。Spark會根據spark.storage.memoryFraction限制用來緩存的內存佔整個JVM堆空間的比例大小。超出限制的話,舊的分區會被移出內存。
  • 數據混洗與聚合的緩存區(20%)
    • 當數據進行數據混洗時,Spark會創造一些中間緩存區來存儲數據混洗的輸出數據。根據spark.shuffle.memoryFraction限定這種緩存區佔總內存的比例。
  • 用戶的代碼(20%)
    • spark能夠執行任意代碼,因此用戶的代碼能夠申請大量內存,它能夠訪問JVM堆空間中除了分配給RDD存儲和數據混洗存儲之外的所有空間。20%是默認狀況下的分配比例。不過用戶能夠自行調節這個比例來提升性能。

  

  固然,除了調整內存比例,也能夠改變內存的存儲順序。咱們知道,Spark默認的cache()操做是以Memory_ONLY的存儲等級持久化數據的,也就是說內存優先。若是RDD分區時的空間不夠,舊的分區會直接刪除。(妹的刪數據也不帶打聲招呼的 = =!)當用到這些分區時,又會從新進行計算。因此,若是咱們用Memory_AND_DISK的存儲等級調用persist()方法效果會更好。由於當內存滿的時候,放不下的舊分區會被寫入磁盤,再用的時候就從磁盤裏讀取回來,這樣比從新計算各分區的消耗要小得多,性能也更穩定(不會動不動報Memory Error了,哈哈)。特別是當RDD從數據庫中讀取數據的話,最好選擇內存+磁盤的存儲等級吧。

 

  硬件供給

影響集羣規模的主要這幾個方面:分配給每一個執行器節點的內存大小、每一個執行器節點佔用的核心數、執行器節點總數、以及用來存儲臨時數據的本地磁盤數量(在數據混洗使用Memory_AND_DISK的存儲等級時,更大的磁盤能夠提高Spark的性能哦~)。

 

 

##################################我是文章快結束的分割線######################################

 

 

  最後咱們來說講Spark SQL,上一篇中咱們已經總結了如何使用Spark讀取和保存文件,涉及到了這部份內容,因此這一篇中只會簡要的說明一下:

  導入Spark SQL與簡單的查詢示例

 1 #初始化Spark SQL
 2 #導入Spark SQL
 3 from pyspark.sql import HiveContext,Row  4 #當不能引入Hive依賴時
 5 from pyspark.sql import SQLContext,Row  6 #建立SQL上下文環境
 7 hiveCtx = HiveContext(sc)  8 #基本查詢示例
 9 input = hiveCtx.jsonFile(inputFile) 10 #註冊輸入的SchemaRDD(SchemaRDD在Spark 1.3版本後已經改成DataFrame)
11 input.registerTempTable("tweets") 12 #依據retweetCount(轉發計數)選出推文
13 topTweets = hiveCtx.sql("SELECT text,retweetCount FROM tweets ORDER BY retweetCount LIMIT 10")

 

  緩存

以一種列式存儲格式在內存中存儲數據。這些緩存下來的表只會在Driver的生命週期內保留在內存中,退出的話就沒有了。能夠經過cache() 和 uncache()命令來緩存表或者刪除已緩存的表。

 

 

  讀取和存儲數據

  Apache Hive

1 #使用Python從Hive中讀取
2 from pyspark.sql import HiveContext 3 
4 hiveCtx = HiveContext(sc) 5 rows = hiveCtx.sql("SELECT key,value FROM mytable") 6 keys = rows.map(lambda: row,row[0])

 

  Parquet

 1 #Python中的Parquet數據讀取
 2 #從一個有name和favoriteAnimal字段的Parquet文件中讀取數據
 3 rows = hiveCtx.parquetFile(parquetFile)  4 names = rows.map(lambda row: row.name)  5 print "Everyone"
 6 print names.collect()  7 
 8 #Python中的Parquet數據查詢
 9 #這裏把Parquet文件註冊爲Spark SQL的臨時表來查詢數據
10 #尋找熊貓愛好者
11 tbl = rows.registerTempTable("people") 12 pandaFriends = hiveCtx.sql("SELECT name FROM people WHERE favouriteAnimal = \"panda\"") 13 print "Panda friends"
14 print pandaFriends.map(lambda row:row.name).collect() 15 
16 #使用saveAsParquetFile()保存文件
17 pandaFriends.saveAsParqueFile("hdfs://")

 

  JSON

 1 #在python中讀取JSON數據 2 input= hiveCtx.jsonFile(inputFile) 

 

  使用BeeLine

  建立、列舉、查詢Hive表

 

  用戶自定義函數(UDF)

1 #Python版本的字符串長度UDF
2 hiveCtx.registerFuction("strLenPython",lambda x :len(x),IntegerType()) 3 LengthSchemaRDD = hiveCtx.sql("SELECT strLenPython('text') FROM tweets LIMIT 10")

 

  Spark SQL性能

  Spark SQL在緩存數據時,使用的是內存式的列式存儲,即Parquet格式,不只節約了緩存時間,並且儘量的減小了後續查詢中針對某幾個字段時的數據讀取。

性能調優選項

選項 默認值 用途
spark.sql.codegen false 設爲True時,Spark SQL會把每條查詢語句在運行時編譯爲Java二進制代碼。這能夠提升大型查詢的性能,但在小規模查詢時會變慢
spark.sql.inMemoryColumnarStorage.compressed false 自動對內存中的列式存儲進行壓縮
spark.sql.inMemoryColumnarStorage.batchSize 1000 列式緩存時的每一個批處理的大小。把這個值調大可能會致使內存不夠的異常
spark.sql.parquet.compression.codec snappy
選擇不一樣的壓縮編碼器。可選項包括uncompressed/snappy/gzip/lzo

 

  到這裏,第七章-第九章的內容就所有總結完了,看完以後會對Spark的運行過程,性能調優以及存儲格式等有一個更清晰的概念。下一篇是最後一篇,5.15更新,主要講Spark Streaming和Spark MLlib機器學習的內容。順便也能夠和PySpark作機器學習作一下對比:D

相關文章
相關標籤/搜索