Spark之性能優化重點——並行流數據接收

問題導讀html

一、如何減小批數據的執行時間?
二、Spark有哪些方面的性能優化?
三、有哪些錯誤咱們須要關心?
node

(一)減小批數據的執行時間程序員

Spark中有幾個優化能夠減小批處理的時間。這些能夠在優化指南中做了討論。這節重點討論幾個重要的。面試

數據接收的並行水平apache

經過網絡(如kafka,flume,socket等)接收數據須要這些數據反序列化並被保存到Spark中。若是數據接收成爲系統的瓶頸,就要考慮並行地接收數據。注意,每一個輸入DStream建立一個receiver(運行在worker機器上) 接收單個數據流。建立多個輸入DStream並配置它們能夠從源中接收不一樣分區的數據流,從而實現多數據流接收。例如,接收兩個topic數據的單個輸入DStream能夠被切分爲兩個kafka輸入流,每一個接收一個topic。這將 在兩個worker上運行兩個receiver,所以容許數據並行接收,提升總體的吞吐量。多個DStream能夠被合併生成單個DStream,這樣運用在單個輸入DStream的transformation操做能夠運用在合併的DStream上。api

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
數組

另一個須要考慮的參數是receiver的阻塞時間。對於大部分的receiver,在存入Spark內存以前,接收的數據都被合併成了一個大數據塊。每批數據中塊的個數決定了任務的個數。這些任務是用類 似map的transformation操做接收的數據。阻塞間隔由配置參數spark.streaming.blockInterval決定,默認的值是200毫秒。緩存

多輸入流或者多receiver的可選的方法是明確地從新分配輸入數據流(利用inputStream.repartition()),在進一步操做以前,經過集羣的機器數分配接收的批數據。性能優化

數據處理的並行水平網絡

若是運行在計算stage上的併發任務數不足夠大,就不會充分利用集羣的資源。例如,對於分佈式reduce操做如reduceByKey和reduceByKeyAndWindow,默認的併發任務數經過配置屬性來肯定(configuration.html#spark-properties) spark.default.parallelism。你能夠經過參數(PairDStreamFunctions (api/scala/index.html#org.apache.spark.streaming.dstream.PairDStreamFunctions))傳遞並行度,或者設置參數 spark.default.parallelism修改默認值。

數據序列化

數據序列化的總開銷是日常大的,特別是當sub-second級的批數據被接收時。下面有兩個相關點:

Spark中RDD數據的序列化。關於數據序列化請參照Spark優化指南。注意,與Spark不一樣的是,默認的RDD會被持久化爲序列化的字節數組,以減小與垃圾回收相關的暫停。

輸入數據的序列化。從外部獲取數據存到Spark中,獲取的byte數據須要從byte反序列化,而後再按照Spark的序列化格式從新序列化到Spark中。所以,輸入數據的反序列化花費多是一個瓶頸。

任務的啓動開支

每秒鐘啓動的任務數是很是大的(50或者更多)。發送任務到slave的花費明顯,這使請求很難得到亞秒(sub-second)級別的反應。經過下面的改變能夠減少開支

任務序列化。運行kyro序列化任何能夠減少任務的大小,從而減少任務發送到slave的時間。

執行模式。在Standalone模式下或者粗粒度的Mesos模式下運行Spark能夠在比細粒度Mesos模式下運行Spark得到更短的任務啓動時間。能夠在在Mesos下運行Spark中獲取更多信息。

These changes may reduce batch processing time by 100s of milliseconds, thus allowing sub-second batch size to be viable.

(二)設置正確的批容量

爲了Spark Streaming應用程序可以在集羣中穩定運行,系統應該可以以足夠的速度處理接收的數據(即處理速度應該大於或等於接收數據的速度)。這能夠經過流的網絡UI觀察獲得。批處理時間應該小於批間隔時間。

根據流計算的性質,批間隔時間可能顯著的影響數據處理速率,這個速率能夠經過應用程序維持。能夠考慮WordCountNetwork這個例子,對於一個特定的數據處理速率,系統可能能夠每2秒打印一次單詞計數 (批間隔時間爲2秒),但沒法每500毫秒打印一次單詞計數。因此,爲了在生產環境中維持指望的數據處理速率,就應該設置合適的批間隔時間(即批數據的容量)。

找出正確的批容量的一個好的辦法是用一個保守的批間隔時間(5-10,秒)和低數據速率來測試你的應用程序。爲了驗證你的系統是否能知足數據處理速率,你能夠經過檢查端到端的延遲值來判斷(能夠在 Spark驅動程序的log4j日誌中查看"Total delay"或者利用StreamingListener接口)。若是延遲維持穩定,那麼系統是穩定的。若是延遲持續增加,那麼系統沒法跟上數據處理速率,是不穩定的。 你可以嘗試着增長數據處理速率或者減小批容量來做進一步的測試。注意,由於瞬間的數據處理速度增長致使延遲瞬間的增加多是正常的,只要延遲能從新回到了低值(小於批容量)。

(三)內存調優

調整內存的使用以及Spark應用程序的垃圾回收行爲已經在Spark優化指南中詳細介紹。在這一節,咱們重點介紹幾個強烈推薦的自定義選項,它們能夠 減小Spark Streaming應用程序垃圾回收的相關暫停,得到更穩定的批處理時間。

• Default persistence level of DStreams:和RDDs不一樣的是,默認的持久化級別是序列化數據到內存中(DStream是StorageLevel.MEMORY_ONLY_SER,RDD是StorageLevel.MEMORY_ONLY)。 即便保存數據爲序列化形態會增長序列化/反序列化的開銷,可是能夠明顯的減小垃圾回收的暫停。

• Clearing persistent RDDs:默認狀況下,經過Spark內置策略(LUR),Spark Streaming生成的持久化RDD將會從內存中清理掉。若是spark.cleaner.ttl已經設置了,比這個時間存在更老的持久化 RDD將會被定時的清理掉。正如前面提到的那樣,這個值須要根據Spark Streaming應用程序的操做當心設置。然而,能夠設置配置選項spark.streaming.unpersist爲true來更智能的去持久化(unpersist)RDD。這個 配置使系統找出那些不須要常常保有的RDD,而後去持久化它們。這能夠減小Spark RDD的內存使用,也可能改善垃圾回收的行爲。

• Concurrent garbage collector:使用併發的標記-清除垃圾回收能夠進一步減小垃圾回收的暫停時間。儘管併發的垃圾回收會減小系統的總體吞吐量,可是仍然推薦使用它以得到更穩定的批處理時間。

(四)容錯語義

這一節,咱們將討論在節點錯誤事件時Spark Streaming的行爲。爲了理解這些,讓咱們先記住一些Spark RDD的基本容錯語義。

• 一個RDD是不可變的、肯定可重複計算的、分佈式數據集。每一個RDD記住一個肯定性操做的譜系(lineage),這個譜系用在容錯的輸入數據集上來建立該RDD。

• 若是任何一個RDD的分區由於節點故障而丟失,這個分區能夠經過操做譜系從源容錯的數據集中從新計算獲得。

• 假定全部的RDD transformations是肯定的,那麼最終轉換的數據是同樣的,不論Spark機器中發生何種錯誤。

Spark運行在像HDFS或S3等容錯系統的數據上。所以,任何從容錯數據而來的RDD都是容錯的。然而,這不是在Spark Streaming的狀況下,由於Spark Streaming的數據大部分狀況下是從 網絡中獲得的。爲了得到生成的RDD相同的容錯屬性,接收的數據須要重複保存在worker node的多個Spark executor上(默認的複製因子是2),這致使了當出現錯誤事件時,有兩類數據須要被恢復

Data received and replicated :在單個worker節點的故障中,這個數據會倖存下來,由於有另一個節點保存有這個數據的副本。

Data received but buffered for replication:由於沒有重複保存,因此爲了恢復數據,惟一的辦法是從源中從新讀取數據。

有兩種錯誤咱們須要關心

• worker節點故障:任何運行executor的worker節點都有可能出故障,那樣在這個節點中的全部內存數據都會丟失。若是有任何receiver運行在錯誤節點,它們的緩存數據將會丟失

• Driver節點故障:若是運行Spark Streaming應用程序的Driver節點出現故障,很明顯SparkContext將會丟失,全部執行在其上的executors也會丟失。

做爲輸入源的文件語義(Semantics with files as input source)

若是全部的輸入數據都存在於一個容錯的文件系統如HDFS,Spark Streaming總能夠從任何錯誤中恢復而且執行全部數據。這給出了一個剛好一次(exactly-once)語義,即不管發生什麼故障, 全部的數據都將會剛好處理一次。

基於receiver的輸入源語義

對於基於receiver的輸入源,容錯的語義既依賴於故障的情形也依賴於receiver的類型。正如以前討論的,有兩種類型的receiver

Reliable Receiver:這些receivers只有在確保數據複製以後纔會告知可靠源。若是這樣一個receiver失敗了,緩衝(非複製)數據不會被源所認可。若是receiver重啓,源會重發數 據,所以不會丟失數據。

Unreliable Receiver:當worker或者driver節點故障,這種receiver會丟失數據

選擇哪一種類型的receiver依賴於這些語義。若是一個worker節點出現故障,Reliable Receiver不會丟失數據,Unreliable Receiver會丟失接收了可是沒有複製的數據。若是driver節點 出現故障,除了以上狀況下的數據丟失,全部過去接收並複製到內存中的數據都會丟失,這會影響有狀態transformation的結果。

爲了不丟失過去接收的數據,Spark 1.2引入了一個實驗性的特徵write ahead logs,它保存接收的數據到容錯存儲系統中。有了write ahead logs和Reliable Receiver,咱們能夠 作到零數據丟失以及exactly-once語義。

下面的表格總結了錯誤語義:

 輸出操做的語義

根據其肯定操做的譜系,全部數據都被建模成了RDD,全部的從新計算都會產生一樣的結果。全部的DStream transformation都有exactly-once語義。那就是說,即便某個worker節點出現 故障,最終的轉換結果都是同樣。然而,輸出操做(如foreachRDD)具備at-least once語義,那就是說,在有worker事件故障的狀況下,變換後的數據可能被寫入到一個外部實體不止一次。 利用saveAs***Files將數據保存到HDFS中的狀況下,以上寫屢次是可以被接受的(由於文件會被相同的數據覆蓋)。

結語

感謝您的觀看,若有不足之處,歡迎批評指正。

若是有對大數據感興趣的小夥伴或者是從事大數據的老司機能夠加羣:

658558542    

歡迎你們交流分享,學習交流,共同進步。(裏面還有大量的免費資料,幫助你們在成爲大數據工程師,乃至架構師的路上披荊斬棘!)

最後祝福全部遇到瓶頸的大數據程序員們突破本身,祝福你們在日後的工做與面試中一切順利。

相關文章
相關標籤/搜索