發表於:《程序員》雜誌2016年2月刊。連接:http://geek.csdn.net/news/detail/54500程序員
做者:徐鑫,董西成算法
在流式計算領域,Spark Streaming和Storm時下應用最普遍的兩個計算引擎。其中,Spark Streaming是Spark生態系統中的重要組成部分,在實現上覆用Spark計算引擎。如圖1所示,Spark Streaming支持的數據源有不少,如Kafka、Flume、TCP等。Spark Streaming的內部數據表示形式爲DStream(Discretized Stream,離散的數據流),其接口設計與RDD很是類似,這使得它對Spark用戶很是友好。Spark Streaming的核心思想是把流式處理轉化爲「微批處理」,即以時間爲單位切分數據流,每一個切片內的數據對應一個RDD,進而能夠採用Spark引擎進行快速計算。因爲Spark Streaming採用了微批處理方式,所以嚴格來講只是一個近實時的處理系統,而不是真正的流式處理系統。數據庫
圖1:Spark Streaming數據流緩存
Storm是這個領域另外一個著名的開源流式計算引擎,這是一個真正的流式處理系統,它每次從數據源讀一條數據,而後單獨處理。相比於Spark Streaming,Storm有更快速的響應時間(小於一秒),更適合低延遲的應用場景,好比信用卡欺詐系統,廣告系統等。可是對比Storm,Spark Streaming的優點是吞吐量大,響應時間也能夠接受(秒級),而且兼容Spark系統中的其餘工具庫,如MLlib和GraphX。從而,對於時間不敏感且流量很大的系統,Spark Streaming是更優的選擇。服務器
Spark Streaming在Hulu應用網絡
Hulu是美國的專業在線視頻網站,天天會有大量用戶在線觀看視頻,進而產生大量用戶觀看的行爲數據。這些數據經過收集系統進入Hulu的大數據平臺,存儲並作進一步處理。在大數據平臺之上,各個團隊會根據須要設計相應的算法對數據進行分析和挖掘以便產生商業價值:推薦團隊從這些數據裏挖掘出用戶感興趣的內容並作精準推薦,廣告團隊根據用戶的歷史行爲推送最合適的廣告,數據團隊從數據的各個維度進行分析從而爲公司的策略制定提供可靠依據。session
Hulu大數據平臺的實現依循Lambda架構。Lambda架構是一個通用的大數據處理框架,包含離線的批處理層、在線的加速層和服務層三部分,具體如圖2所示。服務層通常使用HTTP服務或自定製的客戶端對外提供數據訪問,離線的批處理層通常使用批處理計算框架Spark和MapReduce進行數據分析,在線的加速層通常使用流式實時計算框架Spark Streaming和Storm進行數據分析。架構
圖2:lambda架構原理圖app
對於實時計算部分,Hulu內部使用了Kafka、Codis和Spark Streaming。下面按照數據流的過程,介紹咱們的項目。框架
從服務器日誌中收集數據,主要包括兩個部分:
q 來自網頁、手機App、機頂盒等設備用戶產生的視頻觀看、廣告點擊等行爲,這些行爲數據記錄在各自的Nginx服務的日誌中。
q 使用Flume將用戶行爲數據同時導入HDFS和Kafka,其中HDFS中的數據用於離線分析,而Kafka中數據則用於流式實時分析。
圖3:Hulu數據收集流程
Hulu使用HBase存儲用戶標籤數據,包括基本信息如性別、年齡、是否付費,以及其餘模型推測出來的偏好屬性。這些屬性須要做爲計算模型的輸入,同時HBase隨機讀取的速度比較慢,須要將數據同步到緩存服務器中以加快數據讀取速度。Redis是一個應用普遍的開源緩存服務器,但其自己是個單機系統,不能很好地支持大量數據的緩存。爲解決Redis擴展性差的問題,豌豆莢開源了Codis,一個分佈式Redis解決方案。Hulu將Codis打成Docker鏡像,並實現一鍵式構建緩存系統,附帶自動監控和修復功能。爲了更精細的監控,Hulu構建了多個Codis緩存,分別是:
q codis-profile,同步HBase中的用戶屬性;
q codis-action,緩存來自Kafka的用戶行爲;
q codis-result,記錄計算結果。
在一切準備就緒,啓動Spark Streaming程序:
1) Spark Streaming啓動Kafka Receiver,持續地從Kafka服務器拉取數據;
2) 每隔兩秒,Kafka的數據被整理成一個RDD,交給Spark引擎處理;
3) 對一條用戶行爲,Spark會從codis-action緩存中拿到該用戶的行爲記錄,而後把新的行爲追加進去;
4) Spark從codis-action和codis-profile中得到該用戶的全部相關屬性,而後執行廣告和推薦的計算模型,最後把結果寫入codis-result,進而供服務層實時讀取這些結果。
Spark Streaming優化經驗
實踐中,業務邏輯首先保證完成,使得在Kafka輸入數據量較小的狀況下系統穩定運行,且輸入輸出知足項目需求。而後開始調優,修改Spark Streaming的參數,好比Executor的數量,Core的數量,Receiver的流量等。最後發現僅調參數沒法徹底知足本項目的業務場景,因此有更進一步的優化方案,總結以下:
不少機器學習的模型在第一次運行時,須要執行初始化方法,還會鏈接外部的數據庫,經常須要5-10分鐘,這會成爲潛在的不穩定因素。在Spark Streaming應用中,當Receiver完成初始化,它就開始源源不斷地接收數據,而且由Driver按期調度任務消耗這些數據。若是剛啓動時Executor須要幾分鐘作準備,會致使第一個做業一直沒有完成,這段時間內 Driver不會調度新的做業。這時候在Kafka Receiver端會有數據積壓,隨着積壓的數據量愈來愈大,大部分數據會撐過新生代進入老年代,進而給Java GC帶來嚴重的壓力,容易引起應用程序崩潰。
本項目的解決方案是,修改Spark內核,在每一個Executor接收任務以前先執行一個用戶自定義的初始化函數,初始化函數中能夠執行一些獨立的用戶邏輯。示例代碼以下:
// sc:是SparkContext, setupEnvironment是Hulu擴展的API sc.setupEnvironment(() => { application.initialize() // 用戶應用程序初始化,需執行幾分鐘 }) |
該方案須要更改Spark的任務調度器,首先將每一個Executor設置爲未初始化狀態。此時,調度器只會給未初始化狀態的Executor分配初始化任務(執行前面提到的初始化函數)。等初始化任務完畢,調度器更新Executor的狀態爲已初始化,這樣的Executor才能夠分配正常的計算任務。
本項目中,模型的輸入參數均來自Codis,甚至模型內部也可能訪問外部存儲,直接致使模型計算時長不穩定,不少時間消耗在網絡等待上。
爲提升系統吞吐量,增大並行度是經常使用的優化方案,但在本項目的場景中並不適用。Spark做業的調度策略是,等待上一個做業的全部Task執行完畢,而後調度下一個做業。若是單個Task的運行時間不穩定,易發生個別Task拖慢整個做業的狀況,以致於資源利用率不高;甚至並行度越大,該問題越嚴重。一種經常使用解決Task不穩定的方案是增大Spark Streaming的micro batch的時間間隔,該方案會使整個實時系統的延遲變長,並不推薦。
所以這裏經過異步處理Task中的業務邏輯來解決。以下文的代碼所示,同步方案中,Task內執行業務邏輯,處理時間不定;異步方案中,Task把業務邏輯嵌入線程,交給線程池執行,Task馬上結束, Executor向Driver報告執行完畢,異步處理的時間很是短,在100ms之內。另外,當線程池中積壓的線程數量太大時(代碼中qsize>100的狀況),會暫時使用同步處理,配合反壓機制(見下文的參數spark.streaming.backpressure.enabled),能夠保證不會由於數據積壓過多而致使系統崩潰。經實驗驗證,該方案大大提升了系統的吞吐量。
// 同步處理 // 函數 runBusinessLogic是 Task 中的業務邏輯,執行時間不定 rdd.foreachPartition(partition => runBusinessLogic (partition))
// 異步處理,threadPool是線程池 rdd.foreachPartition(partition => { val qsize = threadPool.getQueue.size // 線程池中積壓的線程數 if (qsize > 100) { runBusinessLogic(partition) // 暫時同步處理 } threadPool.execute(new Runnable { override def run() = runBusinessLogic(partition) }) }) |
異步化Task也存在缺點:若是Executor發生異常,存放在線程池中的業務邏輯沒法從新計算,會致使部分數據丟失。經實驗驗證,僅當Executor異常崩潰時有數據丟失,且不常見,在本項目的場景中能夠接受。
本項目使用了Spark Streaming中的Kafka Receiver,本質上調用Kafka官方的客戶端ZookeeperConsumerConnector。其策略是每一個客戶端在Zookeeper的固定路徑下把本身註冊爲臨時節點,因而全部客戶端都知道其餘客戶端的存在,而後自動協調和分配Kafka的數據資源。該策略存在一個弊端,當一個客戶端與Zookeeper的鏈接狀態發生改變(斷開或者連上),全部的客戶端都會經過Zookeeper協調, 從新分配Kafka的數據資源;在此期間全部客戶端都斷開與Kafka的鏈接,系統接收不到Kafka的數據,直到從新分配成功。若是網絡質量不佳,而且Receiver的個數較多,這種策略會形成數據輸入不穩定,不少Spark Streaming用戶遇到這樣的問題。在咱們的系統中,該策略並無產生明顯的負面影響。值得注意的是,Kafka 客戶端與Zookeeper有個默認的參數zookeeper.session.timeout.ms=6000,表示客戶端與Zookeeper鏈接的session有效時間爲6秒,咱們的客戶端屢次出現由於Full GC超過6秒而與Zookeeper斷開鏈接,以後再次鏈接上,期間全部客戶端都受到影響,系統表現不穩定。因此項目中設置參數zookeeper.session.timeout.ms=30000。
在Hulu內部,Spark Streaming這樣的長時服務與MapRedue、Spark、Hive等批處理應用共享YARN集羣資源。在共享環境中,常常因一個批處理應用佔用大量網絡資源或者CPU資源致使Spark Streaming服務不穩定(儘管咱們採用了CGroup進行資源隔離,但效果不佳)。更嚴重的問題是,若是個別Container崩潰Driver須要向YARN申請新的Container,或者若是整個應用崩潰須要重啓,Spark Streaming不能保證很快申請到足夠的資源,也就沒法保證線上服務的質量。爲解決該問題,Hulu使用label-based scheduling的調度策略,從YARN集羣中隔離出若干節點專門運行Spark Streaming和其餘長時服務,避免與批處理程序競爭資源。
監控反映系統運行的性能狀態,也是一切優化的基礎。 Hulu使用Graphite和Grafana做爲第三方監控系統,本項目把系統中關鍵的性能參數(如計算時長和次數)發送給Graphite服務器,就可以在Grafana網頁上看到直觀的統計圖。
圖4:Graphite監控信息,展現了Kafka中日誌的剩餘數量,一條線對應於一個partition的歷史餘量
圖4是統計Kafka中日誌的剩餘數量,一條線對應於一個partition的歷史餘量,大部分狀況下餘量接近零,符合預期。圖中09:55左右日誌餘量開始出現很明顯的尖峯,以後又迅速逼近零。過後通過多種數據覈對,證明Kafka的數據一直穩定,而當時Spark Streaming執行做業忽然變慢,反壓機制生效,因而Kafka Receiver減少讀取日誌的速率,形成Kafka數據積壓;一段時間以後Spark Streaming又恢復正常,快速消耗了Kafka中的數據餘量。
直觀的監控系統能有效地暴露問題,進而理解和強化系統。 在咱們的實踐中,主要的監控指標有:
q Kafka的剩餘數據量
q Spark的做業運行時間和調度時間
q 每一個Task的計算時間
q Codis的訪問次數、時間、命中率
另外,有腳本按期分析這些統計數據,出現異常則發郵件報警。好比圖4中 Kafka 的日誌餘量過大時,會有連續的報警郵件。咱們的經驗是,監控越細緻,以後的優化工做越輕鬆。
下表列出本項目中比較關鍵的幾個參數:
spark.yarn.max.executor.failures |
Executor容許的失敗上限;若是超過該上限,整個Spark Streaming會失敗,須要設置比較大 |
spark.yarn.executor.memoryOverhead |
Executor中JVM的開銷,與堆內存不同,設置過小會致使內存溢出異常 |
spark.receivers.num |
Kafka Receiver的個數 |
spark.streaming.receiver.maxRate |
每一個Receiver可以接受數據的最大速率;這個值超過峯值約50% |
spark.streaming.backpressure.enabled |
反壓機制;若是目前系統的延遲較長,Receiver端會自動減少接受數據的速率,避免系統因數據積壓過多而崩潰 |
spark.locality.wait |
系統調度Task會盡可能考慮數據的局部性,若是超過spark.locality.wait設置時間的上限,就放棄局部性;該參數直接影響Task的調度時間 |
spark.cleaner.ttl |
Spark系統內部的元信息的超時時間;Streaming長期運行,元信息累積太多會影響性能 |
總結
Spark Streaming的產品上線運行一年多,期間進行了屢次Spark版本升級,從最先期的0.8版本到最近的 1.5.x版本。整體上Spark Streaming是一款優秀的實時計算框架,能夠在線上使用 。但仍然存在一些不足,包括:Spark同時使用堆內和堆外的內存,缺少有效的監控,遇到OOM時分析和調試比較困難;缺乏Executor初始化接口; 新版本的Spark有一些異常,如Shuffle過程當中Block丟失、內存溢出。