Spark 共享變量

翻譯 Spark 共享變量部分的官方文檔(Spark 2.4.3)。html

一般,當傳遞給 Spark 操做 (如 map 或 reduce ) 的函數在遠程集羣節點上執行時,在函數中使用的全部外部變量都是單獨拷貝的變量副本。這些變量被複制到每臺機器上,對遠程機器上的變量更新不會傳播回驅動程序。支持通用的、任務間的讀寫共享變量是很低效的。不過,Spark確實爲兩種常見的使用模式提供了兩種有限的共享變量類型:廣播變量累加器java

1、廣播變量

廣播變量容許程序員在每臺機器上保留一個只讀變量,而不是給每一個 task 都發送一份它的副本。 例如,它們可用於使用一個有效的方式爲每一個節點提供很大的輸入數據集的副本。 Spark 還嘗試使用有效的廣播算法來分發廣播變量,以下降通訊成本。程序員

Spark 的全部 action 操做都是貫穿着不少個 stage 的,這些 stage 由 shuffle 操做進行劃分。 Spark 自動廣播每一個 stage 中任務所需的公共數據。以這種方式廣播的數據是以序列化形式緩存並在運行每一個 task 以前進行反序列化。因此,廣播變量在多個 stage 中的全部 task 都須要一份一樣的數據這樣的場景中頗有用。web

廣播變量是經過 SparkContext.broadcast(v) 這樣的方式建立的。它是將原始變量 v 包裹到本身封裝的變量中去,而後經過 .value() 這個方法獲取原始變量的值,代碼以下:算法

Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});

broadcastVar.value();
// returns [1, 2, 3]
複製代碼

當廣播變量被建立以後,在集羣上全部的計算函數中都會使用廣播變量去計算,所以原始變量 v 就不須要屢次被複制到不少個節點上了。另外,原始變量 v 在廣播變量被建立以後不可再被修改,若是在廣播變量建立以後再去修改原始變量 v 會致使集羣中每一個節點拿到的共享變量值不同。apache

2、累加器

累加器內部是經過關聯和交換操做實現 「add」 操做的變量,所以能夠併發執行。它能夠用來實現計算器或者求和操做。Spark 自然支持數值類型的累加,程序員也能夠自定義一些新的數據類型用來累加。api

做爲用戶,你能夠建立命名或者未命名的累加器。以下圖所示,一個命名的累加器(counter) 會被展現在使用該累加器的 stage 的 web UI 上面。 Spark 會展現每個被 Tasks 表中的任一個 task 修改過的累加器的值。緩存

累加器

在 UI 中追蹤累加器的值能夠幫助理解運行中的各個 stage 的進度。併發

一個數字類型的累加器能夠經過這樣的方式建立:SparkContext.longAccumulator() 或者 SparkContext.doubleAccumulator() ,去計算 long 類型或者 double 類型的數值累加。集羣中每一個 task 在作累加計算任務的時候能夠經過調用 add 方法去實現。可是,不能夠在集羣上讀取累加器的值。只有在 driver 程序中才能夠讀取累加器的值,經過 value 這個方法。函數

下面的這段代碼是用累加器去將一個 array 中的每一個元素相加:

LongAccumulator accum = jsc.sc().longAccumulator();

sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

accum.value();
// returns 10
複製代碼

除了在代碼中使用內建的 long 類型的累加器以外,程序員也能夠經過繼承 AccumulatorV2 去實現想要的類型的累加器。AccumulatorV2 這個抽象類有不少個方法須要去重寫,如:reset 方法(用來將累加器置零的)、add 方法(用來和另一個值作累加的)、merge 方法(用來合併另一個相同類型的累加器到該累加器的)。其它須要被重寫的方法能夠參考 API documentation 。好比,咱們能夠自定義一個累加器 MyVector 表明數學中的向量集合,能夠這麼寫:

class VectorAccumulatorV2 implements AccumulatorV2<MyVector, MyVector> {

  private MyVector myVector = MyVector.createZeroVector();

  public void reset() {
    myVector.reset();
  }

  public void add(MyVector v) {
    myVector.add(v);
  }
  ...
}

// Then, create an Accumulator of this type:
VectorAccumulatorV2 myVectorAcc = new VectorAccumulatorV2();
// Then, register it into spark context:
jsc.sc().register(myVectorAcc, "MyVectorAcc1");
複製代碼

注意一點,當程序員自定義了一些數據類型的累加器以後,累加器的值的數據類型能夠和你相加的元素類型不一致。

注意 :當 Spark 的 task 計算任務結束後,Spark 將會嘗試着將這個 task 中全部的累加計算合併到一個累加器上去。若是合併失敗,Spark 會忽略此次失敗,仍然認爲這個 task 的計算任務是成功的,而且繼續跑其它的 task。因此,一個有 bug 的累加器將不會影響 Spark 的做業,但一個累加器可能會在整個 Spark 做業成功跑完以後尚未成功更新到最新的值。

因爲累加器的 update 操做只會在 action 算子內部執行,Spark 保證了每一個 task 對累加器的更新操做只有一次。好比重啓 task 不會更新累加器的值。在 transform 算子操做時,用戶須要知道每一個 task 對累加器的更新操做可能不止一次,好比一但某個 task 或者 job 的 stage 被從新執行。

累加器不會改變 Spark 的 lazy 特性。若是累加器的值在 RDD 某個操做中被更新了,他們的值只會在這個 RDD 的某個 action 操做的某個部分計算中更新。所以,累加器的更新不會保證在像 map 這樣的算子中被馬上更新。能夠看下面的代碼片斷:

LongAccumulator accum = jsc.sc().longAccumulator();
data.map(x -> { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the `map` to be computed.
複製代碼
相關文章
相關標籤/搜索