Spark相對於Hadoop MapReduce有一個很顯著的特性就是「迭代計算」(做爲一個MapReduce的忠實粉絲,能這樣說,你們都懂了吧),這在咱們的業務場景裏真的是很是有用。
假設咱們有一個文本文件「datas」,每一行有三列數據,以「\t」分隔,模擬生成文件的代碼以下:
執行該代碼以後,文本文件會存儲於本地路徑:/tmp/datas,它包含1000行測試數據,將其上傳至咱們的測試Hadoop集羣,路徑:/user/yurun/datas,命令以下:
查詢一下它的狀態:
咱們經過Spark SQL API將其註冊爲一張表,代碼以下:
表的名稱爲source,它有三列,列名分別爲:col一、col二、col3,類型都爲字符串(str),測試打印其前10行數據:
假設咱們的分析需求以下:
(1)過濾條件:col1 = ‘col1_50',以col2爲分組,求col3的最大值;
(2)過濾條件:col1 = 'col1_50',以col3爲分組,求col2的最小值;
注意:需求是否是很變態,再次注意咱們只是模擬。
經過狀況下咱們能夠這麼作:
每個collect()(Action)都會產生一個Spark Job,
由於這兩個需求的處理邏輯是相似的,它們都有兩個Stage:
能夠看出這兩個Job的數據輸入量是一致的,根據輸入量的具體數值,咱們能夠推斷出這兩個Job都是直接從原始數據(文本文件)計算的。
這種狀況在Hive(MapReduce)的世界裏是很難優化的,處理邏輯雖然簡單,卻沒法使用一條SQL語句表述(有的是由於分析邏輯複雜,有的則由於各個處理邏輯的結果須要獨立存儲),只能一個需求對應一(多)條SQL語句(如上示例),帶來的問題就是全量原始數據屢次被分析,在海量數據的場景下必然帶來集羣資源的巨大浪費。
其實這兩個需求有一個共同點:過濾條件相同(col1 = 'col1_50'),一個很天然的想法就是將知足過濾條件的數據緩存,而後在緩存數據之上執行計算,Spark爲咱們作到了這一點。
依然是兩個Job,每一個Job仍然是兩個Stage,但這兩個Stage的輸入數據量(Input)已發生變化:
Job1的Input(數據輸入量)仍然是63.5KB,是由於「cacheTable」僅僅在RDD(cacheRDD)第一次被觸發計算並執行完成以後纔會生效,所以Job1的Input是63.5KB;而Job2執行時「cacheTable」已生效,直接輸入緩存中的數據便可,所以Job2的Input減小爲3.4KB,並且由於所需緩存的數據量小,能夠徹底被緩存於內存中,所以效率極高。
咱們也能夠從Spark相關頁面中確認「cache」確實生效:
咱們也須要注意cacheTable與uncacheTable的使用時機,cacheTable主要用於緩存中間表結果,它的特色是少許數據且被後續計算(SQL)頻繁使用;若是中間表結果使用完畢,咱們應該當即使用uncacheTable釋放緩存空間,用於緩存其它數據(示例中註釋uncacheTable操做,是爲了頁面中能夠清楚看到表被緩存的效果)。