In the Worker actor, every LaunchExecutor message creates one CoarseGrainedExecutorBackend process. Executor and CoarseGrainedExecutorBackend have a one-to-one relationship: however many Executor instances are launched in the cluster, that is how many CoarseGrainedExecutorBackend processes exist.
So how exactly are Executors allocated, and how is the number of Executors controlled and tuned?
The following walks through Spark's Executor allocation strategy.
Here we look only at this part of the flow: after an Application is submitted and registered with the Master, the Master replies with RegisteredApplication and then calls the schedule() method to allocate resources for Drivers and to launch Executors.
schedule() is the method that schedules the currently available resources; it manages resource allocation for the apps still waiting in the queue. It is called every time cluster resources change, and it allocates resources to apps based on the cluster's latest state.
// First schedule drivers, they take strict precedence over applications
val shuffledWorkers = Random.shuffle(workers) // shuffle the current workers HashSet into a random order
for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { // iterate over the live workers
  for (driver <- waitingDrivers) { // drivers in the waiting queue get resources assigned here
    if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
      // the worker's free memory and cores both cover what this driver requests, so launch it
      launchDriver(worker, driver) // launch the Driver: a launch command is sent to the chosen Worker, and the Worker starts the Driver
      waitingDrivers -= driver // remove the launched Driver from the waiting queue
    }
  }
}
After the Drivers, Executors for the waiting apps are scheduled. Which of the two allocation strategies is used is controlled by the spark.deploy.spreadOut configuration, which defaults to true:

val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
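As a hedged aside (the exact placement depends on how the Master daemon is started in your deployment), the standalone documentation describes setting spark.deploy.* properties through SPARK_MASTER_OPTS in conf/spark-env.sh; for example, to switch to the packing strategy:

# Example only: make the Master pack executors onto as few Workers as possible
SPARK_MASTER_OPTS="-Dspark.deploy.spreadOut=false"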
Only a Worker that passes the canUse check below counts as usable for an App:
/**
 * Can an app use the given worker? True if the worker has enough memory and we haven't already
 * launched an executor for the app on it (right now the standalone backend doesn't like having
 * two executors on the same worker).
 */
def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
  worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
}
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
if (spreadOutApps) {
  // Try to spread out each app among all the nodes, until it has all its cores
  for (app <- waitingApps if app.coresLeft > 0) { // handle the apps that have not been fully allocated yet
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(canUse(app, _)).sortBy(_.coresFree).reverse // sort the usable Workers by free cores, descending
    val numUsable = usableWorkers.length // number of usable workers, e.g. 5 usable workers
    val assigned = new Array[Int](numUsable) // one slot per candidate Worker, all initialized to 0
    var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
      // cores still to assign = min(the app's unassigned cores (e.g. 5), total free cores across usable Workers (e.g. 10))
    var pos = 0
    while (toAssign > 0) {
      if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
        // in round-robin order, check whether this worker still has free cores beyond what assigned(pos) already claims
        toAssign -= 1
        assigned(pos) += 1 // give the Worker at index pos one more core
      }
      pos = (pos + 1) % numUsable // round-robin on to the next Worker that may still have resources
    }
    // Now that we've decided how many cores to give on each node, let's actually give them
    for (pos <- 0 until numUsable) {
      if (assigned(pos) > 0) { // if assigned(pos) > 0, launch an executor on the machine at that index
        val exec = app.addExecutor(usableWorkers(pos), assigned(pos)) // update the app's Executor bookkeeping
        launchExecutor(usableWorkers(pos), exec) // tell the chosen Worker to start the Executor
        app.state = ApplicationState.RUNNING
      }
    }
  }
Otherwise, in the else branch, each selected Worker contributes all of its free resources to the app (P.S.: each Worker's remaining resources are squeezed dry, one Worker at a time...):
} else {
  // Pack each app into as few nodes as possible until we've assigned all its cores
  for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
    for (app <- waitingApps if app.coresLeft > 0) {
      if (canUse(app, worker)) { // simply ask whether the current worker has free cores for this app
        val coresToUse = math.min(worker.coresFree, app.coresLeft) // if it does, take them, however many there are
        if (coresToUse > 0) { // there are some
          val exec = app.addExecutor(worker, coresToUse) // launch right away
          launchExecutor(worker, exec)
          app.state = ApplicationState.RUNNING
        }
      }
    }
  }
}
2. For a given App, each Worker can host at most one Executor for that App. Remember this.
If you want an App to get more Executors overall, set SPARK_WORKER_INSTANCES so that more Workers run per machine; a hedged sketch follows below.
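The values below are made up for illustration; with the standalone launch scripts these variables typically go into conf/spark-env.sh on each worker machine:

# Example only: run two Worker instances per machine so one App can get two Executors there.
# When raising SPARK_WORKER_INSTANCES, also cap each Worker's cores and memory explicitly
# so the instances don't both claim the whole machine.
SPARK_WORKER_INSTANCES=2
SPARK_WORKER_CORES=4
SPARK_WORKER_MEMORY=8g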
3. There are two strategies for allocating Executor resources:
3.1 SpreadOut: iterates over all usable Workers in the cluster in round-robin fashion and allocates their resources to launch Executors. Its advantage is that cores are spread across as many nodes as possible, maximizing load balance and parallelism.
3.2 Non-SpreadOut: launches Executors by consuming each Worker's remaining resources as fully as possible, so the resulting Executors may sit on only a small subset of the cluster's machines. This is tolerable for clusters with few nodes, but once the cluster grows, Executor parallelism and per-machine load balance can no longer be guaranteed. A toy sketch contrasting the two strategies follows this list.
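To make the contrast concrete, here is a minimal, self-contained Scala sketch. It is not Spark source: Worker, the core counts, and the 6-core demand are invented for illustration, and only the loop shapes mirror the schedule() code above.

object AllocationDemo {
  case class Worker(id: String, coresFree: Int)

  // Spread-out: hand out one core at a time, round-robin over the usable workers.
  def spreadOut(workers: Seq[Worker], coresLeft: Int): Map[String, Int] = {
    val usable   = workers.filter(_.coresFree > 0).sortBy(_.coresFree).reverse
    val assigned = Array.fill(usable.length)(0)
    var toAssign = math.min(coresLeft, usable.map(_.coresFree).sum)
    var pos      = 0
    while (toAssign > 0) {
      if (usable(pos).coresFree - assigned(pos) > 0) {
        assigned(pos) += 1
        toAssign -= 1
      }
      pos = (pos + 1) % usable.length
    }
    usable.map(_.id).zip(assigned).filter(_._2 > 0).toMap
  }

  // Non-spread-out: drain each worker completely before touching the next one.
  def packed(workers: Seq[Worker], coresLeft: Int): Map[String, Int] = {
    var remaining = coresLeft
    workers.filter(_.coresFree > 0).flatMap { w =>
      val take = math.min(w.coresFree, remaining)
      remaining -= take
      if (take > 0) Some(w.id -> take) else None
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val cluster = Seq(Worker("w1", 4), Worker("w2", 4), Worker("w3", 4))
    println(spreadOut(cluster, 6)) // balanced: 2 cores on each of the three workers
    println(packed(cluster, 6))    // packed: 4 cores on the first worker, 2 on the second, none on the third
  }
}

With the same 6-core request, spreading out lands 2 cores on each of three Workers, while packing drains the first Worker before touching the second.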
This was written in haste; if anything is wrong, please point it out. Discussion is welcome :)
1. A discussion of why, for one App, a Worker is allowed only one Executor for that App, and why it was designed this way:
連城404: Spark uses a thread-level parallelism model; why would one worker need to launch multiple executors for the same app?
樸動_zju: One worker corresponding to one executorbackend was carried over from Mesos, where one slave likewise corresponds to one executorbackend. I understand multiple could be implemented here, but running multiple doesn't seem to bring much benefit and adds complexity.
CrazyJvm: @CodingCat wrote a patch that can launch multiple, but it has not been merged yet. Looking at it from YARN's angle, one Worker could correspond to multiple executorbackends, just as one nodemanager corresponds to multiple containers. @OopsOutOfMemory
OopsOutOfMemory: Reply to @連城404: If one executor is too large and holds too many objects, GC becomes very slow; a few more Executors mitigate the slow full GC problem. See this post http://t.cn/RP1bVO4 (today 11:25)
連城404: Reply to @OopsOutOfMemory: Ah, that consideration makes sense.
One workaround is to deploy multiple workers on a single machine; workers are relatively cheap.
JerryLead: Reply to @OopsOutOfMemory: It looks like all of this is still evolving; standalone and YARN still differ in many ways, so let's not draw conclusions yet (today 11:35)
JerryLead: The problem is getting more complicated: do we raise thread-level parallelism or process-level parallelism? I think Spark still prefers the former, since tasks are easier to manage and broadcast and cache are more efficient. The latter has some merit too, but parameter configuration becomes more complex; each has its pros and cons (today 11:40)
To be continued...
Link: @JerryLead https://github.com/JerryLead/SparkInternals/blob/master/markdown/1-Overview.md
——EOF——
Original article. When reposting, please credit the source: http://blog.csdn.net/oopsoom/article/details/38763985