Spark使用CombineTextInputFormat緩解小文件過多致使Task數目過多的問題

目前平臺使用Kafka + Flume的方式進行實時數據接入,Kafka中的數據由業務方負責寫入,這些數據一部分由Spark Streaming進行流式計算;另外一部分數據則經由Flume存儲至HDFS,用於數據挖掘或機器學習。HDFS存儲數據時目錄的最小邏輯單位爲「小時」,爲了保證數據計算過程當中的數據完整性(計算某個小時目錄中的數據時,該目錄的數據所有寫入完畢,且再也不變化),咱們在Flume中加入了以下策略:
 
每五分鐘關閉一次正在寫入的文件,即新建立文件進行數據寫入。
 
這樣的方式能夠保證,當前小時的第五分鐘以後就能夠開始計算上一小時目錄中的數據,必定程度上提升了離線數據處理的實時性。
 
隨着業務的增長,開始有業務方反饋:「HDFS中實際被分析的數據量很小,可是Spark App的Task數目卻至關多,不太正常」,咱們跟進以後,發現問題的根源在於如下三個方面:
 
(1)Kafka的實時數據寫入量比較小;
(2)Flume部署多個實例,同時消費Kafka中的數據並寫入HDFS;
(3)Flume每五分鐘會從新建立文件寫入數據(如上所述);
 
這樣的場景直接致使HDFS中存儲着數目衆多但單個文件數據量很小的狀況,間接影響着Spark App Task的數目。
 
咱們以Spark WordCount爲例進行說明,Spark版本爲1.5.1。
 
假設HDFS目錄「/user/yurun/spark/textfile」中存在如下文件:
 
 
這個目錄下僅三個文件包含少許數據:part-0000五、part-000十、part-00015,數據大小均爲6 Byte,其他文件數據大小均爲0 Byte,符合小文件的場景。
 
注意:_SUCCESS至關於一個「隱藏」文件,實際處理時一般會被忽略。
 
常規實現
 
 
咱們使用SparkContext textFile完成數據輸入,應用運行完成以後,經過Spark History Server的頁面能夠看到:應用執行過程當中,會產生一個Job,包含兩個Stage,每一個Stage包含16個Task,也就是說,Task的總數目爲32,以下圖所示:
 
 
之因此每一個Stage包含16個Task,是由於目錄中存有16個文本文件(_SUCCESS不參與計算)。
 
優化實現
 
 
在這個優化的版本中,咱們使用SparkContext newAPIHadoopFile完成數據輸入,須要着重說明一下「org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat」,這個類能夠將多個小文件合併生成一個Split,而一個Split會被一個Task處理,從而減小Task的數目。這個應用的執行過程當中,會產生兩個Job,其中Job0包含一個Stage,一個Task;Job1包含兩個Stage,每一個Stage包含一個Task,也就是說,Task的總數目爲3,以下圖所示:
 
 
 
能夠看出,經過使用「org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat」能夠很大程度上緩解小文件致使Spark App Task數目過多的問題。 
相關文章
相關標籤/搜索