Aloha:一個分佈式任務調度框架

概覽

Aloha 是一個基於 Scala 實現的分佈式的任務調度和管理框架,提供插件式擴展功能,能夠用來調度各類類型的任務。Aloha 的典型的應用場景是做爲統一的任務管理入口。例如,在數據平臺上一般會運行各類類型的應用,如 Spark 任務,Flink 任務,ETL 任務等,統一對這些任務進行管理並及時感知任務狀態的變化是頗有必要的。git

Aloha 的基本實現是基於 Spark 的任務調度模塊,在 Master 和 Worker 組件的基礎上進行了修改,並提供了擴展接口,能夠方便地集成各類類型的任務。Master 支持高可用配置及狀態恢復,並提供了 REST 接口用於提交任務。github

擴展

不一樣類型應用程序

在 Aloha 中,調度的應用被抽象爲 Application 接口。只須要按需實現 Application 接口,就能夠對多種不一樣類型的應用進行調度管理。Application 的生命週期主要經過 start(), shutdown() 進行管理,當應用被調度到 worker 上執行時, start() 方法首先被調用,當用戶要求強制中止應用時,shutdown() 方法被調用。apache

trait Application {
  //啓動
  def start(): Promise[ExitState]
  //強制中止
  def shutdown(reason: Option[String]): Unit
  //提交應用時的描述
  def withDescription(desc: ApplicationDescription): Application
  //應用運行時的工做目錄
  def withApplicationDir(appDir: File): Application
  //系統配置
  def withAlohaConf(conf: AlohaConf): Application
  //應用運行結束後的清理動做
  def clean(): Unit
}
複製代碼

你可能注意到了,start() 方法的返回值是一個 Promise 對象。這是由於,Aloha 最初在設計時主要針對的是長期運行的應用程序,如 Flink 任務、Spark Streaming 任務等。對於這一類 long-running 的應用,Future 和 Promise 提供了一種更靈活的任務狀態通知機制。當任務中止後,經過調用 Promise.success() 方法告知 Worker。網絡

例如,若是要經過啓動一個獨立進程的方式來啓動一個應用程序,能夠這樣來實現:數據結構

override def start(): Promise[ExitState] = {
    //啓動進程
    val processBuilder = getProcessBuilder()
    process = processBuilder.start()
    stateMonitorThread = new Thread("app-state-monitor-thread") {
      override def run(): Unit = {
        val exitCode = process.waitFor()
        //進程退出
        if(exitCode == 0) {
          result.success(ExitState(ExitCode.SUCCESS, Some("success")))
        } else {
          result.success(ExitState(ExitCode.FAILED, Some("failed")))
        }
      }
    }
    stateMonitorThread.start()
    result
  }

  override def shutdown(reason: Option[String]): Unit = {
    if (process != null) {
      //強制結束進程
      val exitCode = Utils.terminateProcess(process, APP_TERMINATE_TIMEOUT_MS)
      if (exitCode.isEmpty) {
        logWarning("Failed to terminate process: " + process +
          ". This process will likely be orphaned.")
      }
    }
  }

複製代碼

自定義事件監聽

在不少狀況下,咱們但願可以實時感知到任務狀態的變化,例如在任務完成或者失敗時發送一條消息提醒。Aloha 提供了事件監聽接口,能夠及時對任務狀態的變化做出響應。架構

trait AlohaEventListener {
  def onApplicationStateChange(event: AppStateChangedEvent): Unit

  def onApplicationRelaunched(event: AppRelaunchedEvent): Unit

  def onOtherEvent(event: AlohaEvent): Unit
}
複製代碼

自定義實現的事件監聽器在 Aloha 啓動時動態註冊,也能夠同時註冊多個監聽器。app

模塊設計

整體架構

Aloha 的總體實現方案是建構在 Spark 的基礎之上,於是 Aloha 也是基於主從架構實現的,主要由 Master 和 Worker 這兩個主要組件構成:Master 負責管理集羣中全部的 Worker,接收用戶提交的應用,並將應用分派給不一樣的 Worker;而 Worker 主要是負責啓動、關閉具體的應用,對應用的生命週期進行管理等。Aloha 還提供了 REST 服務,實際上充當了 Client 的角色,方便經過 REST 接口提交應用。 框架

Aloha
Aloha 提供了 HA 配置,在 Master 發生故障時能夠自動進行故障轉移。同時啓動的多個 Master 實例,只有一個實例會處於 Alive 狀態,其他的處於 Standby 狀態。當本來處於 Alive 狀態的 Master 實例宕機後,LeaderElectionAgent 會從處於 Standby 狀態的 Master 中選舉出新的 Alive Master,並恢復故障以前的狀態。

任務調度管理

Worker 註冊

在 Master 啓動後,等待 Worker 的註冊請求。在 Worker 啓動時,根據 Master 的地址向 Master 發送註冊請求。因爲可能會有多個 Master 實例在運行,Worker 會全部的這些Master 都發送註冊請求,只有處於 Alive 狀態的 Master 會響應註冊成功的消息,處於Standby 狀態的 Master 會告知 Worker 本身正處於 Standby 狀態,Worker 會忽略這一類消息。Worker 會一直嘗試向 Master 發送註冊請求,直到接收到註冊成功的響應。在向 Master 發送註冊請求時,請求的消息中會包含當前 Worker 節點的計算資源信息,包括可用的 CPU 數量和內存大小,Master 在進行調度的時候會追蹤 Worker 的資源使用狀況。異步

一旦 Worker 註冊成功,就會週期性地向 Master 發送心跳信息。Master 則會按期檢查全部 Worker 的心跳狀況,一旦發現過久沒有接收到某一個 Worker 的心跳消息,則認爲該 Worker 已經下線。另外,網絡故障或者進程異常退出等狀況會形成 Master 和 Worker 之間創建的網絡鏈接斷開,鏈接斷開的事件能直接被 Master 和 Worker 監聽到。對 Master 而言,一旦一個 Worker 掉線,須要將該 Worker 上運行的應用置爲爲異常狀態,或是從新調度這些應用。對於 Worker 而言,一旦失去和 Master 創建的鏈接,就須要從新進入註冊流程。async

Application 提交

能夠經過兩種方式向 Master 提交 Application,一種方式是經過 REST 接口,另外一種方式是自行建立一個 Client,經過 Master 的地址向 Master 發送 RPC 調用。實際上 REST Server 充當了一個 Client 的角色。

當 Master 接收到註冊 Application 的請求時,會分配 applicationId,並將應用放到等待調度的列表中。在調度時,採用 FIFO 的方式,選取剩餘資源可以知足應用需求的 Worker,向對應的 Worker 發送啓動應用的消息,應用從 SUMITTED 狀態切換爲 LAUNCHING 狀態。Worker 在收到啓動的應用的請求後,會爲對應的應用建立工做目錄,併爲每個應用單獨啓動一個工做線程。應用成功啓動後會向 Master 發送應用狀態改變的消息,應用狀態切換爲 RUNNING 狀態。此後每當應用狀態發生改變,例如任務成功完成,或是異常退出,都會向 Master 發送應用狀態改變的消息。在應用啓動後,對於的工做線程會阻塞地等待應用結束。當 Master 接收到強制中止應用的請求後,會將消息轉發給對應的 Worker,Worker 在接收到消息後會中斷對應應用的工做線程,工做線程響應中斷,調用 Application 提供的強制關閉方法強行中止應用。

爲了支持擴展不一樣的應用,Worker 在啓動應用時使用了自定義的 ClassLoader 去加載應用提供的依賴包和配置文件路徑。目前須要預先在每一個 Worker 上放置好對應的文件,並在提交應用時指定路徑。後續能夠考慮使用一個分佈式文件系統,如 HDFS ,在啓動應用前下載對應的依賴,或者用戶提交應用時上傳依賴文件,以免預先放置文件的不便。因爲每一個應用的依賴文件都是單獨進行加載的,用戶能夠方便地對應用進行升級,同時也避免了不一樣 Application 出現依賴衝突的問題。

容錯機制

因爲 Master 負責對整個集羣的應用的調度狀況進行管理,一旦 Master 出現異常,則整個集羣就處於癱瘓的狀態,於是必需要考慮爲 Master 提供異常恢復機制。

Master 的異常恢復機制的核心流程在於狀態的恢復。Master 會將已經註冊的 Worker 和 Application的狀態信息持久化存儲在持久化引擎中(目前支持 FileSystem 和 ZooKeeper,支持擴展),每當 Worker 或者 Application 的狀態發生更改,都會更新存儲引擎中保存的狀態。當 Master 啓動時,處於 Standby 狀態。一旦 Master 被選舉爲 Alive 節點,首先要從存儲引擎中讀取 Worker 和 Application 的狀態信息,若是沒有歷史狀態,則 Master 能夠變動爲 Alive 狀態,不然進入恢復流程,狀態變動爲 RECOVERING。在恢復流程中,首先要檢查 Application 的狀態,若是 Application 尚未被調度到任何 Worker 上,則 Application 被放入調度隊列,不然將 Application 的狀態置爲 ApplicationState.UNKNOWN。隨後檢查全部 Worker 的狀態,將 Worker 置爲 WorkerState.UNKNOWN 狀態,並嘗試向 Worker 發送 MasterChange 的消息。在 Worker 接收到 MasterChange 的消息後,會向 Master 響應目前該 Worker 上運行的全部 Application 的狀態,Master 接收到響應後就能夠將對應的 Worker 和 Application 分別調整爲 WorkerState.ALIVEApplicationState.RUNNING。對於超時仍沒有獲得響應的 Worker 和 Application,則認爲已經掉線或異常退出。至此,狀態恢復完成,Master 進入 ALIVE 狀態,能夠正常處理 Worker 和 Application 的各類請求。

在使用 Standalone 模式時,可使用 FILESYSTEM 做爲存儲引擎,這種狀況下只有一個 Master 會運行,失敗後須要手動進行重啓,重啓後狀態能夠恢復。也能夠將 Master 配置爲 HA 模式,多個 Master 實例同時運行,使用 ZooKeeper 做爲 LeaderElectionAgent 和存儲引擎,當 Alive 狀態的 Master 失敗後會自動選舉出新的主節點,並自動進行狀態恢復。

事件總線

Master 在啓動時會建立一個事件總線,並註冊多個事件監聽器,事件監聽器能夠方便地進行擴展,從而知足不一樣的需求。事件總線的核心是一個異步的事件分發機制,基於阻塞隊列實現。當接收到新事件時,會將事件分派給事件監聽器處理。每當 Master 接收到 Application 狀態發生變動的消息時,就會將對應的事件放入事件總線,於是監聽器能夠及時獲取到任務狀態的變動事件。

RPC

RPC 概述

從上一節的介紹能夠看出,做爲一個分佈式的系統,Master 和 Worker 之間存在大量的通訊,這些不一樣的組件之間的通訊正是經過 RPC 來實現的。

在 Aloha 中,RPC 模塊不一樣於傳統的 RPC 框架,不須要預先使用 IDL (Interface Description Language) 來定義客戶端和服務端進行通訊的數據結構、服務端提供的服務等,而是直接基於 Scala 的模式匹配來完成消息的識別和路由。之因此這樣來實現,是由於在這裏 RPC 的主要定位是做爲內部組件之間通訊的橋樑,無需考慮跨語言等特性。基於 Scala 的模式匹配進行路由下降了代碼的複雜度,使用起來很是便捷。

咱們先看一個簡單的例子,來了解一下 RPC 的基本使用方法。其核心就在於 RpcEndpoint 的實現。

//------------------------ Server side ----------------------------
object HelloWorldServer {
  def main(args: Array[String]): Unit = {
    val host = "localhost"
    val rpcEnv: RpcEnv = RpcEnv.create("hello-server", host, 52345, new AlohaConf())
    val helloEndpoint: RpcEndpoint = new HelloEndpoint(rpcEnv)
    rpcEnv.setupEndpoint("hello-service", helloEndpoint)
    rpcEnv.awaitTermination()
  }
}

class HelloEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def onStart(): Unit = {
    println("Service started.")
  }

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case SayHi(msg) =>
      context.reply(s"Aloha: $msg")
    case SayBye(msg) =>
      context.reply(s"Bye :), $msg")
  }

  override def onStop(): Unit = {
    println("Stop hello endpoint")
  }
}

case class SayHi(msg: String)

case class SayBye(msg: String)

//--------------------------- Client side -------------------------------
object HelloWorldClient {
  def main(args: Array[String]): Unit = {
    val host = "localhost"
    val rpcEnv: RpcEnv = RpcEnv.create("hello-client", host, 52345, new AlohaConf, true)
    val endPointRef: RpcEndpointRef = rpcEnv.retrieveEndpointRef(RpcAddress("localhost", 52345), "hello-service")
    val future: Future[String] = endPointRef.ask[String](SayHi("WALL-E"))
    future.onComplete {
      case Success(value) => println(s"Got response: $value")
      case Failure(e) => println(s"Got error: $e")
    }
    Await.result(future, Duration.apply("30s"))
  }
}
複製代碼

RpcEndpoint、 RpcEndpointRef 和 RpcEnv

從上面的例子很容易觀察到,RpcEndpointRpcEndpointRefRpcEnv 是使用這個 RPC 框架的關鍵。若是你剛好知道一點 Actor 模型和 Akka 的基本概念,很容易就能把這三個抽象同 Akka 中的 Actor, ActorRefActorSystem 聯繫起來。事實上,Spark 內部的 RPC 最初正是基於 Akka 來實現的,後來雖然剝離了 Akka,但基本的設計理念卻保留了下來。

簡單地來講,RpcEndpoint 是一個可以接收消息並做出響應的服務。Master 和 Worker 實際上都是 RpcEndpoint

RpcEndpoint 對接收的消息有兩種方式,分別對應須要做出應答和不須要做出應答,即:

def receive: PartialFunction[Any, Unit] = {
    case _ => throw new AlohaException(self + " does not implement 'receive'")
  }

  def receiveAndReply(context: RpcCallContext): PartialFunction[Any,Unit] = {
    case _ => context.sendFailure(new AlohaException(self + " won't reply anything"))
  }
複製代碼

其中,RpcCallContext 用於向消息發送方做出應答,包括回覆正常的響應以及錯誤的異常。經過 RpcCallContext 將業務邏輯和數據傳輸進行了解耦,服務方無需知道請求的發送方是來自本地仍是來自遠端。

RpcEndpoint 還包含了一系列生命週期相關的回調方法,如 onStart, onStop, onError, onConnected, onDisconnected, onNetworkError

RpcEndpointRef 是對 RpcEndpoint 的引用,它是服務調用方發送請求的入口。經過獲取 RpcEndpoint 對應的 RpcEndpointRef,就能夠直接向 RpcEndpoint 發送請求。不管 RpcEndpoint 是在本地仍是在遠端,向 RpcEndpoint 發送消息的方法都是一致的。這也正是 RPC 存在的意義,即:執行一個遠程服務提供的方法,就如同調用本地方法同樣。

RpcEndpointRef 提供了以下幾種請求的發送方式:

//Sends a one-way asynchronous message. Fire-and-forget semantics.
  def send(message: Any): Unit

  // Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to receive the reply within the specified timeout.
  def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]

  def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)

  def askSync[T: ClassTag](message: Any):T = askSync(message, defaultAskTimeout)

  //Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a specified timeout, throw an exception if this fails.
  def askSync[T: ClassTag](message: Any,timeout: RpcTimeout):T = {
    val future = ask[T](message, timeout)
    timeout.awaitResult(future)
  }
複製代碼

RpcEnvRpcEndpoint 的運行時環境。一方面,它負責 RpcEndpoint 的註冊,RpcEndpoint 生命週期的管理,以及根據 RpcEndpoint 的地址來獲取對應 RpcEndpointRef;另外一方面,它還負責請求的進一步封裝,底層數據的網絡傳輸,消息的路由等。

RpcEnv 有兩種模式,一種是 Server 模式,一種是 Client 模式。在 Server 模式下,能夠向RpcEnv 註冊 RpcEndpoint,而且會註冊一個特殊的 Endpoint,即 RpcEndpointVerifier,在獲取 RpcEndpointRef 時,會經過 RpcEndpointVerifier 驗證對應的 RpcEndpoint 是否存在。

RpcEnv 經過工廠模式來建立,底層具體的實現方案是可替換的,目前使用的是基於 Netty 實現的 NettyRpcEnv

Dispatcher、Inbox 和 Outbox

NettyRpcEnv 內部,爲了高效進行消息的路由與傳遞,使用了一種相似於 mailbox 的設計。

對於每個 RpcEndpoint,都有一個關聯的 InboxInbox 內部有一個消息列表,這個消息列表中保存了這個 RpcEndpoint 收到的全部消息,包括須要應答的 RpcMessage,無需應答的 OneWayMessage, 以及各類和生命週期相關的狀態消息,對於每一條消息,都會調用對應在 RpcEndpoint 內部定義的各類函數進行處理。而 Dispatcher 則充當了消息投遞的角色。對於 NettyRpcEnv 接收到的全部消息, Dispatcher 都會根據指定的 Endpoint 標識找到對應的 Inbox,並將消息投遞進去。此外,Dispatcher 內部啓動了一個 MessageLoop,這個 MessaLoop 不斷從阻塞隊列中獲取有新消息到達的 Endpoint,不斷地消化新到達的這些消息。

Inbox 遙相呼應的是,在 NettyRpcEnv 內部維護了 RpcAddressOutbox 的映射關係,每一個遠程 Endpoint 都對應一個 Outbox 。在經過 RpcEndpointRef 發送消息時, NettyRpcEnv 會根據 RpcEndpoint 的地址進行判斷:若是是本地的 Endpoint, 則直接經過 Dispatcher進行消息投遞;若是是遠端的 Endpoint, 則將消息投遞到對應的 Outbox 中。 Outbox 中也有一個待投遞的消息列表,在首次向遠端 Endpoint 投遞消息時,會先創建網絡鏈接,而後依次將消息發送出去。

網絡傳輸

NettyRpcEnv 中,如何將請求發送給遠端的 Endpoint,並收到遠端 Endpoint 給出的回覆,這就要要依賴於更底層的網絡傳輸模塊。網絡傳輸模塊,主要是對 Netty 的更進一步封裝,其中關鍵的組件及功能以下:

  • TransportServer: 網絡傳輸的服務端,當 NettyRpcEnv 以 Server 模式啓動時就會建立一個 TransportServer,等待客戶端的鏈接請求
  • TransportClient:網絡傳輸的客戶端,實際上就是對 channel 的進一步封裝,一旦網絡雙方的請求創建成功,那麼在 channel 的兩端就各有一個 TransportClient,從而能夠以全雙工的方式進行數據交換
  • TransportClientFactory:建立 TransportClient 的工廠類,內部使用了鏈接池,能夠複用已經創建的鏈接
  • RpcHandler:負責對接收到的 RPC 請求消息進行處理,NettyRpcEnv 就是在這個接口的方法中將消息交給 Dispatcher 進行投遞
  • RpcResponseCallback:RPC 請求響應的回調接口,NettyRpcEnv 基於這個接口對接收到的數據進行反序列化
  • TransportRequestHandler:對請求消息進行處理,主要是將消息轉交給 RpcHandler 進行處理
  • TransportResponseHandler:對響應消息進行處理,記錄了每一條已發送的消息和與其關聯的 RpcResponseCallback,一旦收到響應,就調用對應的回調方法
  • TransportChannelHandler:位於 channel pipeline 的尾端,根據消息類型將消息交給 TransportRequestHandlerTransportResponseHandler 進行處理
  • TransportContext:用於建立 TransportServerTransportClientFactory,並初始化 Netty Channel 的 pipeline

其餘的諸如引導服務端、引導客戶端、消息的編解碼等過程,都是使用 Netty 進行網絡通訊的慣常流程,這裏再也不詳述。

小結

Aloha 是一個分佈式調度框架 Aloha ,它的實現主要參考了 Spark。文中首先介紹了 Aloha 的使用場景和擴展方式,並採用自頂向下的方式重點介紹了 Aloha 的模塊設計和實現方案。

Aloha 現已在 Github 開源,項目地址: github.com/jrthe42/alo… 。有關該項目的任何問題,歡迎各位經過 issue 進行交流。

-EOF-

原文地址: blog.jrwang.me/2019/aloha-… 轉載請註明出處!

相關文章
相關標籤/搜索