1 解密Spark Streaming Job架構和運行機制1 解密Spark Streaming Job架構和運行機制sql
第一部分經過案例透視Job的執行過程,案例代碼以下:apache
import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} /** * * * @author DT大數據夢工廠 * 新浪微博:http://weibo.com/ilovepains/ * * 背景描述:在廣告點擊計費系統中,咱們在線過濾掉黑名單的點擊,進而保護廣告商的利益,只進行有效的廣告點擊計費 * 或者在防刷評分(或者流量)系統,過濾掉無效的點擊或者評分或者流量; * 實現技術:使用transform Api直接基於RDD編程,進行join操做 * */ object OnlineForeachRDD2DB { def main(args: Array[String]){ /** * 第1步:建立Spark的配置對象SparkConf,設置Spark程序的運行時的配置信息, * 例如說經過setMaster來設置程序要連接的Spark集羣的Master的URL,若是設置 * 爲local,則表明Spark程序在本地運行,特別適合於機器配置條件很是差(例如 * 只有1G的內存)的初學者 * */ val conf = new SparkConf() //建立SparkConf對象 conf.setAppName("OnlineForeachRDD") //設置應用程序的名稱,在程序運行的監控界面能夠看到名稱 // conf.setMaster("spark://Master:7077") //此時,程序在Spark集羣 conf.setMaster("local[6]") //設置batchDuration時間間隔來控制Job生成的頻率而且建立Spark Streaming執行的入口 val ssc = new StreamingContext(conf, Seconds(5)) val lines = ssc.socketTextStream("Master", 9999) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => { // ConnectionPool is a static, lazily initialized pool of connections val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => { val sql = "insert into streaming_itemcount(item,count) values('" + record._1 + "'," + record._2 + ")" val stmt = connection.createStatement(); stmt.executeUpdate(sql); }) ConnectionPool.returnConnection(connection) // return to the pool for future reuse } } } /** * 在StreamingContext調用start方法的內部實際上是會啓動JobScheduler的Start方法,進行消息循環,在JobScheduler * 的start內部會構造JobGenerator和ReceiverTacker,而且調用JobGenerator和ReceiverTacker的start方法: * 1,JobGenerator啓動後會不斷的根據batchDuration生成一個個的Job * 2,ReceiverTracker啓動後首先在Spark Cluster中啓動Receiver(實際上是在Executor中先啓動ReceiverSupervisor),在Receiver收到 * 數據後會經過ReceiverSupervisor存儲到Executor而且把數據的Metadata信息發送給Driver中的ReceiverTracker,在ReceiverTracker * 內部會經過ReceivedBlockTracker來管理接受到的元數據信息 * 每一個BatchInterval會產生一個具體的Job,其實這裏的Job不是Spark Core中所指的Job,它只是基於DStreamGraph而生成的RDD * 的DAG而已,從Java角度講,至關於Runnable接口實例,此時要想運行Job須要提交給JobScheduler,在JobScheduler中經過線程池的方式找到一個 * 單獨的線程來提交Job到集羣運行(實際上是在線程中基於RDD的Action觸發真正的做業的運行),爲何使用線程池呢? * 1,做業不斷生成,因此爲了提高效率,咱們須要線程池;這和在Executor中經過線程池執行Task有殊途同歸之妙; * 2,有可能設置了Job的FAIR公平調度的方式,這個時候也須要多線程的支持; * */ ssc.start() ssc.awaitTermination() } }
2 解密Spark Streaming 容錯架構和運行機制編程
任務容錯安全
咱們知道DStream與RDD的關係就是隨着時間流逝不斷的產生RDD,對DStream的操做就是在固定時間上操做RDD。因此從某種意義上而言,Spark Streaming的基於DStream的容錯機制,實際上就是劃分到每一次造成的RDD的容錯機制,這也是Spark Streaming的高明之處。RDD做爲 分佈式彈性數據集,它的彈性主要體如今:多線程
1.自動的分配內存和硬盤,優先基於內存架構
2.基於lineage容錯機制socket
3.task會指定次數的重試分佈式
4.stage失敗會自動重試大數據
5.checkpoint和persist 複用spa
6.數據調度彈性:DAG,TASK和資源管理無關。
7.數據分片的高度彈性
基於RDD的特性,它的容錯機制主要就是兩種:一是checkpoint,二是基於lineage(血統)的容錯。通常而言,spark選擇血統容錯,由於對於大規模的數據集,作檢查點的成本很高。可是有的狀況下,不如說lineage鏈條過於複雜和冗長,這時候就須要作checkpoint。
考慮到RDD的依賴關係,每一個stage內部都是窄依賴,此時通常基於lineage容錯,方便高效。在stage之間,是寬依賴,產生了shuffle操做,這種狀況下,作檢查點則更好。總結來講,stage內部作lineage,stage之間作checkpoint。
2.任務安全性
1.Driver容錯 :
a)每一個job生成以前作一個checkpoint,生成以後再作一個checkpoint,若是出錯,就從checkpoint中恢復。
2. Executor 容錯:
a)接收數據的安全性:sparkStreaming接收到數據默認的存儲方式是memory_and_disk_ser_2 的方式; wal方式(writer ahead log),在數據到來的時候,先作記錄,而後再存到executor,
b)執行的安全性
徹底基於RDD容錯 ,
RDD可以記住每一個轉換操做,對應於Lineage圖中的一個步驟,恢復丟失分區數據時不須要寫日誌記錄大量數據