Spark Streaming的優化之路——從Receiver到Direct模式

          做者:個推數據研發工程師 學長網絡

1 業務背景

隨着大數據的快速發展,業務場景愈來愈複雜,離線式的批處理框架MapReduce已經不能知足業務,大量的場景須要實時的數據處理結果來進行分析、決策。Spark Streaming是一種分佈式的大數據實時計算框架,他提供了動態的,高吞吐量的,可容錯的流式數據處理,不只能夠實現用戶行爲分析,還能在金融、輿情分析、網絡監控等方面發揮做用。個推開發者服務——消息推送「應景推送」正是應用了Spark Streaming技術,基於大數據分析人羣屬性,同時利用LBS地理圍欄技術,實時觸發精準消息推送,實現用戶的精細化運營。此外,個推在應用Spark Streaming作實時處理kafka數據時,採用Direct模式代替Receiver模式的手段,實現了資源優化和程序穩定性提高。架構

本文將從Spark Streaming獲取kafka數據的兩種模式入手,結合個推實踐,帶你解讀Receiver和Direct模式的原理和特色,以及從Receiver模式到Direct模式的優化對比。框架

2 兩種模式的原理和區別

Receiver模式

1. Receiver模式下的運行架構

  • InputDStream: 從流數據源接收的輸入數據。
  • Receiver:負責接收數據流,並將數據寫到本地。
  • Streaming Context:表明SparkStreaming,負責Streaming層面的任務調度,生成jobs發送到Spark engine處理。
  • Spark Context: 表明Spark Core,負責批處理層面的任務調度,真正執行job的Spark engine。

2. Receiver從kafka拉取數據的過程

該模式下:分佈式

  • 在executor上會有receiver從kafka接收數據並存儲在Spark executor中,在到了batch時間後觸發job去處理接收到的數據,1個receiver佔用1個core;
  • 爲了避免丟數據須要開啓WAL機制,這會將receiver接收到的數據寫一份備份到第三方系統上(如:HDFS);
  • receiver內部使用kafka High Level API去消費數據及自動更新offset。  

Direct模式

1. Direct模式下的運行架構

與receiver模式相似,不一樣在於executor中沒有receiver組件,從kafka拉去數據的方式不一樣。性能

2. Direct從kafka拉取數據的過程

  該模式下:測試

  • 沒有receiver,無需額外的core用於不停地接收數據,而是按期查詢kafka中的每一個partition的最新的offset,每一個批次拉取上次處理的offset和當前查詢的offset的範圍的數據進行處理;
  • 爲了避免丟數據,無需將數據備份落地,而只須要手動保存offset便可;
  • 內部使用kafka simple Level API去消費數據, 須要手動維護offset,kafka zk上不會自動更新offset。  

Receiver與Direct模式的區別

  1. 前者在executor中有Receiver接受數據,而且1個Receiver佔用一個core;然後者無Receiver,因此不會暫用core;  
  2. 前者InputDStream的分區是 num_receiver *batchInterval/blockInteral,後者的分區數是kafka topic partition的數量。Receiver模式下num_receiver的設置不合理會影響性能或形成資源浪費;若是設置過小,並行度不夠,整個鏈路上接收數據將是瓶頸;若是設置太多,則會浪費資源;  
  3. 前者使用zookeeper來維護consumer的偏移量,然後者須要本身維護偏移量;  
  4. 爲了保證不丟失數據,前者須要開啓WAL機制,然後者不須要,只須要在程序中成功消費完數據後再更新偏移量便可。  

3 Receiver改形成Direct模式

個推使用Spark Streaming作實時處理kafka數據,先前使用的是receiver模式;大數據

receiver有如下特色:優化

  1. receiver模式下,每一個receiver須要單獨佔用一個core;
  2. 爲了保證不丟失數據,須要開啓WAL機制,使用checkpoint保存狀態;
  3. 當receiver接受數據速率大於處理數據速率,致使數據積壓,最終可能會致使程序掛掉。

因爲以上特色,receiver模式下會形成必定的資源浪費;使用checkpoint保存狀態, 若是須要升級程序,則會致使checkpoint沒法使用;第3點receiver模式下會致使程序不太穩定;而且若是設置receiver數量不合理也會形成性能瓶頸在receiver。爲了優化資源和程序穩定性,應將receiver模式改形成direct模式。spa

修改方式以下:

1. 修改InputDStream的建立code

將receiver的:

val kafkaStream = KafkaUtils.createStream(streamingContext,
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

改爲direct的:

val directKafkaStream = KafkaUtils.createDirectStream[
     [key class], [value class], [key decoder class], [value decoder class] ](
     streamingContext, [map of Kafka parameters], [set of topics to consume])

2. 手動維護offset

receiver模式代碼:

(receiver模式不須要手動維護offset,而是內部經過kafka consumer high level API 提交到kafka/zk保存)

kafkaStream.map {
           ...
 }.foreachRDD { rdd =>
    // 數據處理
    doCompute(rdd)
 }

direct模式代碼:

directKafkaStream.map {
           ...
 }.foreachRDD { rdd =>
    // 獲取當前rdd數據對應的offset
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // 數據處理
    doCompute(rdd)
    // 本身實現保存offset
    commitOffsets(offsetRanges)
 }

4 其餘優化點

1. 在receiver模式下:

  • 拆分InputDStream,增長Receiver,從而增長接收數據的並行度;
  • 調整blockInterval,適當減少,增長task數量,從而增長並行度(在core的數量>task數量的狀況下);
  • 若是開啓了WAL機制,數據的存儲級別設置爲MOMERY_AND_DISK_SER。
  1. 數據序列化使用Kryoserializationl,相比Java serializationl 更快,序列化後的數據更小;  
  2. 建議使用CMS垃圾回收器下降GC開銷;  
  3. 選擇高性能的算子(mapPartitions, foreachPartitions, aggregateByKey等);  
  4. repartition的使用:在streaming程序中由於batch時間特別短,因此數據量通常較小,因此repartition的時間短,能夠解決一些由於topicpartition中數據分配不均勻致使的數據傾斜問題;  
  5. 由於SparkStreaming生產的job最終都是在sparkcore上運行的,因此sparkCore的優化也很重要;  
  6. BackPressure流控
  • 爲何引入Backpressure?當batch processing time>batchinterval 這種狀況持續過長的時間,會形成數據在內存中堆積,致使Receiver所在Executor內存溢出等問題;
  • Backpressure:根據JobScheduler反饋做業的執行信息來動態調整數據接收率;
  • 配置使用:
spark.streaming.backpressure.enabled
含義: 是否啓用 SparkStreaming內部的backpressure機制,
默認值:false ,表示禁用

spark.streaming.backpressure.initialRate
含義: receiver 爲第一個batch接收數據時的比率

spark.streaming.receiver.maxRate
含義: receiver接收數據的最大比率,若是設置值<=0, 則receiver接收數據比率不受限制

spark.streaming.kafka.maxRatePerPartition
含義: 從每一個kafka partition中讀取數據的最大比率

8. speculation機制

spark內置speculation機制,推測job中的運行特別慢的task,將這些task kill,並從新調度這些task執行。 默認speculation機制是關閉的,經過如下配置參數開啓:

spark.speculation=true

注意:在有些狀況下,開啓speculation反而效果很差,好比:streaming程序消費多個topic時,從kafka讀取數據直接處理,沒有從新分區,這時若是多個topic的partition的數據量相差較大那麼可能會致使正常執行更大數據量的task會被認爲執行緩慢,而被中途kill掉,這種狀況下可能致使batch的處理時間反而變長;能夠經過repartition來解決這個問題,可是要衡量repartition的時間;而在streaming程序中由於batch時間特別短,因此數據量通常較小,因此repartition的時間短,不像spark_batch一次處理大量數據一旦repartition則會特別久,因此最終仍是要根據具體狀況測試來決定。

5 總結

將Receiver模式改爲Direct模式,實現了資源優化,提高了程序的穩定性,缺點是須要本身管理offset,操做相對複雜。將來,個推將不斷探索和優化Spark Streaming技術,發揮其強大的數據處理能力,爲建設實時數倉提供保障。

相關文章
相關標籤/搜索