做者:個推數據研發工程師 學長網絡
隨着大數據的快速發展,業務場景愈來愈複雜,離線式的批處理框架MapReduce已經不能知足業務,大量的場景須要實時的數據處理結果來進行分析、決策。Spark Streaming是一種分佈式的大數據實時計算框架,他提供了動態的,高吞吐量的,可容錯的流式數據處理,不只能夠實現用戶行爲分析,還能在金融、輿情分析、網絡監控等方面發揮做用。個推開發者服務——消息推送「應景推送」正是應用了Spark Streaming技術,基於大數據分析人羣屬性,同時利用LBS地理圍欄技術,實時觸發精準消息推送,實現用戶的精細化運營。此外,個推在應用Spark Streaming作實時處理kafka數據時,採用Direct模式代替Receiver模式的手段,實現了資源優化和程序穩定性提高。架構
本文將從Spark Streaming獲取kafka數據的兩種模式入手,結合個推實踐,帶你解讀Receiver和Direct模式的原理和特色,以及從Receiver模式到Direct模式的優化對比。 框架
該模式下:分佈式
與receiver模式相似,不一樣在於executor中沒有receiver組件,從kafka拉去數據的方式不一樣。性能
該模式下:測試
個推使用Spark Streaming作實時處理kafka數據,先前使用的是receiver模式;大數據
receiver有如下特色:優化
因爲以上特色,receiver模式下會形成必定的資源浪費;使用checkpoint保存狀態, 若是須要升級程序,則會致使checkpoint沒法使用;第3點receiver模式下會致使程序不太穩定;而且若是設置receiver數量不合理也會形成性能瓶頸在receiver。爲了優化資源和程序穩定性,應將receiver模式改形成direct模式。spa
1. 修改InputDStream的建立 code
將receiver的:
val kafkaStream = KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
改爲direct的:
val directKafkaStream = KafkaUtils.createDirectStream[ [key class], [value class], [key decoder class], [value decoder class] ]( streamingContext, [map of Kafka parameters], [set of topics to consume])
2. 手動維護offset
receiver模式代碼:
(receiver模式不須要手動維護offset,而是內部經過kafka consumer high level API 提交到kafka/zk保存)
kafkaStream.map { ... }.foreachRDD { rdd => // 數據處理 doCompute(rdd) }
direct模式代碼:
directKafkaStream.map { ... }.foreachRDD { rdd => // 獲取當前rdd數據對應的offset val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // 數據處理 doCompute(rdd) // 本身實現保存offset commitOffsets(offsetRanges) }
1. 在receiver模式下:
spark.streaming.backpressure.enabled 含義: 是否啓用 SparkStreaming內部的backpressure機制, 默認值:false ,表示禁用 spark.streaming.backpressure.initialRate 含義: receiver 爲第一個batch接收數據時的比率 spark.streaming.receiver.maxRate 含義: receiver接收數據的最大比率,若是設置值<=0, 則receiver接收數據比率不受限制 spark.streaming.kafka.maxRatePerPartition 含義: 從每一個kafka partition中讀取數據的最大比率
8. speculation機制
spark內置speculation機制,推測job中的運行特別慢的task,將這些task kill,並從新調度這些task執行。 默認speculation機制是關閉的,經過如下配置參數開啓:
spark.speculation=true
注意:在有些狀況下,開啓speculation反而效果很差,好比:streaming程序消費多個topic時,從kafka讀取數據直接處理,沒有從新分區,這時若是多個topic的partition的數據量相差較大那麼可能會致使正常執行更大數據量的task會被認爲執行緩慢,而被中途kill掉,這種狀況下可能致使batch的處理時間反而變長;能夠經過repartition來解決這個問題,可是要衡量repartition的時間;而在streaming程序中由於batch時間特別短,因此數據量通常較小,因此repartition的時間短,不像spark_batch一次處理大量數據一旦repartition則會特別久,因此最終仍是要根據具體狀況測試來決定。
將Receiver模式改爲Direct模式,實現了資源優化,提高了程序的穩定性,缺點是須要本身管理offset,操做相對複雜。將來,個推將不斷探索和優化Spark Streaming技術,發揮其強大的數據處理能力,爲建設實時數倉提供保障。