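Lesson 13: Spark Streaming Source Code Walkthrough: Driver Fault Tolerance

In this lesson:

1. Fault tolerance of ReceivedBlockTracker

2. Fault tolerance of DStreamGraph and JobGenerator

At the data level, ReceivedBlockTracker records the metadata for the entire Spark Streaming application.

At the scheduling level, DStreamGraph and JobGenerator are the core of Spark Streaming scheduling. They record how far scheduling has progressed, which is tied to the business logic.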

When ReceivedBlockTracker receives block metadata, it calls the addBlock method, which writes the record to disk first and only then to memory.
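The write-to-disk-then-memory order can be illustrated with a minimal sketch. Everything here is a simplified, hypothetical stand-in (SketchTracker, BlockInfo, and an in-memory buffer playing the role of the on-disk log); it is not Spark's actual implementation.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins; not Spark's real classes.
final case class BlockInfo(streamId: Int, blockId: String)

class SketchTracker(walEnabled: Boolean) {
  // Stands in for the on-disk write-ahead log.
  val wal = mutable.ArrayBuffer.empty[String]
  // In-memory queues of blocks not yet assigned to a batch, keyed by stream.
  val unallocated = mutable.Map.empty[Int, mutable.Queue[BlockInfo]]

  // Returns true when the record is safely logged (or when no log is configured).
  private def writeToLog(record: String): Boolean = {
    if (walEnabled) wal += record
    true
  }

  def addBlock(info: BlockInfo): Boolean = {
    // Step 1: persist the event first.
    val logged = writeToLog(s"ADD ${info.streamId} ${info.blockId}")
    // Step 2: update memory only after the log write succeeded.
    if (logged) {
      unallocated.getOrElseUpdate(info.streamId, mutable.Queue.empty[BlockInfo]) += info
    }
    logged
  }
}
```

Because the log entry is written before the in-memory update, a crash between the two steps loses nothing that cannot be replayed from the log.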

Based on batchTime, the blocks that belong to the current BatchDuration are allocated into the timeToAllocatedBlocks data structure.
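A rough sketch of this allocation step, assuming a simplified tracker that stores block ids as strings and batch times as milliseconds. The class AllocationSketch is hypothetical, and this version leaves out both the log write and any check against the last allocated batch time.

```scala
import scala.collection.mutable

// Hypothetical sketch; names mirror the text but this is not Spark's code.
class AllocationSketch {
  // streamId -> block ids received but not yet assigned to a batch
  val unallocated = mutable.Map.empty[Int, mutable.Queue[String]]
  // batch time (ms) -> (streamId -> block ids owned by that batch)
  val timeToAllocatedBlocks = mutable.HashMap.empty[Long, Map[Int, Seq[String]]]

  def addBlock(streamId: Int, blockId: String): Unit =
    unallocated.getOrElseUpdate(streamId, mutable.Queue.empty[String]) += blockId

  def allocateBlocksToBatch(batchTime: Long): Unit = {
    // Drain every stream's queue so each block lands in exactly one batch.
    val allocated: Map[Int, Seq[String]] = unallocated.map { case (streamId, queue) =>
      val blocks = queue.toList
      queue.clear()
      streamId -> blocks
    }.toMap
    timeToAllocatedBlocks.put(batchTime, allocated)
  }
}
```

Blocks that arrive after one allocation simply wait in the queue until the next batch time.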

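Time is a case class that records a point in time. It overloads the comparison and arithmetic operators and supplies an implicit Ordering, a design worth borrowing.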

case class Time(private val millis: Long) {
  def milliseconds: Long = millis
  def < (that: Time): Boolean = (this.millis < that.millis)
  def <= (that: Time): Boolean = (this.millis <= that.millis)
  def > (that: Time): Boolean = (this.millis > that.millis)
  def >= (that: Time): Boolean = (this.millis >= that.millis)
  def + (that: Duration): Time = new Time(millis + that.milliseconds)
  def - (that: Time): Duration = new Duration(millis - that.millis)
  def - (that: Duration): Time = new Time(millis - that.milliseconds)
  // Java-friendlier versions of the above.
  def less(that: Time): Boolean = this < that
  def lessEq(that: Time): Boolean = this <= that
  def greater(that: Time): Boolean = this > that
  def greaterEq(that: Time): Boolean = this >= that
  def plus(that: Duration): Time = this + that
  def minus(that: Time): Duration = this - that
  def minus(that: Duration): Time = this - that
  def floor(that: Duration): Time = {
    val t = that.milliseconds
    new Time((this.millis / t) * t)
  }
  def floor(that: Duration, zeroTime: Time): Time = {
    val t = that.milliseconds
    new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
  }
  def isMultipleOf(that: Duration): Boolean =
    (this.millis % that.milliseconds == 0)
  def min(that: Time): Time = if (this < that) this else that
  def max(that: Time): Time = if (this > that) this else that
  def until(that: Time, interval: Duration): Seq[Time] = {
    (this.milliseconds) until (that.milliseconds) by (interval.milliseconds) map (new Time(_))
  }
  def to(that: Time, interval: Duration): Seq[Time] = {
    (this.milliseconds) to (that.milliseconds) by (interval.milliseconds) map (new Time(_))
  }
  override def toString: String = (millis.toString + " ms")
}
object Time {
  implicit val ordering = Ordering.by((time: Time) => time.millis)
}

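Tracing the Time object: the batchTime argument of ReceiverTracker's allocateBlocksToBatch method is passed in by JobGenerator's generateJobs method.

JobGenerator's generateJobs method is invoked when the timer posts a GenerateJobs message.

The time argument carried by GenerateJobs is nextTime, and nextTime += period, where period is ssc.graph.batchDuration.milliseconds.

The initial value of nextTime is the startTime passed to the start method, that is, the return value of RecurringTimer's getStartTime method: (floor(current time / period) + 1) * period, the first multiple of period after the current time.

The period value is the Duration we pass in when constructing the StreamingContext with new StreamingContext.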
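The arithmetic behind the start time and the subsequent ticks can be sketched as follows. TimerSketch and its method names are hypothetical; the sketch assumes non-negative millisecond timestamps, so integer division behaves like floor.

```scala
// Hypothetical helper illustrating the timer arithmetic described above.
object TimerSketch {
  // First multiple of `period` strictly after `now` (both in milliseconds).
  // For non-negative `now`, integer division equals floor(now / period).
  def startTime(now: Long, period: Long): Long =
    (now / period + 1) * period

  // The first `n` batch times: nextTime starts at startTime, then nextTime += period.
  def ticks(now: Long, period: Long, n: Int): Seq[Long] = {
    val first = startTime(now, period)
    (0 until n).map(i => first + i * period)
  }
}
```

For example, with a 1000 ms batch duration and a current time of 12345 ms, TimerSketch.ticks(12345L, 1000L, 3) yields 13000, 14000, 15000, so every batch time is aligned to a multiple of the batch duration.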

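ReceivedBlockTracker also cleans up expired metadata by removing it from the HashMap. Here too, the event is written to disk first and memory is updated afterwards.

The creation, consumption, and destruction of metadata are all recorded in the WAL (write-ahead log), so after a failure the state can be recovered from the log. The source code shows that the WAL mechanism is used only when a checkpoint directory has been set.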
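Recovery by log replay can be sketched as below. The event types (BlockAdded, BatchAllocated, BatchesCleaned) and the RecoveredState class are hypothetical names chosen for illustration; they mirror the three kinds of metadata changes described above rather than Spark's actual classes.

```scala
import scala.collection.mutable

// Hypothetical event types for illustration only.
sealed trait TrackerEvent
final case class BlockAdded(streamId: Int, blockId: String) extends TrackerEvent
final case class BatchAllocated(batchTime: Long, blocks: Map[Int, Seq[String]]) extends TrackerEvent
final case class BatchesCleaned(batchTimes: Seq[Long]) extends TrackerEvent

final class RecoveredState {
  val unallocated = mutable.Map.empty[Int, mutable.Queue[String]]
  val timeToAllocatedBlocks = mutable.HashMap.empty[Long, Map[Int, Seq[String]]]
}

object WalReplay {
  // Rebuilds the in-memory state by applying logged events in their original order.
  def recover(log: Seq[TrackerEvent]): RecoveredState = {
    val state = new RecoveredState
    log.foreach {
      case BlockAdded(streamId, blockId) =>
        state.unallocated.getOrElseUpdate(streamId, mutable.Queue.empty[String]) += blockId
      case BatchAllocated(batchTime, blocks) =>
        // Allocation consumed all queued blocks, so the queues restart empty.
        state.unallocated.values.foreach(_.clear())
        state.timeToAllocatedBlocks.put(batchTime, blocks)
      case BatchesCleaned(batchTimes) =>
        batchTimes.foreach(t => state.timeToAllocatedBlocks.remove(t))
    }
    state
  }
}
```

Since every state change was logged before it was applied, replaying the log from the beginning reproduces the state the tracker held just before the failure.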

 

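The log directory for the WAL is created from the checkpoint directory that was passed in.

Specifically, a folder named receivedBlockMetadata is created under the checkpoint directory to hold the data recorded by the WAL.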
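As a small illustration of how the log directory depends on the checkpoint directory, the sketch below derives it with plain string handling. WalDirSketch and its methods are hypothetical, and modelling "no checkpoint directory" as None is an assumption made for this example.

```scala
// Hypothetical helper; plain strings stand in for filesystem paths.
object WalDirSketch {
  // The WAL lives in a fixed sub-folder of the checkpoint directory.
  def logDirFor(checkpointDir: String): String =
    checkpointDir.stripSuffix("/") + "/receivedBlockMetadata"

  // No checkpoint directory means no WAL directory, and therefore no WAL.
  def walDir(checkpointDirOption: Option[String]): Option[String] =
    checkpointDirOption.map(logDirFor)
}
```

With this sketch, WalDirSketch.walDir(Some("/tmp/ckpt")) gives Some("/tmp/ckpt/receivedBlockMetadata"), while WalDirSketch.walDir(None) gives None, matching the rule that the WAL is active only when a checkpoint directory is configured.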

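The current state of the DStream and JobGenerator is checkpointed. This is triggered at the end of the generateJobs method by posting a DoCheckpoint message.

Summary:

ReceivedBlockTracker achieves data fault tolerance through the WAL.

DStreamGraph and JobGenerator achieve fault tolerance through checkpointing.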
