分析spark的內存模型,文章來源於 https://0x0fff.com/spark-architecture/ 這裏爲了閱讀流暢,把原文英文去除了,你們有疑問能夠對照查看。html
Spark分佈式系統架構 由Alexey Grishchenko編寫該文章java
spark架構 在此文章中講述的內存模型在spark1.6++後的版本中再也不使用,新模型使用UnifiedMemoryManager類實現,在https://0x0fff.com/spark-memory-management/文中中我進行了相應的描述。node
最近我在StackOverflow上回復了關於spark架構的一系列問題,這些促使我在網上寫一篇介紹spark架構的文章。這些內容官方文檔、相關的書籍或者網站都沒有,或許他們剛好缺少一些好的圖片描述spark架構。linux
在這篇文章中,我將好好去解決這些問題,提供一個非官方的spark架構指南並解答這方面的廣泛的一些問題及疑惑。這篇文章不針對於新手,看這篇文章須要一些RDD DAG方面的知識。git
本篇文章是spark系列的第一篇文章,本系列第二篇文章是關於的spark shuffle的,第三篇文章是關於spark新的內存模型的。github
好啦,讓咱們從下面的圖片開始吧(http://spark.apache.org/docs/1.3.0/cluster-overview.html)apache
如圖所見,這圖中同時展現了"executor", "task", "cache", "Worker Node" ...,我以前學習spark的時候,這是關於spark架構惟一的圖片,固然如今也差很少。我我的不喜歡這圖,由於它並無用更好的方式來展現這些重要的概念。數組
讓咱們從頭開始,任何運行在集羣或者本地機器上的spark進程都是JVM進程。做爲JVM進程,你就能夠用"-Xmx and -Xms flags of the JVM"配置堆內存大小。可是進程如何使用配置的堆內存及爲何須要那麼大小的內存?下面有個spark內存結構圖:緩存
Spark默認爲512M的堆內存大小.爲了安全考慮和防止出現 法力耗盡(OOM),錯了,是內存溢出的錯誤,因此spark容許只利用整個堆內存的90%的內存大小,這部分這裏稱爲"安全堆內存"(safe heap),這個能夠經過配置置"spark.storage.safetyFraction"參數修改。安全
由於效率高的spark是基於內存的,spark在內存中會存儲數據。可是若是你讀過 h-t-t-p-s://0x0fff.com/spark-misconceptions/ 這篇文章,你應該瞭解到spark不只僅是是基於內存的,它只是利用了它的LRU緩存{h-t-t-p://en.wikipedia.org/wiki/Cache_algorithms}的內存。因此有一些內存被用來緩存運行過程的數據,這部分通常佔到"安全堆內存"的60%,能夠經過配置"spark.storage.memoryFraction"參數來控制該比例。
因此若是你想知道你能夠在spark中存儲多少數據,你應該把全部的executors的堆內存總量*safetyFraction參數配置*memoryFraction參數配置,默認的話就是:全部的executors的堆內存總量*0.9*0.6=全部的executors的堆內存總量*54%.
上面講的是存儲storage的內存佔用,shuffle的內存佔用相似,他是"Heap Size」* spark.shuffle.safetyFraction(默認0.8) * spark.shuffle.memoryFraction(默認0.2)="Heap Size"*16% ,因此shuffle佔用16%左右堆內存資源。
你能夠在(https://github.com/apache/spark/blob/branch-1.3/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala) 瞭解到spark是如何使用這些內存的,不過他們一般來講用於shuffle操做。
當shuffle運行的時候,你一般須要排序數據,因此你須要一個緩存區來存儲排序過的數據(記住,你不能在LRU緩存中修改原來的數據,由於他們後面還會再使用)。因此你須要必定量的內存來存儲這些大塊的排序後的數據。
若是沒有足夠的內存來排序數據會發生什麼?會有一個很大的計算範圍指向"額外排序區",這可以使你一塊接一塊進行排序,最後纔會合併結果。
最後一部份內存,咱們稱爲"展開內存"。這一部分的內存可使用"整個內存 * spark.storage.unrollFraction(默認0.2) * spark.storage.memoryFraction(默認0.6) * spark.storage.safetyFraction(默認0.9) " = 即"堆內存總量的10.8%"。這一部份內存是用於展開數據使用的。爲何你總須要這一部份內存配置呢?由於spark容許你以序列化和反序列號的方式來存儲數據,序列化的數據不能直接使用,因此你須要這部份內存來進行反序列化數據。由於這部份內存是共享的,因此若是被佔用了,在你展開數據的時候,內存不夠的時候,它可能會扔掉一些在Spark LRU緩存中的partitions來空出一些內存。
好了,如今你應該知道spark的進程了,而且知道她們是如何利用內存了。如今咱們看看集羣模式,當你開啓一個spark集羣,內存佔用是什麼樣子的呢?我喜歡Yarn這個資源管理器,我將會講述一些在Yarn中,她們是如何工做,通常狀況下,其餘集羣上也是相似的,這裏以Yarn爲例:
在yarn集羣上,有一個yarn資源管理的守護進程來控制集羣資源(基本上是內存),還有一些運行在集羣節點上的yarn節點管理器,它們控制節點資源利用。從yarn角度來看,每個節點控制着一個內存池。當你向yarn資源管理器申請資源的時候,它通過分析會提供你能夠運行executor容器的節點管理器的相關信息。每個executor容器就是一個含堆內存大小的JVM。JVM的選擇由yarn資源管理器控制,你沒法控制它選擇哪個。例如:若是一個節點有64G內存(在 yarn-site.xml配置yarn.nodemanager.resource.memory-mb參數),這時候你須要10個4G的executors,那樣的話,即便你有一個大集羣,可是你須要的這些也能夠在單節點的yarn節點輕鬆啓動運行起來。
當你基於yarn啓動spark集羣,你指定了你
須要的executors個數(–num-executors flag or spark.executor.instances 參數),
每個executor須要的內存(–executor-memory flag or spark.executor.memory 參數),
每個executor須要的cpu core(–executor-cores flag of spark.executor.cores parameter),
每個task任務執行所需的cpu core (spark.task.cpus 參數)。
你也指定了driver application的所需內存(–driver-memory flag or spark.driver.memory 參數)。
當你在集羣上執行Job的時候,你的任務將會被切分紅不少stages,每個stage會切分紅不少的tasks,每個task被分開有計劃地執行。 (上述是任務切分,下面是偏重spark組件(譯者添加)) 一個executor是一個JVM,多個executors是一堆JVMS 你能夠認爲這些JVMS,是一些"task執行槽"組成的池。每個executor將會爲你的tasks提供必定數量的"執行槽" ,這數量是: "參數 spark.executor.cores / 參數 spark.task.cpus" 獲得的。 舉個例子: spark在yarn資源管理器上有12個節點,每一個節點64G的內存,32個cpu core(16個物理cpu core線程虛擬出),這樣,在供給系統、yarn、hdfs運行所需內存外,每一個節點上能夠運行兩個executor,每一個能夠用26G的內存和12個cpu core。 整體上,你的集羣將有12臺機器,每臺2個executor,集羣一共12*2的executors,每一個executor有12個cpu cores,1個task一個1 core,那麼一共有任務槽數量:12*2*12/1=288個。 這意味着你的spark集羣,將可以平行運行288個任務,這樣就充分利用了你的集羣資源。 你能夠用於存儲數據的內存數量:0.9 spark.storage.safetyFraction * 0.6 spark.storage.memoryFraction * 12 機器 * 每臺機器 2 executors * 每一個executor 能夠用 26 GB = 336.96 GB.它並很少,但在大多數狀況下是足夠的。
到此,你知道了spark是怎麼使用JVM內存的,而且知道了在你的集羣中有多少"執行槽"。你可能注意到了,"task"究竟是什麼東東,我沒有很深刻的講解,我將在下一篇文章中做爲主題來說,他通常做爲一個獨立的spark工做單元,而且是做爲executor JVM中的一個線程來運行的。這是spark啓動任務時間短的一個祕密,在JVM中,使用額外的線程分支比啓動整個JVM要快的多,當你在啓動hadoop中的mapreduce的時候就是啓動整個JVM,因此在spark中啓動job要比在mapreduce中啓動要快。
如今讓咱們看看在spark中的另外一個抽象的概念"partition",你在spark中的工做數據所有會分紅partitions。一個單獨的partition究竟是什麼,它是怎麼被分配出來的?
partition的大小徹底取決於你使用的數據源。對於大部分的spark讀取數據的方法,你均可以肯定出RDD有的partitions數量。當你從HDFS中讀取文件的時候,你這時候使用Hadoop的InputFormat。(經過查看spark源碼能夠知道---譯者添)InputFormat返回的每個input split造成了RDD的獨立的partition。
在HDFS中的每個input split都是在HDFS中生成的單獨的數據block,通常大小約爲64M(hadoop1.x默認配置)或者128M(hadoop2.x默認配置)。爲何說是"大約",由於雖然在HDFS是以字節大小區分block大小的,可是在運行中,它是以數據記錄來分割block邊界的。對於文本文件,分割字符在新的一行的開始,對於sequence文件,分割點爲block...
對於壓縮文件,規則比較特殊,若是你壓縮了整個文本,它就不能分割爲記錄,整個文件將變成一個單獨的input split,後面將會在spark中造成單獨的一個partition,因此你必須手工來repartition。
如今咱們所知的很簡單(做者說的),產生一個單獨的partition做爲spark的數據,而後造成一個task,而後這個task會在一個task槽上被執行,這個task槽挨着你的數據很近(hadoop的block位置,spark緩存的partition位置)。
其實其中的相關的東西遠遠比本篇文章中要多,在下篇文章中,我將講述spark是怎麼切分stages,把stage切分紅task的,spark在集羣中是怎麼shuffle數據的,還有其餘一些有用的東東...
這是本系列的第一篇文章,第二篇是關於shuffle,第三篇是關於spark在版本1.6+的新的內存管理模型...
討論區有用的翻譯:
Raja: 好的研究,我感受盡管使用LRU緩存,足夠的內存和節點將會保存(不懂---譯者)。我認爲整合Tachyon將會更好,消除重複內存數據,還有不少其餘特性,好比速度提高、分享、安全,這是個人見解。 做者: 固然,若是你整合tacyon,會更快一些:由於更多的數據在linux cache中緩存了,更多的數據加載到spark的內存中了。可是通常來講,tacyon能夠提高持久化數據方面的IO,可是spark的瓶頸不在這裏,在寫shuffle數據到本地文件系統中 關於你說的去除重複數據,我不太明白,你能夠在spark或者持久化到HDFS、HBase、Cassandra, Tachyon...,取決於你和持久化級別,不能幫助你去重。
firemonk9: spark.shuffle.memoryfraction 在堆內存的老生代中存儲空間嗎 (像 spark.storage.memoryfraction) ?
做者: 它取決於你JVM設置,spark不能控制內存位於新生代或者老生代中。在整個shuffle階段,內存像一個連續的Longs數組(128M的大塊,請查看see https://github.com/apache/spark/blob/branch-1.4/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala through https://github.com/apache/spark/blob/branch-1.4/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java and https://github.com/apache/spark/blob/branch-1.4/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java),它取決於JVM來決定將它放到新生代或者直接定位到老生代。
Aniruddh Sharma: 我有一些疑問。Executor JVM運行這些task須要運行多少線程?誰來運行怎麼來運行?這個"executor-cores"參數在決定多少線程的時候也會產生做用嗎?線程的個數被core的數量決定嗎?怎麼決定的? (文章中提過一個executor只運行了一個task,有讀者提出這個數量怎麼來肯定----譯者添)
對於線程的數量,它是看狀況的。例如:一個集羣運行30個線程,一個executor運行一個task(spark.executor.cores=1)。固然只有一個task線程,其餘的都是附加的。task線程是由executor sheduler在executor中啓動的,這是被driver的scheduler容許的。
在每一個executor JVM中執行的task線程數量是能夠被"spark.executor.cores"參數設定的,這個名字有點歧義,實際上它跟cores數量不要緊,僅僅設置用戶任務進程的線程數量。它取這樣的名字,由於它設置的數量和executor分配的cpu cores數量相等的話,這是一種最佳實踐。task executor的線程數量和機器的cpu cores無關,你能夠設置spark.executor.cores爲10,這樣你就在1個cpu core的executor中運行了10個線程
由於executor core通常虛擬爲一個線程不是一個真實的core,我想我怎麼讓一個task分配一個1個core?
做者: 當yarn來控制執行的狀況,在最新版本中的yarn(Apache Hadoop 2.7.2)中,它使用CGroups來控制executors的CPU,來確保excutors不會過多提交CPU資源,多於他們自身所須要的。 (https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/NodeManagerCgroups.html)沒有辦法作到task定製--它徹底取決於task作什麼,怎麼作。你能夠在你的方法中產生必定數量的線程,它將做爲單獨的一個task,這樣執行將會影響到其餘 同一個executor中的其餘task.