Spark Streaming 是 Spark Core API 的擴展, 它支持彈性的, 高吞吐的, 容錯的實時數據流的處理. 數據能夠經過多種數據源獲取, 例如 Kafka, Flume, Kinesis 以及 TCP sockets, 也能夠經過例如 map
, reduce
, join
, window
等的高級函數組成的複雜算法處理. 最終, 處理後的數據能夠輸出到文件系統, 數據庫以及實時儀表盤中. 事實上, 你還能夠在 data streams(數據流)上使用 機器學習 以及 圖計算算法.html
運行原理算法
初始化注意點:數據庫
stop()
的可選參數,名叫 stopSparkContext
爲 false.sparkContext.textFileStream(dir)
sparkContext.socketTextStream()
KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)
Batch durationapache
對於源源不斷的數據,Spark Streaming是經過切分的方式,先將連續的數據流進行離散化處理。數據流每被切分一次,對應生成一個RDD,每一個RDD都包含了一個時間間隔內所獲取到的全部數據。數組
批處理時間間隔的設置會伴隨Spark Streaming應用程序的整個生命週期,沒法在程序運行期間動態修改網絡
new StreamingContext(sparkConf,Seconds(1))
轉換操做app
dstream.transform(fun)
windowDuration/batchDuration
操做規約機器學習
普通規約是每次把window裏面每一個RDD都計算一遍,增量規約是每次只計算新進入window的數據,而後減去離開window的數據,獲得的就是window數據的大小,在使用上,增量規約須要提供一個規約函數的逆函數,好比+
對應的逆函數爲-
socket
普通規約:val wordCounts=words.map(x=>(x,1)).reduceByKeyAndWindow(_+_,Seconds(5s),seconds(1))
ide
增量規約:val wordCounts=words.map(x=>(x+1)).reduceByKeyAndWindow(_+_,_-_,Seconds(5s),seconds(1))
// 1. con't not create before foreachPartition function(cont't create in driver) // 2. use foreachPartition instead of foreach // 3. use connect pool instead of create connect every time dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => // ConnectionPool is a static, lazily initialized pool of connections val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) // return to the pool for future reuse } }
MEMORY_ONLY_SER
MEMORY_AND_DISK_SER_2
MEMORY_ONLY
持久化sparkStreaming 週期性的把應用數據存儲到HDFS等可靠的存儲系統中能夠供回覆時使用的機制叫作檢查點機制,
做用:
數據類型:
Metadata(元數據): streaming計算邏輯,主要來恢復driver。
Configuration
:配置文件,用於建立該streaming application的全部配置
DStream operations
:對DStream進行轉換的操做集合
Incomplete batches
:未完成batchs,那些提交了job在隊列等待還沒有完成的job信息。
Data checkpointing
: 已經生成的RDD但還未保存到HDFS或者會影響後續RDD的生成。
注意點
Checkpoint類
checkpoint的形式是將類CheckPoint的實例序列化後寫入外部內存
缺點
SparkStreaming 的checkpoint機制是對CheckPoint對象進行序列化後的數據進行存儲,那麼SparkStreaming Application從新編譯後,再去反序列化checkpoint數據就會失敗,這個時候必須新建StreamingContext
針對這種狀況,在結合SparkStreaming+kafka的應用中,須要自行維護消費offsets,這樣即便從新編譯了application,仍是能夠從須要的offsets來消費數據。對於其餘狀況須要結合實際的需求進行處理。
使用
checkpoint的時間間隔正常狀況下應該是sliding interval的5-10倍,可經過dstream.checkpoint(checkpointInterval)
配置每一個流的interval。
若是想要application能從driver失敗中恢復,則application須要知足
def createStreamingContext()={ ... val sparkConf=new SparkConf().setAppName("xxx") val ssc=new StreamingContext(sparkConf,Seconds(1)) ssc.checkpoint(checkpointDir) } ... val ssc=StreamingContext.getOrCreate(checkpointDir,createSreamingContext _)
Accumulators, Broadcast Variables, and Checkpoints
在sparkStreaming中累加器和廣播變量不可以在checkpoints中恢復,廣播變量是在driver上執行的,可是當driver重啓後並無執行廣播,當slaves調用廣播變量時報Exception: (Exception("Broadcast variable '0' not loaded!",)
能夠爲累加器和廣播變量建立延遲實例化的單例實例,以便在驅動程序從新啓動失敗後從新實例化它們
問題參考:https://issues.apache.org/jira/browse/SPARK-5206
系統的容錯主要從三個方面,接收數據,數據處理和輸出數據,在sparkStreaming中,接收數據和數據來源有關係,處理數據能夠保證exactly once,輸出數據能夠保證at least once。
sparStreaming並不能徹底的像RDD那樣實現lineage,由於其有的數據源是經過網絡傳輸的,不可以重複獲取。
接收數據根據數據源不一樣容錯級別不一樣
with file
:經過hdfs等文件系統中讀取數據時能夠保證exactly-oncewith reciever-base-source
:
reliable reciever
:當reciever接收失敗時不給數據源答覆接收成功,在reciever重啓後繼續接收unreliable reciever
:接收數據後不給數據源返回接收結果,則數據源也不會再次下發數據sparkStreaming經過write-ahead-logs 提供了at least once的保證。在spark1.3版本以後,針對kafka數據源,能夠作到exactly once ,更多內容
相似於foreachRdd操做,能夠保證at least once,若是輸出時想實現exactly once可經過如下兩種方式:
Idempotent updates
:冪等更新,屢次嘗試將數據寫入同一個文件Transactional updates
:事物更新,實現方式:經過batch time和the index of rdd實現RDD的惟一標識,經過惟一標識去更新外部系統,即若是已經存在則跳過更新,若是不存在則更新。eg:dstream.foreachRDD { (rdd, time) => rdd.foreachPartition { partitionIterator => val partitionId = TaskContext.get.partitionId() val uniqueId = generateUniqueId(time.milliseconds, partitionId) // use this uniqueId to transactionally commit the data in partitionIterator } }
sparkStreaming調優主要從兩方面進行:開源節流——提升處理速度和減小輸入數據。
詳情參考:http://spark.apache.org/docs/latest/tuning.html#level-of-parallelism