Topics in this session:
1. Fault tolerance of ReceivedBlockTracker
2. Fault tolerance of DStreamGraph and JobGenerator
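At the data level, ReceivedBlockTracker records the metadata of received data for the entire Spark Streaming application.
At the scheduling level, DStreamGraph and JobGenerator are the core of Spark Streaming scheduling. They record how far scheduling has progressed, which is tied to the business logic.
When ReceivedBlockTracker receives block metadata, it calls addBlock. That method first writes the event to disk (the write-ahead log) and only then updates memory.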
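The "log first, then memory" order can be sketched as follows. This is a minimal illustration, not Spark's code. `BlockInfo`, `BlockAdditionEvent` and `MiniBlockTracker` are hypothetical simplified stand-ins, and an in-memory buffer plays the role of the on-disk write-ahead log.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for Spark's ReceivedBlockInfo and the
// event that ReceivedBlockTracker writes to its log.
final case class BlockInfo(streamId: Int, blockId: String)
final case class BlockAdditionEvent(info: BlockInfo)

// Sketch of addBlock's ordering: persist the event first, then update memory.
// `log` is a plain buffer standing in for the on-disk write-ahead log.
class MiniBlockTracker(writeSucceeds: Boolean = true) {
  val log = mutable.ArrayBuffer.empty[BlockAdditionEvent]
  val unallocated = mutable.Map.empty[Int, mutable.ArrayBuffer[BlockInfo]]

  // Simulates a log write that can fail.
  private def writeToLog(event: BlockAdditionEvent): Boolean =
    if (writeSucceeds) { log += event; true } else false

  def addBlock(info: BlockInfo): Boolean = {
    val logged = writeToLog(BlockAdditionEvent(info))
    if (logged) {
      // The block only becomes visible in memory once the event is logged.
      unallocated.getOrElseUpdate(info.streamId, mutable.ArrayBuffer.empty) += info
    }
    logged
  }
}
```

If the log write fails, memory is left untouched. After a crash, the log therefore never holds less than what the driver had in memory.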


Using batchTime, allocateBlocksToBatch assigns the data that belongs to the current batch (one batchDuration) to the timeToAllocatedBlocks data structure.
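The allocation step can be sketched as below. The names `BatchTime`, `Allocated` and `MiniAllocator` are hypothetical. The "only allocate batches newer than the last one" guard is an assumption based on the Spark source, not something stated in this post. In Spark, a batch-allocation event is also written to the WAL before the map is updated, and the sketch only marks that spot with a comment.

```scala
import scala.collection.mutable

// Hypothetical simplified types; Spark's real ones are Time and AllocatedBlocks.
final case class BatchTime(ms: Long)
final case class Allocated(streamIdToBlocks: Map[Int, Seq[String]])

class MiniAllocator(streamIds: Seq[Int]) {
  // One queue of not-yet-allocated block ids per input stream.
  val queues: Map[Int, mutable.ArrayBuffer[String]] =
    streamIds.map(id => id -> mutable.ArrayBuffer.empty[String]).toMap
  val timeToAllocatedBlocks = mutable.HashMap.empty[BatchTime, Allocated]
  private var lastAllocated: Option[BatchTime] = None

  def allocateBlocksToBatch(batchTime: BatchTime): Unit = synchronized {
    // Assumed guard: ignore batch times that are not newer than the last one.
    if (lastAllocated.forall(_.ms < batchTime.ms)) {
      // Drain every queue: everything received so far belongs to this batch.
      val drained = queues.map { case (id, q) =>
        val blocks = q.toList
        q.clear()
        id -> blocks
      }
      // (Spark writes the allocation event to the WAL before this put.)
      timeToAllocatedBlocks.put(batchTime, Allocated(drained))
      lastAllocated = Some(batchTime)
    }
  }
}
```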

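The Time class is a case class that records a point in time. It overloads the comparison and arithmetic operators and defines an implicit Ordering in its companion object, a design worth borrowing.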
case class Time(private val millis: Long) {

  def milliseconds: Long = millis

  def < (that: Time): Boolean = (this.millis < that.millis)
  def <= (that: Time): Boolean = (this.millis <= that.millis)
  def > (that: Time): Boolean = (this.millis > that.millis)
  def >= (that: Time): Boolean = (this.millis >= that.millis)

  def + (that: Duration): Time = new Time(millis + that.milliseconds)
  def - (that: Time): Duration = new Duration(millis - that.millis)
  def - (that: Duration): Time = new Time(millis - that.milliseconds)

  // Java-friendlier versions of the above.
  def less(that: Time): Boolean = this < that
  def lessEq(that: Time): Boolean = this <= that
  def greater(that: Time): Boolean = this > that
  def greaterEq(that: Time): Boolean = this >= that
  def plus(that: Duration): Time = this + that
  def minus(that: Time): Duration = this - that
  def minus(that: Duration): Time = this - that

  def floor(that: Duration): Time = {
    val t = that.milliseconds
    new Time((this.millis / t) * t)
  }

  def floor(that: Duration, zeroTime: Time): Time = {
    val t = that.milliseconds
    new Time(((this.millis - zeroTime.milliseconds) / t) * t + zeroTime.milliseconds)
  }

  def isMultipleOf(that: Duration): Boolean = (this.millis % that.milliseconds == 0)

  def min(that: Time): Time = if (this < that) this else that
  def max(that: Time): Time = if (this > that) this else that

  def until(that: Time, interval: Duration): Seq[Time] = {
    (this.milliseconds) until (that.milliseconds) by (interval.milliseconds) map (new Time(_))
  }

  def to(that: Time, interval: Duration): Seq[Time] = {
    (this.milliseconds) to (that.milliseconds) by (interval.milliseconds) map (new Time(_))
  }

  override def toString: String = (millis.toString + " ms")
}

object Time {
  implicit val ordering = Ordering.by((time: Time) => time.millis)
}
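To see the pattern in isolation, here is a minimal, hypothetical re-creation (`MiniTime` is not a Spark class). It uses a case class over millis, symbolic operators with a Java-friendly alias, and an implicit Ordering in the companion object. Because the compiler searches the companion for implicits, `sorted` and `max` work without any extra import.

```scala
// Hypothetical miniature of the Time pattern; durations are plain Long millis here.
final case class MiniTime(private val millis: Long) {
  def milliseconds: Long = millis
  def <(that: MiniTime): Boolean = millis < that.millis
  def +(deltaMs: Long): MiniTime = MiniTime(millis + deltaMs)
  def less(that: MiniTime): Boolean = this < that // Java-friendly alias
  // Same arithmetic as Time.floor(that, zeroTime).
  def floor(periodMs: Long, zero: MiniTime): MiniTime =
    MiniTime(((millis - zero.millis) / periodMs) * periodMs + zero.millis)
}

object MiniTime {
  // Found via the companion object, so collections can sort MiniTime directly.
  implicit val ordering: Ordering[MiniTime] = Ordering.by((t: MiniTime) => t.millis)
}
```

For example, `Seq(MiniTime(3000), MiniTime(1000)).sorted` orders by millis, and `MiniTime(2500).floor(1000, MiniTime(200))` gives `MiniTime(2200)`.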
Tracing where the Time object comes from: the batchTime argument of ReceiverTracker's allocateBlocksToBatch method is supplied by JobGenerator's generateJobs method.

JobGenerator's generateJobs method, in turn, is invoked when the timer posts a GenerateJobs message.
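The call chain above (timer, then GenerateJobs message, then generateJobs, then allocateBlocksToBatch) can be modelled in a single thread. This is a hypothetical sketch. Spark uses a real event-loop thread and a RecurringTimer, and both are simulated here by a queue and an explicit `onTimer` call.

```scala
import scala.collection.mutable

// Hypothetical single-threaded model of timer -> message -> generateJobs.
sealed trait JobGeneratorEvent
final case class GenerateJobs(timeMs: Long) extends JobGeneratorEvent

class MiniJobGenerator {
  private val eventQueue = mutable.Queue.empty[JobGeneratorEvent]
  val allocatedBatches = mutable.ArrayBuffer.empty[Long]

  // The timer callback does not generate jobs itself; it only posts a message.
  def onTimer(timeMs: Long): Unit = eventQueue.enqueue(GenerateJobs(timeMs))

  // The event loop dispatches each message to its handler.
  def processAll(): Unit = while (eventQueue.nonEmpty) {
    eventQueue.dequeue() match {
      case GenerateJobs(t) => generateJobs(t)
    }
  }

  // generateJobs passes the same batch time on to block allocation.
  private def generateJobs(timeMs: Long): Unit = allocateBlocksToBatch(timeMs)
  private def allocateBlocksToBatch(batchTimeMs: Long): Unit = allocatedBatches += batchTimeMs
}
```

Routing through a message queue means job generation always runs on the event-loop thread, in the order the timer fired.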



The time carried by GenerateJobs is nextTime. After each trigger, nextTime += period, where period is ssc.graph.batchDuration.milliseconds.

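The initial value of nextTime is the startTime passed to the start method, i.e. the return value of RecurringTimer's getStartTime method. That value is (floor(currentTime / period) + 1) * period, the first integer multiple of period after the current time.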



period is the Duration we pass in when constructing the StreamingContext with new StreamingContext.
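The timer arithmetic can be checked with a small sketch. `TimerMath` and its methods are hypothetical helpers, and a fixed `nowMs` replaces the clock so the result is deterministic. `periodMs` corresponds to the batch duration passed to `new StreamingContext`.

```scala
// Hypothetical helpers reproducing the RecurringTimer arithmetic described above.
object TimerMath {
  // (floor(now / period) + 1) * period: the first multiple of period strictly after now.
  def getStartTime(nowMs: Long, periodMs: Long): Long =
    (math.floor(nowMs.toDouble / periodMs) + 1).toLong * periodMs

  // The times carried by successive GenerateJobs messages.
  def firstTriggers(startMs: Long, periodMs: Long, n: Int): Seq[Long] = {
    var nextTime = startMs
    val out = Seq.newBuilder[Long]
    for (_ <- 0 until n) {
      out += nextTime      // callback(nextTime) posts GenerateJobs for this time
      nextTime += periodMs // nextTime += period
    }
    out.result()
  }
}
```

With a 5-second batch and the clock at 12345 ms, the first batch time is 15000 ms, followed by 20000 ms, 25000 ms, and so on. A clock reading that is already an exact multiple (e.g. 15000 ms) still moves to the next one (20000 ms), because of the `+ 1`.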


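ReceivedBlockTracker also cleans up expired metadata by removing it from the HashMap. Here too, the event is written to disk first and memory is updated afterwards.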
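A sketch of the cleanup step, under simplifying assumptions: `MiniCleanup` is hypothetical, batch times are plain `Long` millis, and a buffer of time lists stands in for the logged cleanup events. The two numbered steps reflect the ordering described above.

```scala
import scala.collection.mutable

// Hypothetical model of cleaning up expired batch metadata.
class MiniCleanup {
  val log = mutable.ArrayBuffer.empty[Seq[Long]] // stands in for logged cleanup events
  val timeToAllocatedBlocks = mutable.HashMap.empty[Long, Seq[String]]

  def cleanupOldBatches(threshMs: Long): Unit = synchronized {
    val timesToCleanup = timeToAllocatedBlocks.keys.filter(_ < threshMs).toSeq.sorted
    log += timesToCleanup                    // 1. record the cleanup in the log
    timeToAllocatedBlocks --= timesToCleanup // 2. then drop the entries from memory
  }
}
```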

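The creation, consumption and destruction of metadata are all recorded in the WAL, so after a failure the state can be recovered from the log. The source code shows that the WAL is only used when a checkpoint directory has been set.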
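Recovery from the WAL amounts to replaying the logged events in order. The sketch below is hypothetical: the three event types mirror creation, consumption and destruction. Clearing the unallocated blocks on an allocation event assumes, as in the allocation step, that each allocation drains everything received so far.

```scala
// Hypothetical event types mirroring add / allocate / clean up.
sealed trait TrackerEvent
final case class Added(block: String) extends TrackerEvent
final case class BatchAllocated(timeMs: Long, blocks: Seq[String]) extends TrackerEvent
final case class BatchCleaned(timesMs: Seq[Long]) extends TrackerEvent

final case class TrackerState(unallocated: Vector[String], allocated: Map[Long, Seq[String]])

object Recovery {
  // Folding the events in log order rebuilds the in-memory state after a failure.
  def replay(log: Seq[TrackerEvent]): TrackerState =
    log.foldLeft(TrackerState(Vector.empty, Map.empty)) {
      case (s, Added(b)) => s.copy(unallocated = s.unallocated :+ b)
      // An allocation drained every queue, so nothing is left unallocated.
      case (s, BatchAllocated(t, bs)) => TrackerState(Vector.empty, s.allocated + (t -> bs))
      case (s, BatchCleaned(ts)) => s.copy(allocated = s.allocated -- ts)
    }
}
```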

The log directory used by the WAL is derived from the checkpoint directory that is passed in.

Concretely, a folder named receivedBlockMetadata is created under the checkpoint directory to hold the WAL records.
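Both points (a WAL only when a checkpoint directory exists, stored in its receivedBlockMetadata sub-folder) fit in a few lines. `WalDir` is a hypothetical helper. It uses `java.nio.file.Paths` instead of the Hadoop `Path` class that Spark itself works with, so path handling on HDFS or S3 URIs is out of scope for this sketch.

```scala
import java.nio.file.Paths

// Hypothetical helper: derive the WAL directory from an optional checkpoint directory.
object WalDir {
  def checkpointDirToLogDir(checkpointDir: String): String =
    Paths.get(checkpointDir, "receivedBlockMetadata").toString

  // None means no checkpoint directory was set, so no write-ahead log is created.
  def logDirFor(checkpointDirOption: Option[String]): Option[String] =
    checkpointDirOption.map(checkpointDirToLogDir)
}
```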


The state of the current DStreamGraph and JobGenerator is checkpointed by the doCheckpoint method. It is invoked because generateJobs, as its last step, posts a DoCheckpoint message.
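The hand-off from generateJobs to checkpointing can be modelled with the same kind of single-threaded event queue. This is a hypothetical sketch. The condition that a checkpoint is only written when (batch time minus zero time) is a multiple of the checkpoint interval is an assumption modelled on Spark's `isMultipleOf` check, not something stated in this post.

```scala
import scala.collection.mutable

// Hypothetical model: generateJobs ends by posting DoCheckpoint.
sealed trait Event
final case class GenerateJobs(timeMs: Long) extends Event
final case class DoCheckpoint(timeMs: Long) extends Event

class MiniCheckpointing(zeroTimeMs: Long, checkpointIntervalMs: Long) {
  private val events = mutable.Queue.empty[Event]
  val checkpointedTimes = mutable.ArrayBuffer.empty[Long]

  def post(e: Event): Unit = events.enqueue(e)

  def run(): Unit = while (events.nonEmpty) {
    events.dequeue() match {
      case GenerateJobs(t) =>
        // ... allocate blocks, generate and submit jobs for batch t ...
        post(DoCheckpoint(t)) // last step of generateJobs
      case DoCheckpoint(t) =>
        // Assumed condition: only checkpoint on multiples of the interval.
        if ((t - zeroTimeMs) % checkpointIntervalMs == 0)
          checkpointedTimes += t // stands in for writing the graph/JobGenerator state
    }
  }
}
```

With a 1-second batch and a 2-second checkpoint interval, batches at 1000 to 4000 ms produce checkpoints at 2000 and 4000 ms only.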



Summary:
ReceivedBlockTracker achieves fault tolerance through the WAL (write-ahead log).
DStreamGraph and JobGenerator achieve fault tolerance through checkpointing.