sparkSQL中cache的若干問題

摘要

sparkSQL在使用cache緩存的時候,有時候緩存可能不起做用,可能會發出緩存是假的吧的感慨。如今咱們就把這個問題說道說道。
問題sql

場景描述

當咱們經過spark進行統計和處理數據時,發現他是延遲計算的,若是一個應用中出現多個action,而這多個action處理同一個數據源數據時,數據源用時間來過濾數據時,因爲有多個action操做,遇到每一個action就是一個job,每個action都會執行數據源獲取數據的操做,因爲兩個action之間的操做存在時間差,這兩個action獲取的數據有可能不一致。
例以下例
test1表中的數據緩存

1,2018-07-01 10:10:03
2,2018-07-01 11:12:04app

代碼以下操做
val odsData = spark.sql("""
select
from default.test1
where time < "2018-07-02"
""")
val targetData = odsData.map(fun _)
val targetData.createOrReplaceTempView("data1")
//第一個Action操做
val spark.sql("""
insert overwrite table default.test2
*
from data1
""")ide

val targetData1 = odsData.map(fun2 _) //引用同一個數據源
targetData1.createOrReplaceTempView("data2")
//第二個action操做
val spark.sql("""
insert table default.test2
*
from data2
""")oop

若是在運行第二個Action操做前,test1表中又增長了一條記錄3,2018-07-01 13:12:04
即執行第一個Action時記錄仍是兩條1和2,而再執行完第一個Action後而又執行第二個Action以前,
增長了一個新的單子:3,2018-07-01 13:12:04
那麼在test2表中的數據是怎麼樣的呢?
第一種狀況(由於第二個action是insert而不是insert overwrite)

1,2018-07-01 10:10:03
2,2018-07-01 11:12:04
1,2018-07-01 10:10:03
2,2018-07-01 11:12:04spa

第二種狀況

1,2018-07-01 10:10:03
2,2018-07-01 11:12:04
1,2018-07-01 10:10:03
2,2018-07-01 11:12:04
3,2018-07-01 13:12:043d

結果分析

結果是第二中狀況。若是認爲是第一種狀況的對spark的執行計劃仍是不太熟悉。首先spark是lazy計算的,即不觸發action操做,其實不提交做業的。而在這個application中存在兩個action,而這兩個aciton使用了同一個數據源的rdd,應該稱爲變量odsData,當遇到第一個action,其會把本身這個執行鏈上的rdd都執行一遍,包括執行odsData,而遇到第二個aciton的時候,其也會把本身的執行鏈上的數據又執行了一遍包括odsData,並從數據源中從新取數。有人會疑惑,第一個action在執行的時候,已經執行了odsData,這個RDD的結果不該該緩存起來嗎?我的認爲,spark尚未那麼的智能,而且網上常常說的job,stage,rdd,task的劃分應該是在同一個job內進行的。而同一個應用中誇job的stage拆分是不存在的。那麼出現這個結果應該怎麼辦呢?
cache的出場日誌

當出現這樣的狀況時,個人應用天天就會漏幾十條數據,非常煩人,最後發現了上面的問題,當時想解決方案時,第一個就是想到了cache,我把第一次執行Action操做時,把odsData給緩存了,這樣應該不會有什麼問題了吧。從而能夠保證兩個action操做,同一個數據源的數據一致性。只能說too young to sample了。這樣解決不了上面出現的問題。一樣以一個例子來看。
test表中的數據:blog

1 2017-01-01 01:00:00 2016-05-04 9999-12-31
2 2017-01-01 02:00:00 2016-01-01 9999-12-31
代碼:ip

val curentData = spark.sql(
"""
|select
|*
|from default.test
""".stripMargin)

curentData.cache() //緩存咱們的結果

curentData.createOrReplaceTempView("dwData")

//第一個Action
spark.sql(
"""
|INSERT OVERWRITE TABLE default.test1
|SELECT
|
|FROM dwData
""".stripMargin)
//改變數據源表test表的數據而且是第二個Action
spark.sql(
"""
|INSERT OVERWRITE TABLE default.test
|SELECT
| 1,
| "2017",
| "2018",
| "2018"
|FROM default.test
""".stripMargin)
//第三個Action和第一個Action同數據源,而且cache第一次運行的結果。
spark.sql(
"""
|INSERT OVERWRITE TABLE default.test1
|SELECT
|

|FROM dwData
""".stripMargin)
那麼test1表中的結果
第一種狀況:
1 2017-01-01 01:00:00 2016-05-04 9999-12-31
2 2017-01-01 02:00:00 2016-01-01 9999-12-31
第二種狀況

1 2017 2018 2018
1 2017 2018 2018
結果分析

結果是第二種狀況,也就是說咱們cache根本就沒有起到效果,或者說第三個Action根本就沒有使用咱們cache的數據。此次我把日誌都打出來了啊。
第一個Action的聲明週期:
sparkSQL中cache的若干問題

第三個Action的日誌:
sparkSQL中cache的若干問題

從這兩個日誌能夠看出,咱們設置cache其只能在同一個job中生效。而誇job的使用這樣的數據緩存數據是不存在的。
若是想更加詳細的瞭解cache的原理和做用,能夠去網上搜,大把大把的資料,可是必定要記住,網上說的要限定一個條件,在同一個job內的rdd,誇job的cache是不存在的。
解決方案

咱們最終但願解決的事,當兩個action想要使用同一個數據源的rdd的時候,如何保證其數據的一致性。
方案:
把第一個Action算子用到的數據源給寫入到一個臨時表中
而後再第二個Action中,直接讀取臨時表的數據,而不是直接使用odsData
更好的方案尚未想好,能夠根據業務的不一樣來搞。

第二個方案如今就是咱們使用spark提供的checkpoint機制,checkpoint會把咱們的數據自動緩存到hdfs,它就會把這個rdd之前的父rdd的數據所有刪除,之後無論哪一個job的rdd須要使用這個rdd的數據時,都會從這個checkpoin的目錄中讀取數據。spark.sparkContext.setCheckpointDir("hdfs://hadoop-1:5000/hanfangfang")curentData.cache().checkpoint這樣就可使不一樣的job,同一個數據源數據的一致性。同時咱們也要記住,當程序運行完成,其不會刪除checkpoint的數據的,須要們手動刪除。

相關文章
相關標籤/搜索