Spark Streaming fileStream實現原理

fileStream是Spark Streaming Basic Source的一種,用於「近實時」地分析HDFS(或者與HDFS API兼容的文件系統)指定目錄(假設:dataDirectory)中新近寫入的文件,dataDirectory中的文件須要知足如下約束條件:
 
(1)這些文件格式必須相同,如:統一爲文本文件;
(2)這些文件在目錄dataDirectory中的建立形式比較特殊:必須以原子方式被「移動」或「重命名」至目錄dataDirectory中;
(3)一旦文件被「移動」或「重命名」至目錄dataDirectory中,文件不能夠被改變,例如:追加至這些文件的數據可能不會被處理。
 
之因此稱之爲「近實時」就是基於約束條件(2),文件的數據必須所有寫入完成,而且被「移動」或「重命名」至目錄dataDirectory中以後,這些文件才能夠被處理。
 
調用示例以下:
 
 
directory:指定待分析文件的目錄;
filter:用戶指定的文件過濾器,用於過濾directory中的文件;
newFilesOnly:應用程序啓動時,目錄directory中可能已經存在一些文件,若是newFilesOnly值爲true,表示忽略這些文件;若是newFilesOnly值爲false,表示須要分析這些文件;
conf:用戶指定的Hadoop相關的配置屬性;
 
注:fileStream有另外兩個重載方法,在此再也不贅述。
 
若是分析的文件是文本文件,Spark提供了一個便利的方法:
 
 
fileStream的實現原理是比較簡單的:以固定的時間間隔(duration)不斷地探測目錄(dataDirectory),每次探測時將時間段(now - duration, now]內新寫入的文件(即文件的最近修改時間處於時間區間(now - duration, now])封裝爲RDD交由Spark處理。
 
Spark Streaming有一個核心組件:DStream,fileStream的實現依賴於其中的一個實現類:FileInputDStream。
 
 
而FileInputDStream的核心邏輯就是探測文件、封裝RDD,由方法compute(重寫至DStream compute)實現,
 
 
compute方法的註釋引出了一個很重要的問題:咱們爲何須要維護一個最近已分析文件的列表?
 
假設探測目錄爲dataDirectory,探測時間間隔爲duration,當前時間爲now,則本次選擇的文件須要知足條件:文件的最近修改時間須要處於區間(now - duration, now],此時文件可能有如下幾個狀態:
 
(1)文件的最後修改時間小於或等於now - duration;
(2)文件的最後修改時間處於區間(now - duration, now);
(3)文件的最後修改時間等於now;
(4)文件的最後修改時間大於now;
 
考慮第(3)種狀況,文件的最後修改時間等於now,它有可能在探測以前已被移動至目錄dataDirectory,或者在探測時或探測完成以後被移動至目錄dataDirectory;若是是後二者,就可能會出現文件「丟失」的狀況(即文件不被處理),由於下次探測的時間點爲now + duration,探測的時間範圍爲(now, now + duration],最近修改時間等於now的文件已不處於該區間。爲了不或減小文件「丟失」的狀況,因此Spark Streaming fileStream容許將探測的時間範圍向「前」擴展爲(now - n * duration, now],以下所示:
 
 
ignore threshold:now - n * duration
current batch time:now
remember window:n * duration
 
也就是說,每一次探測時,咱們會選擇文件的最後修改時間處於區間(ignore threshold, current batch time]的文件,但其中有些文件可能在前幾回的探測中已經被分析,爲了防止出現重複分析的狀況,咱們須要記錄時間區間(ignore threshold, current batch time](remember window)內已經被分析過的文件有哪些。
 
下面咱們來分析compute的處理流程:
 
1. 尋找新的文件;
 
 
 
(1)計算ignore threshold;
 
這一步有兩個重要的變量須要說明:initialModTimeIgnoreThreshold和durationToRemember。
 
initialModTimeIgnoreThreshold
 
 
它的值與newFilesOnly有關,newFilesOnly表示Spark Streaming App剛剛啓動時是否分析目錄dataDirectory中已有的文件:
 
newFilesOnly == true:不須要分析目錄dataDirectory中已有的文件,所以initialModTimeIgnoreThreshold的值被設置爲「當前時間」,表示僅僅分析最近修改時間大於「當前時間」的文件;
 
newFilesOnly == false:須要分析目錄dataDirectory中已有的文件,所以initialModTimeIgnoreThreshold的值被設置爲0(文件的最近修改時間必大於0)。
 
durationToRemember
 
 
slideDuration:表示探測的時間間隔。
 
 
minRememberDurationS:默認值爲60s,能夠經過屬性spark.streaming.fileStream.minRememberDuration進行修改。
 
 
 
經過上面的代碼能夠看出,durationToRemember = slideDuration * math.ceil(minRememberDurationS.milliseconds.toDouble / batchDuration.milliseconds).toInt,durationToRemember就是咱們前面提到的remember window,也就是說這個時間區間內的已分析文件會被記錄。
 
 
ignore threshold取initialModTimeIgnoreThreshold、currentTime - durationToRemember.milliseconds的最大值,這也意味着即便newFilesOnly值爲false,dataDirectory中的文件也不會被所有分析,只有最近修改時間大於currentTime - durationToRemember.milliseconds的文件纔會被分析。
 
(2)建立過濾器實例;
 
 
過濾器實例實際就是Hadoop PathFilter實例,依賴於方法isNewFile構建,顧名思義這個過濾器實例是用來選取新文件的,新文件的標準須要知足如下四個條件:
 
 
a. 文件路徑匹配用戶指定的過濾器實例;
b. 文件的最近修改時間大於modTimeIgnoreThreshold;
c. 文件的最近修改時間小於或等於currentTime;
d. 文件尚沒有被分析過,即文件沒有出如今最近已分析文件的列表recentlySelectedFiles。
 
這裏須要額外說明一下c,爲何文件的最近修改時間不能大於currentTime?這主要是爲了防止Spark Streaming應用重啓時出現文件被重複分析的狀況。
 
假設應用的終止時間爲time,重啓時間爲time + 5 * duration,recentlySelectedFiles僅保存最近一個duration已經被分析過的文件,即保存的時間窗口爲duration;應用重啓以後,第一次探測的時間爲time + duration,若是容許文件的最近修改時間大於currentTime(即time + duration),則最近修改時間處於時間區間(time, +∞)的文件將所有被分析,這些文件被記入recentlySelectedFiles;第二次探測的時間爲time + 2 * duration,由於recentlySelectedFiles的時間窗口爲duration,此時能夠認爲它的值已經被清空,若是容許文件的最近修改時間大於currentTime(即time + 2 * duration),則最近修改時間處於時間區間(time + 2 * duration, +∞)的文件將所有被分析,這種狀況下能夠看出最近修改時間處於時間區間(time + 2 * duration, +∞)的文件被重複分析;此外探測時間爲time + 3 * duration、time + 4 * duration、time + 5 * duration也將出現相似文件被重複分析的狀況。綜上所述,每次探測文件時,文件的最近修改時間不能大於currentTime。
 
(3)獲取知足過濾器實例條件的文件路徑;
 
 
至此,尋找「新」文件的流程結束。
 
2. 將找到的新文件加入已分析文件列表;
 
 
recentlySelectedFiles中的過時數據是由方法clearMetadata負責清理的。
 
3. 將找到的新文件封裝爲RDD;
 
 
 
(1)遍歷新文件(路徑),將每個新文件(路徑)經過SparkContext newAPIHadoopFile轉換爲一個RDD,最後造成一個RDD列表:fileRDDs;
(2)將fileRDDs轉換爲一個UnionRDD並返回;
 
至此,compute的整個處理流程結束。能夠看出,整個流程中最爲複雜的部分就是每次探測新文件的過程,特別是時間區間的選取以及最近已分析文件的緩存。
相關文章
相關標籤/搜索