Spark筆記之累加器(Accumulator)

 

1、累加器簡介

在Spark中若是想在Task計算的時候統計某些事件的數量,使用filter/reduce也能夠,可是使用累加器是一種更方便的方式,累加器一個比較經典的應用場景是用來在Spark Streaming應用中記錄某些事件的數量。html

使用累加器時須要注意只有Driver可以取到累加器的值,Task端進行的是累加操做。java

建立的Accumulator變量的值可以在Spark Web UI上看到,在建立時應該儘可能爲其命名,下面探討如何在Spark Web UI上查看累加器的值。sql

示例代碼:apache

package cc11001100.spark.sharedVariables.accumulators;

import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;

import java.util.Collections;
import java.util.concurrent.TimeUnit;


/**
 * @author CC11001100
 */
public class SparkWebUIShowAccumulatorDemo {

	public static void main(String[] args) {

		SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
		LongAccumulator fooCount = spark.sparkContext().longAccumulator("fooCount");

		spark.createDataset(Collections.singletonList(1024), Encoders.INT())
				.foreach((ForeachFunction<Integer>) fooCount::add);

		try {
			TimeUnit.DAYS.sleep(365 * 10000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

	}

}

啓動的時候注意觀察控制檯上輸出的Spark Web UI的地址:api

image

打開此連接,點進去Jobs-->Stage,能夠看到fooCount累加器的值已經被累加到了1024:ide

image 

 

2、Accumulator的簡單使用

Spark內置了三種類型的Accumulator,分別是LongAccumulator用來累加整數型,DoubleAccumulator用來累加浮點型,CollectionAccumulator用來累加集合元素。ui

package cc11001100.spark.sharedVariables.accumulators;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.CollectionAccumulator;
import org.apache.spark.util.DoubleAccumulator;
import org.apache.spark.util.LongAccumulator;

import java.util.Arrays;


/**
 * 累加器的基本使用
 *
 * @author CC11001100
 */
public class AccumulatorsSimpleUseDemo {

	public static void main(String[] args) {

		SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
		SparkContext sc = spark.sparkContext();

		// 內置的累加器有三種,LongAccumulator、DoubleAccumulator、CollectionAccumulator
		// LongAccumulator: 數值型累加
		LongAccumulator longAccumulator = sc.longAccumulator("long-account");
		// DoubleAccumulator: 小數型累加
		DoubleAccumulator doubleAccumulator = sc.doubleAccumulator("double-account");
		// CollectionAccumulator:集合累加
		CollectionAccumulator<Integer> collectionAccumulator = sc.collectionAccumulator("double-account");

		Dataset<Integer> num1 = spark.createDataset(Arrays.asList(1, 2, 3), Encoders.INT());
		Dataset<Integer> num2 = num1.map((MapFunction<Integer, Integer>) x -> {
			longAccumulator.add(x);
			doubleAccumulator.add(x);
			collectionAccumulator.add(x);
			return x;
		}, Encoders.INT()).cache();

		num2.count();

		System.out.println("longAccumulator: " + longAccumulator.value());
		System.out.println("doubleAccumulator: " + doubleAccumulator.value());
		// 注意,集合中元素的順序是沒法保證的,多運行幾回發現每次元素的順序均可能會變化
		System.out.println("collectionAccumulator: " + collectionAccumulator.value());

	}

}

 

3、自定義Accumulator

當內置的Accumulator沒法知足要求時,能夠繼承AccumulatorV2實現自定義的累加器。this

實現自定義累加器的步驟:spa

1. 繼承AccumulatorV2,實現相關方法線程

2. 建立自定義Accumulator的實例,而後在SparkContext上註冊它

 

假設要累加的數很是大,內置的LongAccumulator已經沒法知足需求,下面是一個簡單的例子用來累加BigInteger:

package cc11001100.spark.sharedVariables.accumulators;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.AccumulatorV2;

import java.math.BigInteger;
import java.util.Arrays;
import java.util.List;

/**
 * 自定義累加器
 *
 * @author CC11001100
 */
public class CustomAccumulatorDemo {

	// 須要注意的是累加操做不能依賴順序,好比相似於StringAccumulator這種則會獲得錯誤的結果
	public static class BigIntegerAccumulator extends AccumulatorV2<BigInteger, BigInteger> {

		private BigInteger num = BigInteger.ZERO;

		public BigIntegerAccumulator() {
		}

		public BigIntegerAccumulator(BigInteger num) {
			this.num = new BigInteger(num.toString());
		}

		@Override
		public boolean isZero() {
			return num.compareTo(BigInteger.ZERO) == 0;
		}

		@Override
		public AccumulatorV2<BigInteger, BigInteger> copy() {
			return new BigIntegerAccumulator(num);
		}

		@Override
		public void reset() {
			num = BigInteger.ZERO;
		}

		@Override
		public void add(BigInteger num) {
			this.num = this.num.add(num);
		}

		@Override
		public void merge(AccumulatorV2<BigInteger, BigInteger> other) {
			num = num.add(other.value());
		}

		@Override
		public BigInteger value() {
			return num;
		}
	}

	public static void main(String[] args) {

		SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
		SparkContext sc = spark.sparkContext();

		// 直接new自定義的累加器
		BigIntegerAccumulator bigIntegerAccumulator = new BigIntegerAccumulator();
		// 而後在SparkContext上註冊一下
		sc.register(bigIntegerAccumulator, "bigIntegerAccumulator");

		List<BigInteger> numList = Arrays.asList(new BigInteger("9999999999999999999999"), new BigInteger("9999999999999999999999"), new BigInteger("9999999999999999999999"));
		Dataset<BigInteger> num = spark.createDataset(numList, Encoders.kryo(BigInteger.class));
		Dataset<BigInteger> num2 = num.map((MapFunction<BigInteger, BigInteger>) x -> {
			bigIntegerAccumulator.add(x);
			return x;
		}, Encoders.kryo(BigInteger.class));

		num2.count();
		System.out.println("bigIntegerAccumulator: " + bigIntegerAccumulator.value());

	}

}

思考:內置的累加器LongAccumulator、DoubleAccumulator、CollectionAccumulator和我上面的自定義BigIntegerAccumulator,它們都有一個共同的特色,就是最終的結果不受累加數據順序的影響(對於CollectionAccumulator來講,能夠簡單的將結果集看作是一個無序Set),看到網上有博主舉例子StringAccumulator,這個就是一個錯誤的例子,就至關於開了一百個線程,每一個線程隨機sleep若干毫秒而後往StringBuffer中追加字符,最後追加出來的字符串是沒法被預測的。總結一下就是累加器的最終結果應該不受累加順序的影響,不然就要從新審視一下這個累加器的設計是否合理。

 

4、使用Accumulator的陷阱

來討論一下使用累加器的一些陷阱,累加器的累加是在Task中進行的,而這些Task就是咱們在Dataset上調用的一些算子操做,這些算子操做有Transform的,也有Action的,來探討一下不一樣類型的算子對Accumulator有什麼影響。

package cc11001100.spark.sharedVariables.accumulators;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;

import java.util.Arrays;

/**
 * 累加器使用的陷阱
 *
 * @author CC11001100
 */
public class AccumulatorTrapDemo {

	public static void main(String[] args) {

		SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
		SparkContext sc = spark.sparkContext();
		LongAccumulator longAccumulator = sc.longAccumulator("long-account");

		// ------------------------------- 在transform算子中的錯誤使用 -------------------------------------------

		Dataset<Integer> num1 = spark.createDataset(Arrays.asList(1, 2, 3), Encoders.INT());
		Dataset<Integer> nums2 = num1.map((MapFunction<Integer, Integer>) x -> {
			longAccumulator.add(1);
			return x;
		}, Encoders.INT());

		// 由於沒有Action操做,nums.map並無被執行,所以此時廣播變量的值仍是0
		System.out.println("num2 1: " + longAccumulator.value()); // 0

		// 調用一次action操做,num.map獲得執行,廣播變量被改變
		nums2.count();
		System.out.println("num2 2: " + longAccumulator.value());  // 3

		// 又調用了一次Action操做,廣播變量所在的map又被執行了一次,因此累加器又被累加了一遍,就悲劇了
		nums2.count();
		System.out.println("num2 3: " + longAccumulator.value()); // 6

		// ------------------------------- 在transform算子中的正確使用 -------------------------------------------

		// 累加器不該該被重複使用,或者在合適的時候進行cache斷開與以前Dataset的血緣關係,由於cache了就沒必要重複計算了
		longAccumulator.setValue(0);
		Dataset<Integer> nums3 = num1.map((MapFunction<Integer, Integer>) x -> {
			longAccumulator.add(1);
			return x;
		}, Encoders.INT()).cache(); // 注意這個地方進行了cache

		// 由於沒有Action操做,nums.map並無被執行,所以此時廣播變量的值仍是0
		System.out.println("num3 1: " + longAccumulator.value()); // 0

		// 調用一次action操做,廣播變量被改變
		nums3.count();
		System.out.println("num3 2: " + longAccumulator.value());  // 3

		// 又調用了一次Action操做,由於前一次調用count時num3已經被cache,num2.map不會被再執行一遍,因此這裏的值仍是3
		nums3.count();
		System.out.println("num3 3: " + longAccumulator.value()); // 3

		// ------------------------------- 在action算子中的使用 -------------------------------------------
		longAccumulator.setValue(0);
		num1.foreach(x -> {
			longAccumulator.add(1);
		});
		// 由於是Action操做,會被當即執行因此打印的結果是符合預期的
		System.out.println("num4: " + longAccumulator.value()); // 3

	}

}

 

5、Accumulator使用的奇淫技巧

累加器並非只能用來實現加法,也能夠用來實現減法,直接把要累加的數值改爲負數就能夠了:

package cc11001100.spark.sharedVariables.accumulators;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;

import java.util.Arrays;

/**
 * 使用累加器實現減法
 *
 * @author CC11001100
 */
public class AccumulatorSubtraction {

	public static void main(String[] args) {

		SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
		Dataset<Integer> nums = spark.createDataset(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), Encoders.INT());
		LongAccumulator longAccumulator = spark.sparkContext().longAccumulator("AccumulatorSubtraction");

		nums.foreach(x -> {
			if (x % 3 == 0) {
				longAccumulator.add(-2);
			} else {
				longAccumulator.add(1);
			}
		});
		System.out.println("longAccumulator: " + longAccumulator.value()); // 2

	}

}

 

相關資料:

1. Accumulators

2. When are accumulators truly reliable?

 

.

相關文章
相關標籤/搜索