上一篇文章已經已經執行到 Client 向 masterEndpoint 發送了 RequestSubmitDriver 信息,下面就看看 master 怎麼註冊 driver 信息,而且怎麼讓 worker 去啓動 driver 的。java
這個 Master 就是前面 Client 發送的對象,是一個 ThreadSafeRpcEndpoint。內部的 receiveAndReply
這個方法在監聽外部發來信息。下面就來看這個方法。web
這個方法內部會根據發送過來的消息作模式匹配,咱們找到 Client 發送過來的 RequestSubmitDriver 這個消息對應代碼,以下:算法
// 匹配到 Client 發送過來的消息
case RequestSubmitDriver(description) =>
// 判斷當前 master 的狀態是否爲 alive
if (state != RecoveryState.ALIVE) {
// 若是不是 alive 則回覆 driver 提交失敗
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
"Can only accept driver submissions in ALIVE state."
context.reply(SubmitDriverResponse(self, false, None, msg))
} else {
logInfo("Driver submitted " + description.command.mainClass)
// 根據 client 發過來的 driver 信息建立 driver,而後持久化 driver
// 而後將 driver 加入到等待隊列中去
val driver = createDriver(description)
persistenceEngine.addDriver(driver)
waitingDrivers += driver
// 將 driver 加入到 HashSet 中去
drivers.add(driver)
// 開始調度
schedule()
// TODO: It might be good to instead have the submission client poll the master to determine
// the current status of the driver. For now it's simply "fire and forget".
context.reply(SubmitDriverResponse(self, true, Some(driver.id),
s"Driver successfully submitted as ${driver.id}"))
}
複製代碼
這段代碼,作了這麼一些操做:判斷當前 master 的狀態是否爲 alive ,若是不是則回覆消息說:提交失敗,若是是則根據傳遞過來的 driver 信息建立 driver 對象(經過 createDriver 方法建立)並將其持久化,加入到等待隊列中去,而後開始執行調度算法 schduler。apache
這裏涉及到連個方法,分別能夠看一下,一個是 createDriver 方法,一個是 schduler 方法。app
// 建立 driver 對象
private def createDriver(desc: DriverDescription): DriverInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
// 經過系統當前時間生成一個 driverId
// 而後將系統當前時間,driverId,DriverDescription,日期 這些信息封裝成一個 DriverInfo
new DriverInfo(now, newDriverId(date), desc, date)
}
複製代碼
這個方法主要是經過當前時間生成一個 driverId,而後將當前時間,DriverDescription 等參數封裝成一個 DriverInfo 對象。dom
該方法在 master 中會被屢次調用,每當 driver 的等待隊列中數據發生變更或者集羣資源發生變化都會掉用這個方法。這個方法主要是爲當前 driver 的等待隊列分配資源的。ide
private def schedule(): Unit = {
// 首先判斷當前 master 的狀態是否爲 alive 的,若是不是 alive 則不往下執行
if (state != RecoveryState.ALIVE) {
return
}
// Random.shuffle 這個方法主要是隨機分散各個元素,具體代碼能夠點進去看
// 這裏主要是將集羣中 state 爲 alive 的 worker 帥選出來,而後隨機打亂
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
// 當前 alive 的 worker 數量
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0
// 將等待分配資源的 driver 隊列中的全部 driver 進行遍歷
// 而後爲每一個 driver 遍歷一遍全部的 alive worker,當碰到 worker 的可用內存和比當前隊列中
// 等待的 driver 所須要的內存要大而且 worker 的 core 數量也知足 driver 的需求時
// 就會調用 launcherDriver 方法去將 driver 發送對應的 worker 上去執行
for (driver <- waitingDrivers.toList) {
var launched = false
var numWorkersVisited = 0
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
numWorkersVisited += 1
// 找到符合條件的 worker
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
// 將該 driver 從等待隊列中移除
waitingDrivers -= driver
// 標記當前 driver 爲 launched
launched = true
}
// 移到下一個 driver 上
curPos = (curPos + 1) % numWorkersAlive
}
}
// 調用 startExecutorsOnWorkers 方法
startExecutorsOnWorkers()
}
複製代碼
這個 schduler 方法會遍歷等待分配資源的 driver 隊列,爲每一個 driver 遍歷一遍 alive 的 worker,找到資源知足的 worker,而後調用 launchDriver 方法,將該 driver 在這個 worker 上啓動,移除掉等待隊列中當前 driver,而後調用 startExecutorsOnWorkers 啓動 executor。ui
這裏又有兩個方法,一個是 launchDriver 方法,一個是 startExecutorsOnWorkers 方法去啓動 executor,startExecutorsOnWorkers 這個方法放到下面文章裏講,這篇文章主要講 driver 註冊和啓動。this
這個方法主要是更新一些信息(worker 中的資源變動,worker 中啓動的 driver 信息記錄;driver 中添加上 worker 的信息),而後將向對應的 worker 發送 LaunchDriver 的消息。spa
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
logInfo("Launching driver " + driver.id + " on worker " + worker.id)
// 這裏是將 workerInfo 中添加上啓動 driver 的信息,內部也會減去 driver 使用掉的資源
worker.addDriver(driver)
// 將 driver 啓動的 worker 信息記錄到 driver 中
driver.worker = Some(worker)
// 給 worker 發送 LaunchDriver 的信息
worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
// 標記當前 driver 狀態爲 running 狀態
driver.state = DriverState.RUNNING
}
複製代碼
經過把啓動的 driver 信息記錄到對應的 worker 信息中,再將對應的 worker 信息記錄到 driver 裏,而後給 worker 發送消息讓 worker 啓動 driver,標記當前的 driver 狀態爲 running。
這裏會給 worker 發送 LaunchDriver 的消息,下面去看下 worker 中是怎麼處理這個消息的。
private[deploy] class Worker(
override val rpcEnv: RpcEnv,
webUiPort: Int,
cores: Int,
memory: Int,
masterRpcAddresses: Array[RpcAddress],
endpointName: String,
workDirPath: String = null,
val conf: SparkConf,
val securityMgr: SecurityManager)
extends ThreadSafeRpcEndpoint with Logging
複製代碼
從繼承關係上能夠看出 worker 也是 RpcEndPoint,因此直接找到它的 receive 方法,而後根據模式匹配找到 LaunchDriver 這個匹配下看操做邏輯便可。
case LaunchDriver(driverId, driverDesc) =>
logInfo(s"Asked to launch driver $driverId")
// 將 driver 信息封裝到一個 runner 內
val driver = new DriverRunner(
conf,
driverId,
workDir,
sparkHome,
driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
self,
workerUri,
securityMgr)
// 而後將這個 runner 保存到一個 HashMap 中
drivers(driverId) = driver
// 啓動這個 runner
driver.start()
// 更新當前 worker 的資源信息
coresUsed += driverDesc.cores
memoryUsed += driverDesc.mem
複製代碼
這裏會將 driver 的信息封裝到一個 DriverRunner 裏面,而後再降這個 runner 保存到內存的一個 HashMap 中,而後開啓這個 ruuner,更新當前 worker 的資源信息。
到這裏咱們須要去看 DriverRunner 裏是怎麼操做的。
DriverRunner 是在 standalone cluster 部署模式下用來執行 driver 操做的,包括當 driver 掛掉以後的自動重啓。
前面調用的是 runner 的 start 方法,因此咱們直接看這個 start 方法:
private[worker] def start() = {
// 開一個線程
new Thread("DriverRunner for " + driverId) {
override def run() {
var shutdownHook: AnyRef = null
try {
shutdownHook = ShutdownHookManager.addShutdownHook { () =>
logInfo(s"Worker shutting down, killing driver $driverId")
kill()
}
// 準備 driver 的 jar 包而且執行 driver,並返回一個 exitCode
val exitCode = prepareAndRunDriver()
// 根據 exitCode 設置 finalState,一共有三種,分別爲:FINISHED,KILLED,FAILED
finalState = if (exitCode == 0) {
Some(DriverState.FINISHED)
} else if (killed) {
Some(DriverState.KILLED)
} else {
Some(DriverState.FAILED)
}
} catch {
case e: Exception =>
kill()
finalState = Some(DriverState.ERROR)
finalException = Some(e)
} finally {
if (shutdownHook != null) {
ShutdownHookManager.removeShutdownHook(shutdownHook)
}
}
// 而後將 driverId 和 driver 執行結果 finalState 以及一些異常信息發送給 worker
worker.send(DriverStateChanged(driverId, finalState.get, finalException))
}
}.start()
}
複製代碼
這裏主要是調用了一個 prepareAndRunDriver 這個方法,返回了一個結果碼,而後把結果碼轉換爲 finalState ,而後發送給 worker。
因此咱們直接去找 prepareAndRunDriver 這個方法。
private[worker] def prepareAndRunDriver(): Int = {
// 建立 driver 的工做目錄
val driverDir = createWorkingDirectory()
// 下載 driver 的 jar 包到工做目錄下
val localJarFilename = downloadUserJar(driverDir)
def substituteVariables(argument: String): String = argument match {
case "{{WORKER_URL}}" => workerUrl
case "{{USER_JAR}}" => localJarFilename
case other => other
}
// 建立 ProcessBuilder
val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
runDriver(builder, driverDir, driverDesc.supervise)
}
複製代碼
這個方法主要作了這些事:建立 driver 的工做目錄,將 driver 的 jar 包下載到工做目錄下,而後建立 ProcessBuilder,傳入 driver 的執行命令,而後調用 runDriver 方法。
下面咱們看下 runDriver 方法。
private def runDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean): Int = {
builder.directory(baseDir)
// 初始化操做
def initialize(process: Process): Unit = {
// 建立 stout 文件
val stdout = new File(baseDir, "stdout")
// 將 process 的 InputStream 流重定向爲 stout 文件
CommandUtils.redirectStream(process.getInputStream, stdout)
// 建立 stderr 文件
val stderr = new File(baseDir, "stderr")
// 將 builder 命令格式化處理
val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
Files.append(header, stderr, StandardCharsets.UTF_8)
// 將 process 的 ErrStream 重定向到 stderr 文件
CommandUtils.redirectStream(process.getErrorStream, stderr)
}
// 調用 runCommandWithRetry
runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
}
複製代碼
該方法主要是定義了一個 initialize 方法,裏面會將傳入的 process 的輸入流和 err 流重定向到自定義的兩個文件中去,而後調用 runCommandWithRetry 這個方法。
看下 runCommandWithRetry 這個方法。
private[worker] def runCommandWithRetry( command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Int = {
// 退出碼
var exitCode = -1
// 提交重試的燈帶時間
var waitSeconds = 1
// A run of this many seconds resets the exponential back-off.
val successfulRunDuration = 5
var keepTrying = !killed while (keepTrying) {
logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))
synchronized {
// 若是被 kill 則返回 exitcode
if (killed) { return exitCode }
// 執行 command 命令,啓動 driver 進程
process = Some(command.start())
// 調用上面定義好的 initialize 方法,將一些流的輸出文件作重定向
initialize(process.get)
}
val processStart = clock.getTimeMillis()
exitCode = process.get.waitFor()
// check if attempting another run
keepTrying = supervise && exitCode != 0 && !killed if (keepTrying) {
if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
waitSeconds = 1
}
logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
sleeper.sleep(waitSeconds)
waitSeconds = waitSeconds * 2 // exponential back-off
}
}
exitCode
}
複製代碼
這裏是真正運行 driver 進程的地方,開啓 driver 進程後會使用上面 runDriver 中定義好的 initialize 方法去將 driver 進程中的一些流的輸出文件作重定向操做,並返回 exitcode。
至此,driver 就已經在 master 上註冊好了,而且 master 也分配合適的 worker 啓動了該 driver 進程。
咱們在 DriverRunner start 方法的最後會調用 worker.send(DriverStateChanged(driverId, finalState.get, finalException))
這個方法,給 worker 發送 driver 狀態變化的消息。
這裏咱們看下 worker 是怎麼處理的。
在 woker 的 receive 方法的模式匹配中是這麼操做的:
case driverStateChanged @ DriverStateChanged(driverId, state, exception) =>
handleDriverStateChanged(driverStateChanged)
複製代碼
會去調用 handleDriverStateChanged 這個方法。
咱們再看下 handleDriverStateChanged 這個方法:
private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
val driverId = driverStateChanged.driverId
val exception = driverStateChanged.exception
val state = driverStateChanged.state
// 根據 state 作匹配打印日誌
state match {
case DriverState.ERROR =>
logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
case DriverState.FAILED =>
logWarning(s"Driver $driverId exited with failure")
case DriverState.FINISHED =>
logInfo(s"Driver $driverId exited successfully")
case DriverState.KILLED =>
logInfo(s"Driver $driverId was killed by user")
case _ =>
logDebug(s"Driver $driverId changed state to $state")
}
// 向 master 發送 driverStateChanged 消息
sendToMaster(driverStateChanged)
// 將該 driver 從 drivers 移除到 finishedDrivers 中去
val driver = drivers.remove(driverId).get finishedDrivers(driverId) = driver trimFinishedDriversIfNecessary() // 更新 worker 節點的資源狀況 memoryUsed -= driver.driverDesc.mem
coresUsed -= driver.driverDesc.cores
}
複製代碼
主要是作了這些事:根據發送過來的 state 作模式匹配,打印對應的 log。而後把這個 driverStateChanged 消息轉發給 master,最後再更新下當前 worker 的一些存儲數據。
最後在看下 master 收到這個 driverStateChanged 消息是怎麼處理的。
在其 recieve 方法中能夠找到匹配到 driverStageChanged 消息後的操做:
case DriverStateChanged(driverId, state, exception) =>
state match {
case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
// 調用 removeDriver 方法
removeDriver(driverId, state, exception)
case _ =>
throw new Exception(s"Received unexpected state update for driver $driverId: $state")
}
複製代碼
在這裏是調用了 removeDriver 方法,咱們下面就看下這個方法。
private def removeDriver( driverId: String, finalState: DriverState, exception: Option[Exception]) {
// 根據 driver id 進行模式匹配
drivers.find(d => d.id == driverId) match {
case Some(driver) =>
logInfo(s"Removing driver: $driverId")
// 從 drivers 集合中移除當前 driver
drivers -= driver if (completedDrivers.size >= RETAINED_DRIVERS) {
val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
completedDrivers.trimStart(toRemove)
}
// 將 driver 添加到 completedDrivers 中去
completedDrivers += driver
// 從持久化引擎中移除
persistenceEngine.removeDriver(driver)
// 更新 driver 的狀態和 exception 並從 driver 的 worker 中移除掉當前 driver
driver.state = finalState
driver.exception = exception
driver.worker.foreach(w => w.removeDriver(driver))
schedule()
case None =>
logWarning(s"Asked to remove unknown driver: $driverId")
}
}
複製代碼
這個方法主要是將 master 中資源作恢復操做,會根據當前退出的 driver 作模式匹配,找到這個 driver,而後將其從 drivers 的集合中移除,添加到 completedDrivers 中去,而後從持久化引擎中移除掉,更新 driver 的狀態,並從 driver 持有的 worker 中移除掉結束的這個 driver。而後再調用 schedule 方法,讓釋放資源從新調度。
至此,driver 的註冊,啓動,以及退出後資源回收,都結束了。