經過DeveloperApi獲取spark程序執行進度及異常

在應用spark時,常常要獲取任務的執行進度,能夠參照jobProgressListener的設計來完成該功能。前端

如下代碼僅供參考,歡迎交流。web

 

效果顯示:sql

代碼:apache

package org.apache.spark.zpc.listener

import org.apache.spark.Logging
import org.apache.spark.scheduler._

import scala.collection.mutable


/**
  * Spark 的 DeveloperApi 提供針對app, job, task的執行監聽。
  * 經過該監聽,能夠實現:
  * 1.任務執行進度的粗略計算。
  * 2.執行異常失敗時,獲取異常信息。
  * 3.獲取app啓動的appId,從而能夠控制殺死任務。
  * 4.自定義進度和異常的handle處理(如控制檯打印,保存db,或jms傳輸到web等終端
  *
  * @param jobNum Application中Job個數。能夠經過代碼的提交查看spark日誌查看到。
  */

abstract class SparkAppListener(jobNum: Int) extends SparkListener with Logging {

  //Job和Job信息(包括總task數,當前完成task數,當前Job百分比)的映射
  private val jobToJobInfo = new mutable.HashMap[Int, (Int, Int, Int)]
  //stageId和Job的映射,用戶獲取task對應的job
  private val stageToJob = new mutable.HashMap[Int, Int]
  //完成的job數量
  private var finishJobNum = 0
  private var hasException: Boolean = false

  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = synchronized {
    val appId = applicationStart.appId
    //記錄app的Id,用於後續處理:
    //如:yarn application  -kill  appId
    //handleAppId(appId)
  }

  //獲取job的task數量,初始化job信息
  override def onJobStart(jobStart: SparkListenerJobStart) = synchronized {
    val jobId = jobStart.jobId
    val tasks = jobStart.stageInfos.map(stageInfo => stageInfo.numTasks).sum
    jobToJobInfo += (jobId ->(tasks, 0, 0))
    jobStart.stageIds.map(stageId => stageToJob(stageId) = jobId)
  }

  //task結束時,粗略估計當前app執行進度。
  //估算方法:當前完成task數量/總task數量。總完成task數量按(job總數*當前job的task數。)
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
    val stageId = taskEnd.stageId
    val jobId = stageToJob.get(stageId).get
    val (totalTaskNum: Int, finishTaskNum: Int, percent: Int) = jobToJobInfo.get(jobId).get
    val currentFinishTaskNum = finishTaskNum + 1
    val newPercent = currentFinishTaskNum * 100 / (totalTaskNum * jobNum)
    jobToJobInfo(jobId) = (totalTaskNum, currentFinishTaskNum, newPercent)

    if (newPercent > percent) {
      //hanlde application progress
      val totalPercent = jobToJobInfo.values.map(_._3).sum
      if (totalPercent <= 100){
//        handleAppProgress(totalPercent)
      }
    }
  }

  //job 結束,獲取job結束的狀態,異常結束能夠將異常的類型返回處理。
  // handle處理自定義,好比返回給web端,顯示異常log。
  override def onJobEnd(jobEnd: SparkListenerJobEnd) = synchronized {
    jobEnd.jobResult match {
      case JobSucceeded => finishJobNum += 1
      case JobFailed(exception) if !hasException =>
        hasException = true

        // handle application failure
//        handleAppFailure(exception)
      case _ =>
    }
  }

  //app結束時,將程序執行進度標記爲 100%。
  //缺陷:SparkListenerApplicationEnd沒有提供app的Exception的獲取。這樣,當程序在driver端出錯時,
  //獲取不到出錯的具體緣由返回給前端,自定義提示。好比(driver對app中的sql解析異常,尚未開始job的運行)

/*** driver 端異常可經過主程序代碼裏 try catch獲取到 ***/
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) = synchronized { val totalJobNum = jobToJobInfo.keySet.size val totalPercent = jobToJobInfo.values.map(_._3).sum //handle precision lose if (!hasException && totalPercent == 99) { // handleAppProgress(100) } val msg = "執行失敗" if(totalJobNum == 0){ handleAppFailure(new Exception(msg)) } } }

 

 

博客記錄是個好習慣,計劃一下之後幾期的博客。網絡

由淺入深,圍繞機器學習的主題,來學習介紹。app

 模型評估與選擇 
 線性模型
 決策樹
 神經網絡
 支持向量機
 貝葉斯分類器
  貝葉斯決策論 
  極大似然估計 
  樸素貝葉斯分類器 
  半樸素貝葉斯分類器 
  貝葉斯網 
 集成學習
  個體與集成
  Boosting
  Bagging與隨機森林
  Bagging
  隨機森林 
 聚類 
 降維與度量學習 
 特徵選擇與稀疏學習
 機率圖模型
   隱馬爾可夫模型
   馬爾可夫隨機場
 規則學習
 強化學習
 深度學習系列機器學習

相關文章
相關標籤/搜索