Accumulator是spark提供的累加器,顧名思義,該變量只可以增長。
只有driver能獲取到Accumulator的值(使用value方法),Task只能對其作增長操做(使用 +=)。你也能夠在爲Accumulator命名(不支持Python),這樣就會在spark web ui中顯示,能夠幫助你瞭解程序運行的狀況。web
舉個最簡單的accumulator的使用例子:緩存
//在driver中定義 val accum = sc.accumulator(0, "Example Accumulator") //在task中進行累加 sc.parallelize(1 to 10).foreach(x=> accum += 1) //在driver中輸出 accum.value //結果將返回10 res: 10
val accum= sc.accumulator(0, "Error Accumulator") val data = sc.parallelize(1 to 10) //用accumulator統計偶數出現的次數,同時偶數返回0,奇數返回1 val newData = data.map{x => { if(x%2 == 0){ accum += 1 0 }else 1 }} //使用action操做觸發執行 newData.count //此時accum的值爲5,是咱們要的結果 accum.value //繼續操做,查看剛纔變更的數據,foreach也是action操做 newData.foreach(println) //上個步驟沒有進行累計器操做,但是累加器此時的結果已是10了 //這並非咱們想要的結果 accum.value
官方對這個問題的解釋以下描述:app
For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.ide
咱們都知道,spark中的一系列transform操做會構成一串長的任務鏈,此時須要經過一個action操做來觸發,accumulator也是同樣。所以在一個action操做以前,你調用value方法查看其數值,確定是沒有任何變化的。ui
因此在第一次count(action操做)以後,咱們發現累加器的數值變成了5,是咱們要的答案。spa
以後又對新產生的的newData進行了一次foreach(action操做),其實這個時候又執行了一次map(transform)操做,因此累加器又增長了5。最終得到的結果變成了10。rest
看了上面的分析,你們都有這種印象了,那就是使用累加器的過程當中只能使用一次action的操做才能保證結果的準確性。code
事實上,仍是有解決方案的,只要將任務之間的依賴關係切斷就能夠了。什麼方法有這種功能呢?大家確定都想到了,cache,persist。調用這個方法的時候會將以前的依賴切除,後續的累加器就不會再被以前的transfrom操做影響到了。orm
// val accum= sc.accumulator(0, "Error Accumulator") val data = sc.parallelize(1 to 10) //代碼和上方相同 val newData = data.map{x => {...}} //使用cache緩存數據,切斷依賴。 newData.cache.count //此時accum的值爲5 accum.value newData.foreach(println) //此時的accum依舊是5 accum.value