折騰了好久,終於開始學習 Spark 的源碼了,第一篇我打算講一下 Spark 做業的提交過程。html
這個是 Spark 的 App 運行圖,它經過一個 Driver 來和集羣通訊,集羣負責做業的分配。今天我要講的是如何建立這個 Driver Program 的過程。web
咱們先看一下用 Spark Submit 提交的方法吧,下面是從官方上面摘抄的內容。apache
# Run on a Spark standalone cluster
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000
複製代碼
這個是提交到 standalone 集羣的方式,打開 spark-submit 這文件,咱們會發現它最後是調用了org.apache.spark.deploy.SparkSubmit 這個類。app
咱們直接進去看就好了,main 函數就幾行代碼,太節省了。框架
def main(args: Array[String]) {
val appArgs = new SparkSubmitArguments(args)
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose)
}
複製代碼
咱們主要看看 createLaunchEnv 方法就能夠了,launch 是反射調用 mainClass,精華全在 createLaunchEnv 裏面了。dom
在裏面我發現一些有用的信息,可能在官方文檔上面都沒有的,發出來你們瞅瞅。前面不帶 -- 的能夠在 spark-defaults.conf 裏面設置,帶 -- 的直接在提交的時候指定,具體含義你們一看就懂。tcp
val options = List[OptionAssigner](
OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, false, sysProp = "spark.app.name"),
OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"),
OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true,
sysProp = "spark.driver.extraClassPath"),
OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true,
sysProp = "spark.driver.extraJavaOptions"),
OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true,
sysProp = "spark.driver.extraLibraryPath"),
OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"),
OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"),
OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"),
OptionAssigner(args.queue, YARN, true, clOption = "--queue"),
OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"),
OptionAssigner(args.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
OptionAssigner(args.executorMemory, YARN, true, clOption = "--executor-memory"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, false,
sysProp = "spark.executor.memory"),
OptionAssigner(args.executorCores, YARN, true, clOption = "--executor-cores"),
OptionAssigner(args.executorCores, YARN, false, sysProp = "spark.executor.cores"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, false,
sysProp = "spark.cores.max"),
OptionAssigner(args.files, YARN, false, sysProp = "spark.yarn.dist.files"),
OptionAssigner(args.files, YARN, true, clOption = "--files"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
OptionAssigner(args.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
OptionAssigner(args.archives, YARN, true, clOption = "--archives"),
OptionAssigner(args.jars, YARN, true, clOption = "--addJars"),
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, false, sysProp = "spark.jars")
)
複製代碼
Driver 程序的部署模式有兩種,client 和 cluster,默認是 client。client 的話默認就是直接在本地運行了 Driver 程序了,cluster 模式還會兜一圈把做業發到集羣上面去運行。分佈式
指定部署模式須要用參數 --deploy-mode 來指定,或者在環境變量當中添加 DEPLOY_MODE 變量來指定。ide
下面講的是 cluster 的部署方式,兜一圈的這種狀況。函數
yarn 模式的話 mainClass 是 org.apache.spark.deploy.yarn.Client,standalone 的 mainClass 是org.apache.spark.deploy.Client。
此次咱們講 org.apache.spark.deploy.Client,yarn 的話單獨找一章出來單獨講,目前超哥仍是推薦使用 standalone 的方式部署 spark,具體緣由不詳,聽說是由於資源調度方面的問題。
說個快捷鍵吧,Ctrl+Shift+N,而後輸入 Client 就能找到這個類,這是 IDEA 的快捷鍵,至關好使。
咱們直接找到它的 main 函數,發現了它竟然使用了 Akka 框架,我百度了一下,被它震驚了。
在 main 函數裏面,主要代碼就這麼三行。
//建立一個ActorSystem
val (actorSystem, _) = AkkaUtils.createActorSystem("driverClient",Utils.localHostName(),0,
  conf, new SecurityManager(conf))
//執行ClientActor的preStart方法和receive方法
actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
//等待運行結束
actorSystem.awaitTermination()
複製代碼
看了這裏真的有點兒懵啊,這是啥玩意兒,不懂的朋友們,請點擊這裏 Akka。下面是它官方放出來的例子:
//定義一個case class用來傳遞參數
case class Greeting(who: String)
//定義Actor,比較重要的一個方法是receive方法,用來接收信息的
class GreetingActor extends Actor with ActorLogging {
def receive = {
case Greeting(who) ⇒ log.info("Hello " + who)
}
}
//建立一個ActorSystem
val system = ActorSystem("MySystem")
//給ActorSystem設置Actor
val greeter = system.actorOf(Props[GreetingActor], name = "greeter")
//向greeter發送信息,用Greeting來傳遞
greeter ! Greeting("Charlie Parker")
複製代碼
簡直是無比強大啊,就這麼幾行代碼就搞定了,接下來看你會更加震驚的。
咱們回到 Client 類當中,找到 ClientActor,它有兩個方法,是以前說的 preStart 和 receive 方法,preStart 方法用於鏈接 master 提交做業請求,receive 方法用於接收從 master 返回的反饋信息。
咱們先看 preStart 方法吧。
override def preStart() = {
// 這裏須要把master的地址轉換成akka的地址,而後經過這個akka地址得到指定的actor
// 它的格式是"akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master))
// 把自身設置成遠程生命週期的事件
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
driverArgs.cmd match {
case "launch" =>
// 此處省略100個字
val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
// 此處省略100個字
// 向master發送提交Driver的請求,把driverDescription傳過去,RequestSubmitDriver前面說過了,是個case class
masterActor ! RequestSubmitDriver(driverDescription)
case "kill" =>
val driverId = driverArgs.driverId
val killFuture = masterActor ! RequestKillDriver(driverId)
}
}
複製代碼
從上面的代碼看得出來,它須要設置 master 的鏈接地址,最後提交了一個 RequestSubmitDriver 的信息。在 receive 方法裏面,就是等待接受迴應了,有兩個 Response 分別對應着這裏的 launch 和 kill。
線索貌似到這裏就斷了,那下一步在哪裏了呢?固然是在 Master 裏面啦,怎麼知道的,猜的,哈哈。
Master 也是繼承了 Actor,在它的 main 函數裏面找到了如下代碼:
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
  securityManager = securityMgr)
val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort, securityMgr), actorName)
val timeout = AkkaUtils.askTimeout(conf)
val respFuture = actor.ask(RequestWebUIPort)(timeout)
val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse]
複製代碼
和前面的 actor 基本一致,多了 actor.ask 這句話,查了一下官網的文檔,這句話的意思的發送消息,而且接受一個 Future 做爲 response,和前面的 actor ! message 的區別就是它還接受返回值。
具體的 Akka 的用法,你們仍是參照官網吧,Akka 確實如它官網所言的那樣子,是一個簡單、強大、並行的分佈式框架。
小結:
Akka 的使用確實簡單,短短的幾行代碼即刻完成一個通訊功能,比 Socket 簡單不少。可是它也逃不脫咱們常說的那些東西,請求、接收請求、傳遞的消息、註冊的地址和端口這些概念。
咱們接下來查找 Master 的 receive 方法吧,Master 是做爲接收方的,而不是主動請求,這點和 hadoop 是一致的。
case RequestSubmitDriver(description) => {
val driver = createDriver(description)
persistenceEngine.addDriver(driver)
waitingDrivers += driver
drivers.add(driver)
// 調度
schedule()
// 告訴client,提交成功了,把driver.id告訴它
sender ! SubmitDriverResponse(true, Some(driver.id), s"Driver successfully submitted as ${driver.id}")
}
複製代碼
這裏咱們主要看 schedule 方法就能夠了,它是執行調度的方法。
private def schedule() {
if (state != RecoveryState.ALIVE) { return }
// 首先調度Driver程序,從workers裏面隨機抽一些出來
val shuffledWorkers = Random.shuffle(workers)
for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
for (driver <- waitingDrivers) {
// 判斷內存和cpu夠不夠,夠的就執行了哈
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
}
}
}
// 這裏是按照先進先出的,spreadOutApps是由spark.deploy.spreadOut參數來決定的,默認是true
if (spreadOutApps) {
// 遍歷一下app
for (app <- waitingApps if app.coresLeft > 0) {
// canUse裏面判斷了worker的內存是否夠用,而且該worker是否已經包含了該app的Executor
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(canUse(app, _)).sortBy(_.coresFree).reverse
val numUsable = usableWorkers.length
val assigned = new Array[Int](numUsable)
// 記錄每一個節點的核心數
var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
var pos = 0
// 遍歷直到分配結束
while (toAssign > 0) {
// 從0開始遍歷可用的work,若是可用的cpu減去已經分配的>0,就能夠分配給它
if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
toAssign -= 1
// 這個位置的work的可分配的cpu數+1
assigned(pos) += 1
}
pos = (pos + 1) % numUsable
}
// 給剛纔標記的worker分配任務
for (pos <- 0 until numUsable) {
if (assigned(pos) > 0) {
val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
launchExecutor(usableWorkers(pos), exec)
app.state = ApplicationState.RUNNING
}
}
}
} else {
// 這種方式和上面的方式的區別是,這種方式儘量用少許的節點來完成這個任務
for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
for (app <- waitingApps if app.coresLeft > 0) {
// 判斷條件是worker的內存比app須要的內存多
if (canUse(app, worker)) {
val coresToUse = math.min(worker.coresFree, app.coresLeft)
if (coresToUse > 0) {
val exec = app.addExecutor(worker, coresToUse)
launchExecutor(worker, exec)
app.state = ApplicationState.RUNNING
}
}
}
}
}
}
複製代碼
它的調度器是這樣的,先調度 Driver 程序,而後再調度 App,調度 App 的方式是從各個 worker 的裏面和 App 進行匹配,看須要分配多少個 cpu。
那咱們接下來看兩個方法 launchDriver 和 launchExecutor 便可。
def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
logInfo("Launching driver " + driver.id + " on worker " + worker.id)
worker.addDriver(driver)
driver.worker = Some(worker)
worker.actor ! LaunchDriver(driver.id, driver.desc)
driver.state = DriverState.RUNNING
}
複製代碼
給 worker 發送了一個 LaunchDriver 的消息,下面在看 launchExecutor 的方法。
def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
worker.actor ! LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
exec.application.driver ! ExecutorAdded(
exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
複製代碼
它要作的事情多一點,除了給 worker 發送 LaunchExecutor 指令外,還須要給 driver 發送 ExecutorAdded 的消息,說你的任務已經有人幹了。
在繼續 Worker 講以前,咱們先看看它是怎麼註冊進來的,每一個 Worker 啓動以後,會自動去請求 Master 去註冊本身,具體咱們能夠看 receive 的方法裏面的 RegisterWorker 這一段,它須要上報本身的內存、Cpu、地址、端口等信息,註冊成功以後返回 RegisteredWorker 信息給它,說已經註冊成功了。
一樣的,咱們到 Worker 裏面在 receive 方法找 LaunchDriver 和 LaunchExecutor 就能夠找到咱們要的東西。
case LaunchDriver(driverId, driverDesc) => {
logInfo(s"Asked to launch driver $driverId")
val driver = new DriverRunner(driverId, workDir, sparkHome, driverDesc, self, akkaUrl)
drivers(driverId) = driver
driver.start()
coresUsed += driverDesc.cores
memoryUsed += driverDesc.mem
}
複製代碼
看一下 start 方法吧,start 方法裏面,實際上是 new Thread().start(),run 方法裏面是經過傳過來的 DriverDescription 構造的一個命令,丟給 ProcessBuilder 去執行命令,結束以後調用。
worker !DriverStateChanged 通知 worker,worker 再經過 master ! DriverStateChanged 通知 master,釋放掉 worker 的 cpu 和內存。
同理,LaunchExecutor 執行完畢了,經過 worker ! ExecutorStateChanged 通知 worker,而後 worker 經過 master ! ExecutorStateChanged 通知 master,釋放掉 worker 的 cpu 和內存。
下面咱們再梳理一下這個過程,只包括 Driver 註冊,Driver 運行以後的過程在以後的文章再說,比較複雜。
一、Client 經過得到 Url 地址得到 ActorSelection(master 的 actor 引用), 而後經過 ActorSelection 給 Master 發送註冊 Driver 請求(RequestSubmitDriver)
二、Master 接收到請求以後就開始調度了,從 workers 列表裏面找出能夠用的 Worker
三、經過 Worker 的 actor 引用 ActorRef 給可用的 Worker 發送啓動 Driver 請求(LaunchDriver)
四、調度完畢以後,給 Client 回覆註冊成功消息 (SubmitDriverResponse)
五、Worker 接收到 LaunchDriver 請求以後,經過傳過來的 DriverDescription 的信息構造出命令來,經過 ProcessBuilder 執行
六、ProcessBuilder 執行完命令以後,經過 DriverStateChanged 經過 Worker
七、Worker 最後把 DriverStateChanged 彙報給 Master
後記:聽超哥說,org.apache.spark.deploy.Client 這個類快要被刪除了,不知道 cluster 的這種模式是否是也被放棄了,官方給出來的例子推薦的是 client 模式 -> 直接運行程序。難怪在做業調度的時候,看到別的 actor 叫 driverActor。
不過這篇文章還有存在的意義, Akka 和調度這塊,和我如今正在寫的第三篇以及第四篇關係很密切。