數據接收並行度調優(一)
經過網絡接收數據時(好比Kafka、Flume),會將數據反序列化,並存儲在Spark的內存中。若是數據接收稱爲系統的瓶頸,那麼能夠考慮並行化數據接收。
每個輸入DStream都會在某個Worker的Executor上啓動一個Receiver,該Receiver接收一個數據流。所以能夠經過建立多個輸入DStream,而且配置它們接收數據源不一樣的分區數據,達到接收多個數據流的效果。好比說
,一個接收兩個Kafka Topic的輸入DStream,能夠被拆分爲兩個輸入DStream,每一個分別接收一個topic的數據。這樣就會建立兩個Receiver,從而並行地接收數據,進而提高吞吐量。多個DStream可使用
union算子進行聚合,從而造成一個DStream。而後後續的transformation算子操做都針對該一個聚合後的DStream便可。
int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(numStreams);
for (int i = 0; i < numStreams; i++) {
kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();
數據接收並行度調優(二)
數據接收並行度調優,除了建立更多輸入DStream和Receiver之外,還能夠考慮調節
block interval。經過參數,spark.streaming.blockInterval,能夠設置block interval,默認是
200ms。對於大多數Receiver來講,在將接收到的數據保存到Spark的BlockManager以前,都會將數據切分爲一個一個的block。
而每一個batch中的block數量,則決定了該batch對應的RDD的partition的數量,以及針對該RDD執行transformation操做時,建立的task的數量。每一個batch對應的task數量是大約估計的,即batch interval / block interval。
例如說,batch interval爲2s,block interval爲200ms,會建立10個task。若是你認爲每一個batch的task數量太少,即低於每臺機器的cpu core數量,那麼就說明batch的task數量是不夠的,由於全部的cpu資源沒法徹底被利用起來。要爲batch增長block的數量,那麼就減少block interval。然而,
推薦的block interval最小值是50ms,若是低於這個數值,那麼大量task的啓動時間,可能會變成一個性能開銷點。
數據接收並行度調優(三)
除了上述說的兩個提高數據接收並行度的方式,還有一種方法,就是顯式地對輸入數據流進行重分區。使用inputStream.repartition(<number of partitions>)便可。這樣就能夠將接收到的batch,分佈到指定數量的機器上,而後再進行進一步的操做。
任務啓動調優
若是每秒鐘啓動的task過於多,好比每秒鐘啓動50個,那麼發送這些task去Worker節點上的Executor的性能開銷,會比較大,並且此時基本就很難達到毫秒級的延遲了。使用下述操做能夠減小這方面的性能開銷:
一、Task序列化:使用
Kryo序列化機制來序列化task,能夠減少task的大小,從而減小發送這些task到各個Worker節點上的Executor的時間。
二、執行模式:
在Standalone模式下運行Spark,能夠達到更少的task啓動時間。
上述方式,也許能夠將每一個batch的處理時間減小100毫秒。從而從秒級降到毫秒級。
數據處理並行度調優
若是在計算的任何stage中使用的並行task的數量沒有足夠多,那麼集羣資源是沒法被充分利用的。舉例來講,對於分佈式的reduce操做,好比reduceByKey和reduceByKeyAndWindow,
默認的並行task的數量是由spark.default.parallelism參數決定的。你能夠在reduceByKey等操做中,傳入第二個參數,手動指定該操做的並行度,也能夠調節全局的spark.default.parallelism參數。
數據序列化調優(一)
數據序列化形成的系統開銷能夠由序列化格式的優化來減少。在流式計算的場景下,有兩種類型的數據須要序列化。
一、輸入數據:默認狀況下,接收到的輸入數據,是存儲在Executor的內存中的,使用的持久化級別是
StorageLevel.MEMORY_AND_DISK_SER_2。這意味着,數據被序列化爲字節從而減少GC開銷,而且會複製以進行executor失敗的容錯。所以,數據首先會存儲在內存中,而後在內存不足時會溢寫到磁盤上,從而爲流式計算來保存全部須要的數據。這裏的序列化有明顯的性能開銷——Receiver必須反序列化從網絡接收到的數據,而後再使用Spark的序列化格式序列化數據。
二、流式計算操做生成的持久化RDD:流式計算操做生成的持久化RDD,可能會持久化到內存中。例如,窗口操做默認就會將數據持久化在內存中,由於這些數據後面可能會在多個窗口中被使用,並被處理屢次。然而,不像Spark Core的默認持久化級別,
StorageLevel.MEMORY_ONLY,流式計算操做生成的RDD的默認持久化級別是StorageLevel.MEMORY_ONLY_SER ,默認就會減少GC開銷。
數據序列化調優(二)
在上述的場景中,
使用Kryo序列化類庫能夠減少CPU和內存的性能開銷。使用Kryo時,必定要考慮註冊自定義的類,而且禁用對應引用的tracking(spark.kryo.referenceTracking)。
在一些特殊的場景中,好比須要爲流式應用保持的數據總量並非不少,也許能夠將數據以非序列化的方式進行持久化,從而減小序列化和反序列化的CPU開銷,並且又不會有太昂貴的GC開銷。舉例來講,若是你數秒的batch interval,而且沒有使用window操做,那麼你能夠考慮經過顯式地設置持久化級別,來禁止持久化時對數據進行序列化。這樣就能夠減小用於序列化和反序列化的CPU性能開銷,而且不用承擔太多的GC開銷。
batch interval調優(最重要)
若是想讓一個運行在集羣上的Spark Streaming應用程序能夠穩定,它就必須儘量快地處理接收到的數據。換句話說,batch應該在生成以後,就儘量快地處理掉。對於一個應用來講,這個是否是一個問題,能夠經過觀察Spark UI上的batch處理時間來定。batch處理時間必須小於batch interval時間。
基於流式計算的本質,batch interval對於,在固定集羣資源條件下,應用能保持的數據接收速率,會有巨大的影響。例如,在WordCount例子中,對於一個特定的數據接收速率,應用業務能夠保證每2秒打印一次單詞計數,而不是每500ms。所以batch interval須要被設置得,讓預期的數據接收速率能夠在生產環境中保持住。
爲你的應用計算正確的batch大小的比較好的方法,是在一個很保守的batch interval,好比5~10s,以很慢的數據接收速率進行測試。要檢查應用是否跟得上這個數據速率,能夠檢查每一個batch的處理時間的延遲,若是處理時間與batch interval基本吻合,那麼應用就是穩定的。不然,若是batch調度的延遲持續增加,那麼就意味應用沒法跟得上這個速率,也就是不穩定的。所以你要想有一個穩定的配置,能夠嘗試提高數據處理的速度,或者增長batch interval。記住,因爲臨時性的數據增加致使的暫時的延遲增加,能夠合理的,只要延遲狀況能夠在短期內恢復便可。
內存調優(一)
優化Spark應用的內存使用和GC行爲,在Spark Core的調優中,已經講過了。這裏講一下與Spark Streaming應用相關的調優參數。
Spark Streaming應用須要的集羣內存資源,是由使用的transformation操做類型決定的。舉例來講,
若是想要使用一個窗口長度爲10分鐘的window操做,那麼集羣就必須有足夠的內存來保存10分鐘內的數據。若是想要使用updateStateByKey來維護許多key的state,那麼你的內存資源就必須足夠大。反過來講,若是想要作一個簡單的map-filter-store操做,那麼須要使用的內存就不多。
一般來講,經過Receiver接收到的數據,會使用StorageLevel.MEMORY_AND_DISK_SER_2持久化級別來進行存儲,所以沒法保存在內存中的數據會溢寫到磁盤上。
而溢寫到磁盤上,是會下降應用的性能的。所以,一般是建議爲應用提供它須要的足夠的內存資源。建議在一個小規模的場景下測試內存的使用量,並進行評估。
內存調優(二)
內存調優的另一個方面是垃圾回收。對於流式應用來講,若是要得到低延遲,確定不想要有由於JVM垃圾回收致使的長時間延遲。有不少參數能夠幫助下降內存使用和GC開銷:
一、
DStream的持久化:正如在「數據序列化調優」一節中提到的,輸入數據和某些操做生產的中間RDD,默認持久化時都會序列化爲字節。與非序列化的方式相比,這會下降內存和GC開銷。使用Kryo序列化機制能夠進一步減小內存使用和GC開銷。
進一步下降內存使用率,能夠對數據進行壓縮,由spark.rdd.compress參數控制(默認false)。
二、
清理舊數據:默認狀況下,全部輸入數據和經過DStream transformation操做生成的持久化RDD,會自動被清理。Spark Streaming會決定什麼時候清理這些數據,取決於transformation操做類型。例如,
你在使用窗口長度爲10分鐘內的window操做,Spark會保持10分鐘之內的數據,時間過了之後就會清理舊數據。可是在某些特殊場景下,好比Spark SQL和Spark Streaming整合使用時,在異步開啓的線程中,使用Spark SQL針對batch RDD進行執行查詢。那麼就須要讓Spark保存更長時間的數據,直到Spark SQL查詢結束。可使用
streamingContext.remember()方法來實現。
三、
CMS垃圾回收器:使用並行的mark-sweep垃圾回收機制,被推薦使用,用來保持GC低開銷。雖然並行的GC會下降吞吐量,可是仍是建議使用它,來減小batch的處理時間(下降處理過程當中的gc開銷)。若是要使用,那麼要在driver端和executor端都開啓。
在spark-submit中使用--driver-java-options設置;使用spark.executor.extraJavaOptions參數設置。-XX:+UseConcMarkSweepGC。