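This blog series distills and shares cases drawn from real production environments, with walkthroughs of the Spark source code and practical guidance for commercial use. Stay tuned for further installments. Copyright notice: this series on Spark source code and commercial practice belongs to the author (秦凱新). Reproduction is prohibited; you are welcome to study it.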
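The Master's failure recovery mechanism is chosen through the spark.deploy.recoveryMode setting. The main options are ZOOKEEPER and FILESYSTEM; if the setting is absent, it defaults to NONE.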
private val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
The PersistenceEngine is initialized in the Master's onStart() method, together with the leader election agent:
val serializer = new JavaSerializer(conf)
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
  case "ZOOKEEPER" =>
    logInfo("Persisting recovery state to ZooKeeper")
    val zkFactory =
      new ZooKeeperRecoveryModeFactory(conf, serializer)
    (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
  case "FILESYSTEM" =>
    val fsFactory =
      new FileSystemRecoveryModeFactory(conf, serializer)
    (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
  case "CUSTOM" =>
    val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
    val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
      .newInstance(conf, serializer)
      .asInstanceOf[StandaloneRecoveryModeFactory]
    (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
  case _ =>
    (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_
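The selection logic above can be reduced to a small, dependency-free sketch. The names EngineKind, RecoveryModeSketch, and the four case objects are hypothetical stand-ins rather than Spark classes, and a plain Map plays the role of SparkConf. Only the matching behaviour is taken from the code above.

```scala
// Hypothetical stand-ins for the engines chosen in Master.onStart().
sealed trait EngineKind
case object ZooKeeperEngine extends EngineKind
case object FileSystemEngine extends EngineKind
case object CustomEngine extends EngineKind
case object BlackHoleEngine extends EngineKind

object RecoveryModeSketch {
  // Mirrors conf.get("spark.deploy.recoveryMode", "NONE"): a missing key yields "NONE".
  def recoveryMode(conf: Map[String, String]): String =
    conf.getOrElse("spark.deploy.recoveryMode", "NONE")

  // The match is on exact strings, so any other value falls through to the no-op engine.
  def select(conf: Map[String, String]): EngineKind = recoveryMode(conf) match {
    case "ZOOKEEPER"  => ZooKeeperEngine
    case "FILESYSTEM" => FileSystemEngine
    case "CUSTOM"     => CustomEngine
    case _            => BlackHoleEngine
  }
}
```

One consequence worth noticing: because the match is case-sensitive, a value such as "zookeeper" would land in the default branch and nothing would be persisted.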
(1) When PersistenceEngine is invoked:
For example:
persistenceEngine.removeWorker(worker)
abstract class PersistenceEngine {
  /**
   * Defines how the object is serialized and persisted. Implementation will
   * depend on the store used.
   */
  def persist(name: String, obj: Object): Unit

  /**
   * Defines how the object referred by its name is removed from the store.
   */
  def unpersist(name: String): Unit

  /**
   * Gives all objects, matching a prefix. This defines how objects are
   * read/deserialized back.
   */
  def read[T: ClassTag](prefix: String): Seq[T]

  final def addApplication(app: ApplicationInfo): Unit = {
    persist("app_" + app.id, app)
  }

  final def removeApplication(app: ApplicationInfo): Unit = {
    unpersist("app_" + app.id)
  }

  final def addWorker(worker: WorkerInfo): Unit = {
    persist("worker_" + worker.id, worker)
  }

  final def removeWorker(worker: WorkerInfo): Unit = {
    unpersist("worker_" + worker.id)
  }

  final def addDriver(driver: DriverInfo): Unit = {
    persist("driver_" + driver.id, driver)
  }

  final def removeDriver(driver: DriverInfo): Unit = {
    unpersist("driver_" + driver.id)
  }

  /**
   * Returns the persisted data sorted by their respective ids (which implies that they're
   * sorted by time of creation).
   */
  final def readPersistedData(
      rpcEnv: RpcEnv): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = {
    rpcEnv.deserialize { () =>
      (read[ApplicationInfo]("app_"), read[DriverInfo]("driver_"), read[WorkerInfo]("worker_"))
    }
  }

  def close() {}
}
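To make the contract of the abstract class concrete, here is a minimal in-memory sketch. SketchPersistenceEngine and InMemoryPersistenceEngine are hypothetical names, the helpers take a plain id instead of a WorkerInfo, and the type check in read is an addition of this sketch. The prefix convention ("worker_" + id) is the one shown above.

```scala
import scala.collection.mutable
import scala.reflect.ClassTag

// Hypothetical, trimmed-down version of the abstract class: three primitives plus prefixed helpers.
abstract class SketchPersistenceEngine {
  def persist(name: String, obj: AnyRef): Unit
  def unpersist(name: String): Unit
  def read[T: ClassTag](prefix: String): Seq[T]

  final def addWorker(id: String, worker: AnyRef): Unit = persist("worker_" + id, worker)
  final def removeWorker(id: String): Unit = unpersist("worker_" + id)
}

// In-memory store; the sorted map keeps reads ordered by name, and therefore by id.
class InMemoryPersistenceEngine extends SketchPersistenceEngine {
  private val store = mutable.TreeMap.empty[String, AnyRef]

  override def persist(name: String, obj: AnyRef): Unit = store.update(name, obj)

  override def unpersist(name: String): Unit = { store.remove(name); () }

  // Keep entries whose name starts with the prefix and whose value really is a T.
  override def read[T: ClassTag](prefix: String): Seq[T] =
    store.toSeq.flatMap { case (name, value) =>
      if (name.startsWith(prefix)) implicitly[ClassTag[T]].unapply(value) else None
    }
}
```

The point of the design is that a concrete engine only has to supply persist, unpersist, and read. The add/remove helpers and the naming scheme stay in one place.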
File-system-based persistence: FileSystemPersistenceEngine
private def serializeIntoFile(file: File, value: AnyRef) {
  val created = file.createNewFile()
  if (!created) { throw new IllegalStateException("Could not create file: " + file) }
  val fileOut = new FileOutputStream(file)
  var out: SerializationStream = null
  Utils.tryWithSafeFinally {
    out = serializer.newInstance().serializeStream(fileOut)
    out.writeObject(value)
  } {
    fileOut.close()
    if (out != null) {
      out.close()
    }
  }
}
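The one-file-per-object idea can be tried out without Spark. The sketch below is an approximation: FileStoreSketch is a hypothetical name, plain Java serialization stands in for Spark's Serializer, and the read and unpersist methods are assumptions about how the counterpart operations might look.

```scala
import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}

object FileStoreSketch {
  // Write one object per file; refuse to overwrite, like the createNewFile() check above.
  def persist(dir: File, name: String, value: AnyRef): Unit = {
    val file = new File(dir, name)
    if (!file.createNewFile()) {
      throw new IllegalStateException("Could not create file: " + file)
    }
    val out = new ObjectOutputStream(new FileOutputStream(file))
    try out.writeObject(value) finally out.close()
  }

  // Read back every file whose name starts with the prefix, sorted by file name.
  def read(dir: File, prefix: String): Seq[AnyRef] = {
    val files = Option(dir.listFiles()).getOrElse(Array.empty[File])
    files.filter(_.getName.startsWith(prefix)).sortBy(_.getName).toSeq.map { f =>
      val in = new ObjectInputStream(new FileInputStream(f))
      try in.readObject() finally in.close()
    }
  }

  // Removing an object is simply deleting its file.
  def unpersist(dir: File, name: String): Unit = { new File(dir, name).delete(); () }
}
```

Persisting the same name twice throws, which matches the createNewFile() guard in the excerpt.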
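ZooKeeper-based persistence: ZooKeeperPersistenceEngine
Curator is a ZooKeeper client originally open-sourced by Netflix (now an Apache project). Note that ZooKeeperPersistenceEngine stores data such as ApplicationInfo, WorkerInfo, and DriverInfo in separate znodes in ZooKeeper.
Can ZooKeeper really cope with this? That remains an open question.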
private val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
private val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)

private def serializeIntoFile(path: String, value: AnyRef) {
  val serialized = serializer.newInstance().serialize(value)
  val bytes = new Array[Byte](serialized.remaining())
  serialized.get(bytes)
  zk.create().withMode(CreateMode.PERSISTENT).forPath(path, bytes)
}
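Two details of the excerpt can be isolated without a ZooKeeper cluster: how a name becomes a znode path, and how the serialized ByteBuffer is copied into a byte array. ZnodePayloadSketch is a hypothetical name. The path layout (working directory joined with the object name by "/") is an assumption of this sketch, and the default "/spark" is taken from the WORKING_DIR line above.

```scala
import java.nio.ByteBuffer

object ZnodePayloadSketch {
  // Assumed default location, following the WORKING_DIR definition above.
  val workingDir: String = "/spark" + "/master_status"

  // Assumption: each persisted object gets its own znode under the working directory.
  def znodePath(name: String): String = workingDir + "/" + name

  // Copy only the readable part of the buffer (from position to limit) into a fresh array.
  def toBytes(serialized: ByteBuffer): Array[Byte] = {
    val bytes = new Array[Byte](serialized.remaining())
    serialized.get(bytes)
    bytes
  }
}
```

On the capacity question: each znode here holds one serialized ApplicationInfo, WorkerInfo, or DriverInfo. ZooKeeper's default limit on znode data is about 1 MB, so the scheme likely holds up as long as these objects stay small and the number of znodes stays modest.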
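The so-called election mechanism is really a register-and-listen mechanism: once the listener detects that the Master has gone down, the registered callback is invoked.
The main implementations are ZooKeeperLeaderElectionAgent and MonarchyLeaderAgent (the default seen in the onStart() code above).
The following uses ZooKeeperLeaderElectionAgent as the example.
It listens on the /leader_election directory: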
val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"

private def start() {
  logInfo("Starting ZooKeeper LeaderElection agent")
  zk = SparkCuratorUtil.newClient(conf)
  leaderLatch = new LeaderLatch(zk, WORKING_DIR)
  leaderLatch.addListener(this)
  leaderLatch.start()
}

private def updateLeadershipStatus(isLeader: Boolean) {
  if (isLeader && status == LeadershipStatus.NOT_LEADER) {
    status = LeadershipStatus.LEADER
    masterInstance.electedLeader()
  } else if (!isLeader && status == LeadershipStatus.LEADER) {
    status = LeadershipStatus.NOT_LEADER
    masterInstance.revokedLeadership()
  }
}
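The transition logic in updateLeadershipStatus can be exercised on its own. In this sketch LeadershipTracker and LeadershipSketch are hypothetical names, and two plain callbacks take the place of masterInstance.electedLeader() and masterInstance.revokedLeadership().

```scala
object LeadershipSketch {
  sealed trait Status
  case object Leader extends Status
  case object NotLeader extends Status
}

// Hypothetical stand-in for the election agent; the callbacks replace masterInstance.
class LeadershipTracker(onElected: () => Unit, onRevoked: () => Unit) {
  import LeadershipSketch._

  private var status: Status = NotLeader

  // Fire a callback only on an actual transition, so repeated notifications are harmless.
  def update(isLeader: Boolean): Unit = synchronized {
    if (isLeader && status == NotLeader) {
      status = Leader
      onElected()
    } else if (!isLeader && status == Leader) {
      status = NotLeader
      onRevoked()
    }
  }

  def current: Status = synchronized(status)
}
```

Because the status is checked before it is changed, a duplicate "you are leader" notification does not trigger a second recovery.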
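Election is carried out by listening on the /leader_election directory; Curator invokes the following callbacks when leadership changes: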
override def isLeader() {
  synchronized {
    // could have lost leadership by now.
    if (!leaderLatch.hasLeadership) {
      return
    }
    logInfo("We have gained leadership")
    updateLeadershipStatus(true)
  }
}

override def notLeader() {
  synchronized {
    // could have gained leadership by now.
    if (leaderLatch.hasLeadership) {
      return
    }
    logInfo("We have lost leadership")
    updateLeadershipStatus(false)
  }
}
The Master sends a message to itself to begin the recovery operation:
Master extends the LeaderElectable trait and therefore implements the electedLeader method:
override def electedLeader() {
  self.send(ElectedLeader)
}
The Master's actions: beginRecovery and CompleteRecovery
override def receive: PartialFunction[Any, Unit] = {
  case ElectedLeader =>
    val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
    state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
      RecoveryState.ALIVE
    } else {
      RecoveryState.RECOVERING
    }
    logInfo("I have been elected leader! New state: " + state)
    if (state == RecoveryState.RECOVERING) {
      beginRecovery(storedApps, storedDrivers, storedWorkers) // <= the masterstroke
      recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          self.send(CompleteRecovery) // <= the masterstroke
        }
      }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
    }
  // ... (remaining cases omitted in this excerpt)
}
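The decision made on ElectedLeader is small enough to state as a pure function. RecoverySketch and its state names are hypothetical, and plain strings stand in for ApplicationInfo, DriverInfo, and WorkerInfo.

```scala
object RecoverySketch {
  sealed trait RecoveryState
  case object Alive extends RecoveryState
  case object Recovering extends RecoveryState

  // With nothing persisted there is nothing to recover, so the new leader is ALIVE at once.
  // Otherwise it enters RECOVERING until the delayed CompleteRecovery message arrives.
  def stateAfterElection(
      apps: Seq[String],
      drivers: Seq[String],
      workers: Seq[String]): RecoveryState =
    if (apps.isEmpty && drivers.isEmpty && workers.isEmpty) Alive else Recovering
}
```

In the excerpt above, the RECOVERING branch also schedules CompleteRecovery after WORKER_TIMEOUT_MS, which gives workers and applications a bounded window to acknowledge the new Master.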
The Master's action: beginRecovery
private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
    storedWorkers: Seq[WorkerInfo]) {
  for (app <- storedApps) {
    logInfo("Trying to recover app: " + app.id)
    try {
      registerApplication(app)
      // Mark the app UNKNOWN until it acknowledges the new Master.
      app.state = ApplicationState.UNKNOWN
      app.driver.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
    }
  }
  // ... (the excerpt stops here; the rest of the method is omitted)
}
The Master's action: completeRecovery
private def completeRecovery() {
  // Ensure "only-once" recovery semantics using a short synchronization period.
  if (state != RecoveryState.RECOVERING) { return }
  state = RecoveryState.COMPLETING_RECOVERY

  // Kill off any workers and apps that didn't respond to us.
  workers.filter(_.state == WorkerState.UNKNOWN).foreach(
    removeWorker(_, "Not responding for recovery"))
  apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

  // Update the state of recovered apps to RUNNING
  apps.filter(_.state == ApplicationState.WAITING).foreach(_.state = ApplicationState.RUNNING)

  // Reschedule drivers which were not claimed by any workers
  drivers.filter(_.worker.isEmpty).foreach { d =>
    logWarning(s"Driver ${d.id} was not found after master recovery")
    if (d.desc.supervise) {
      logWarning(s"Re-launching ${d.id}")
      relaunchDriver(d)
    } else {
      removeDriver(d.id, DriverState.ERROR, None)
      logWarning(s"Did not re-launch ${d.id} because it was not supervised")
    }
  }
  // ... (the excerpt stops here; the rest of the method is omitted)
}
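The clean-up rule for applications in completeRecovery can be sketched as a pure transformation. CompleteRecoverySketch, Node, and the three state objects are hypothetical simplifications; the real ApplicationInfo is mutable and carries far more state.

```scala
object CompleteRecoverySketch {
  sealed trait NodeState
  case object Unknown extends NodeState
  case object Waiting extends NodeState
  case object Running extends NodeState

  // Hypothetical record standing in for ApplicationInfo.
  final case class Node(id: String, state: NodeState)

  // Drop every entry that never answered MasterChanged (still Unknown),
  // then promote the surviving Waiting entries to Running.
  def complete(apps: Seq[Node]): Seq[Node] =
    apps.filterNot(_.state == Unknown).map {
      case n if n.state == Waiting => n.copy(state = Running)
      case n => n
    }
}
```

Read together with beginRecovery, this shows the full handshake: everything starts as UNKNOWN, whatever responds moves on, and whatever is still UNKNOWN when the timeout fires is discarded.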
秦凱新, in Shenzhen