目錄html
研究Spark做業調度,是爲了合理使用集羣的資源。更具體一點,是看看是否提供了能夠個性化配置的點,而後根據應用的具體狀況制定配置或者使用方案。node
本文參考官網做業調度文檔。apache
spark的做業調度分爲兩個場景:跨應用的調度和應用內部的調度,下面分別介紹。緩存
聲明:文中配圖是本身的理解,並不敢保證其準確性。網絡
跨應用的調度是由底層的集羣管理器負責的,有兩種資源分配策略。spa
一種是靜態資源分隔,即一個應用一開始就申請全部的資源,並在程序運行期間使用持有這些資源。線程
一種是動態資源分配,應用根據本身的負載狀況動態請求或釋放資源。這種策略默認是不開啓的。scala
全部的集羣管理器都支持靜態資源分隔,只是具體的配置策略不一樣:code
提交到Standalone mode集羣的應用會以FIFO的順序運行,每個正在運行的應用都會嘗試佔用全部的可用資源。使用下面的配置項能夠限制每一個應用申請的資源:xml
spark.cores.max
應用能夠申請的最大數量的CPU核的數量,若是沒有設置,取spark.deploy.defaultCores
的值。
spark.executor.memory
分配給每一個executor進程的內存資源。
爲了使用靜態資源隔離,須要設置spark.mesos.coarse
爲true,這稱爲粗粒度的Mesos模式。
另外,spark.cores.max
和spark.executor.memory
在Mesos模式下一樣有效。
--num-executors
在使用spark-submit提交做業時,可使用--num-executors
選項請求指定的executor個數。
在程序內部,能夠經過設置spark.executor.instances
屬性達到一樣的目的。
--executor-memory
在使用spark-submit提交做業時,可使用--executor-memory
選項設置每一個executor申請的內存。
在程序內部,能夠經過設置spark.executor.memory
屬性達到一樣的目的。
--executor-cores
在使用spark-submit提交做業時,可使用--executor-cores
選項設置每一個executor申請的CPU核。
在程序內部,能夠經過設置spark.executor.cores
屬性達到一樣的目的。
spark的運行模型是基於executor的,executor是資源的實際持有者。因此動態資源分配,是經過動態的申請executor和釋放executor來實現的。
動態資源分配涉及到兩個方面,如何在須要的時候動態申請資源,以及如何在空閒的時候動態釋放資源。
動態請求策略:若是一個應用有tasks在等待,超過必定的時間(spark.dynamicAllocation.schedulerBacklogTimeout
秒)就會申請1個executor。此後每隔必定的時間(spark.dynamicAllocation.sustainedSchedulerBacklogTimeout
秒)就檢測應用是否有tasks在等待,有就繼續申請executor。
動態請求資源的數量是指數級的,第一次申請1個,第二次申請2個,接着是4, 8 ...這種考慮是爲了在謹慎申請資源的同時,又能夠在容許的時間範圍內得到真正須要的資源量。
動態釋放資源:是經過檢查應用佔據的executor是否超過了指定的時間(spark.dynamicAllocation.executorIdleTimeout
秒)來決定的,超過了就釋放。
釋放資源的條件和請求資源的條件是互斥的,即若是一個應用有tasks在排隊,就不該該會有空閒的executor。
爲了使用動態資源分配,須要作兩件事:
spark.dynamicAllocation.enabled
值爲trueexternal shuffle service
,並設置spark.shuffle.service.enabled
爲trueexternal shuffle service
的做用在後面會介紹,不一樣集羣模式下啓動external shuffle service
的方式不一樣:
external shuffle service
,只須要設置spark.shuffle.service.enabled
爲true便可。$SPARK_HOME/sbin/start-mesos-shuffle-service.sh
來啓動external shuffle service
。動態釋放資源須要額外的支持,由於executor可能會產生中間結果並輸出到本地,在須要的時候須要經過這個executor獲取它的中間結果。冒然移除executor會丟失它計算的中間結果,致使在真正須要的時候又要從新計算。
好比在map階段executor輸出map結果,在shuffle階段這些map結果又須要經過executor讀出來傳送到負責reduce的executor。
spark經過external shuffle service
來解決這個問題。external shuffle service
是指在每個node都運行的一個長期進程,這個進程獨立於應用和executor,負責提供executor的輸出數據的獲取服務。原來executor之間相互請求來獲取對方的輸出結果,變成了統一從external shuffle service獲取結果。
即便executor已經被移除了,它所輸出的數據依然能夠經過external shuffle service
來獲取。
另外,executor還可能會把中間結果緩存到內存,目前的策略是不移除此類的executor。將來可能採起將緩存持久化的方式,進而釋放executor。
一個spark應用能夠支持多個不一樣線程的job同時提交,這常見於spark應用提供網絡服務的場景。
spark默認的調度策略是FIFO,若是隊列頭部的job比較大,佔用了集羣的全部資源,後面的小任務將遲遲得不到運行的機會。
另外,spark還支持配置FAIR調度,spark循環調度每一個job的task。這樣即便有大job在運行,剛提交的小job也能夠及時得到資源,而不須要等到大job結束。
經過設置屬性spark.scheduler.mode
來啓用公平調度:
val conf = new SparkConf().setMaster(...).setAppName(...) conf.set("spark.scheduler.mode", "FAIR") val sc = new SparkContext(conf)
spark支持公平調度池的概念,每一個線程能夠指定將jobs提交到哪一個池子,最細粒度的場景下是每一個線程對應一個池,也能夠多個線程使用同一個池。
每一個線程默認使用的池是default,也能夠經過設置參數來明確指定池。
// Assuming sc is your SparkContext variable sc.setLocalProperty("spark.scheduler.pool", "pool1")
若是想重置當前線程綁定的池子,調用sc.setLocalProperty("spark.scheduler.pool", null)
。
能夠經過配置文件將資源按照必定的比重分配到池,配置文件的模板:conf/fairscheduler.xml.template
。
經過conf.set("spark.scheduler.allocation.file", "/path/to/file")
指定配置文件。
每一個池可支持的參數有三個:
配置文件示例:
<?xml version="1.0"?> <allocations> <pool name="production"> <schedulingMode>FAIR</schedulingMode> <weight>1</weight> <minShare>2</minShare> </pool> <pool name="test"> <schedulingMode>FIFO</schedulingMode> <weight>2</weight> <minShare>3</minShare> </pool> </allocations>
沒有出如今配置文件中的池,全部參數取默認值(schedulingMode=FIFO,weight=1,minShare=0)。
executor到底指什麼?和容器、JVM的關係是怎樣的?
executor是負責必定職責的程序組件,能夠在已有的JVM中運行(好比local mode),也能夠在新的JVM中運行。使用YARN時,executor是在YARN容器中運行的。
spark的job - stage - task的劃分是怎麼樣的?
spark的job能夠劃分爲多個stage,這些stage構成了DAG。每個stage又能夠劃分爲多個tasks。stage的劃分是根據shuffle map task來的,這一類的task至關於MapReduce中shuffle的map端,負責在本地RDD分區進行計算,並將結果輸出到新的分區,供後續的使用。在劃分stage時,shuffle map任務做爲階段的結束的邊界。
Mesos的粗粒度和細粒度
Mesos能夠啓用CPU核的共享,即同一個節點executor在不使用核的狀況下可讓給另外一個executor來使用。
不啓用CPU核共享稱爲粗粒度,啓用則稱爲細粒度,相關的配置項爲spark.mesos.coarse
,值爲true表示粗粒度。