咱們傳遞給Spark的函數,如map(),或者filter()的判斷條件函數,可以利用定義在函數以外的變量,可是集羣中的每個task都會獲得變量的一個副本,而且task在對變量進行的更新不會被返回給driver。而Spark的兩種共享變量:累加器(accumulator)和廣播變量(broadcast variable),在廣播和結果聚合這兩種常見類型的通訊模式上放寬了這種限制。 使用累加器能夠很簡便地對各個worker返回給driver的值進行聚合。java
因爲對於worker節點來講,累加器的值是不可訪問的,全部對於worker上的task,累加器是write-only的。這使得累加器能夠被更高效的實現,而不須要在每次更新時都進行通訊。數據庫
Spark保證:在終止操做中對累加器的操做只執行一次,而轉化操做中則可能屢次執行。apache
累加器的操做能夠多種,好比算術加法 MAX,只要這些操做符合交換律和結合律。api
累加器會屢次傳遞給Executor節點(而廣播只一次)。數組
import java.util.ArrayList; import java.util.List; import org.apache.spark.Accumulator; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.broadcast.Broadcast; import scala.Tuple2; public class AccumulatorDemo { public static void main(String[] xx){ SparkConf conf = new SparkConf(); conf.setMaster("local[4]"); conf.setAppName("WordCounter"); conf.set("spark.default.parallelism", "4"); conf.set("spark.testing.memory", "2147480000"); JavaSparkContext ctx = new JavaSparkContext(conf); Person[] persons = new Person[10000]; Broadcast<Person []> persons_br = ctx.broadcast(persons); Accumulator<Integer> count = ctx.accumulator(0); List<String> data1 = new ArrayList<String>(); data1.add("Cake"); data1.add("Bread"); data1.add(""); data1.add("Cheese"); data1.add("Milk"); data1.add("Toast"); data1.add("Bread"); data1.add(""); data1.add("Egg"); data1.add(""); JavaRDD<String> rdd1 = ctx.parallelize(data1, 2); System.out.println(rdd1.glom().collect()); rdd1.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { long id = Thread.currentThread().getId(); System.out.println("s:" + s + " in thread:" + id); if(s.equals("")){ count.add(1); } return new Tuple2<String, Integer>(s, 1); } }).collect(); System.out.println(count.value()); rdd1.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { long id = Thread.currentThread().getId(); System.out.println("s:" + s + " in thread:" + id); if(s.equals("")){ count.add(1); // System.out.println("c:"+count.value()); } System.out.println(persons_br.value().length); return new Tuple2<String, Integer>(s, 1); } }).collect(); System.out.println(count.value()); ctx.stop(); } } class Person{}
import java.util.Arrays; import java.util.List; import org.apache.spark.Accumulator; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.Time; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import scala.Tuple2; /** * 實例:利用廣播進行黑名單過濾! 檢查新的數據 根據是否在廣播變量-黑名單內,從而實現過濾數據。 */ public class BroadCastDemo { /** * 建立一個List的廣播變量 * */ private static volatile Broadcast<List<String>> broadcastList = null; /** * 計數器! */ private static volatile Accumulator<Integer> accumulator = null; public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WordCountOnlineBroadcast"); conf.set("spark.testing.memory", "2147480000"); JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5)); /** * 注意:分發廣播須要一個action操做觸發。 注意:廣播的是Arrays的asList * 而非對象的引用。廣播Array數組的對象引用會出錯。 使用broadcast廣播黑名單到每一個Executor中! */ broadcastList = jsc.sparkContext().broadcast(Arrays.asList("Hadoop", "Mahout", "Hive")); /** * 累加器做爲全局計數器!用於統計在線過濾了多少個黑名單! 在這裏實例化。 */ accumulator = jsc.sparkContext().accumulator(0, "OnlineBlackListCounter"); JavaReceiverInputDStream<String> lines = jsc.socketTextStream("Master", 9999); /** * 這裏省去flatmap由於名單是一個個的! */ JavaPairDStream<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String word) { return new Tuple2<String, Integer>(word, 1); } }); JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) { return v1 + v2; } }); /** * Funtion裏面 前幾個參數是 入參。 後面的出參。 體如今call方法裏面! * */ // wordsCount.foreach(new Function2<JavaPairRDD<String, Integer>,Time,boolean>() { // @Override // public boolean call(JavaPairRDD<String, Integer> rdd, Time time) throws Exception { // rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() { // @Override // public Boolean call(Tuple2<String, Integer> wordPair) throws Exception { // if (broadcastList.value().contains(wordPair._1)) { // /** // * accumulator不單單用來計數。 能夠同時寫進數據庫或者緩存中。 // */ // accumulator.add(wordPair._2); // return false; // } else { // return true; // } // }; // /** // * 廣播和計數器的執行,須要進行一個action操做! // */ // }).collect(); // System.out.println("廣播器裏面的值" + broadcastList.value()); // System.out.println("計時器裏面的值" + accumulator.value()); // return null; // } // }); jsc.start(); try { jsc.awaitTermination(); } catch (InterruptedException e) { // TODO 自動生成的 catch 塊 e.printStackTrace(); } jsc.close(); } }