廣播變量&累加變量

一、 廣播&累加器

咱們傳遞給Spark的函數,如map(),或者filter()的判斷條件函數,可以利用定義在函數以外的變量,可是集羣中的每個task都會獲得變量的一個副本,而且task在對變量進行的更新不會被返回給driver。而Spark的兩種共享變量:累加器(accumulator)和廣播變量(broadcast variable),在廣播和結果聚合這兩種常見類型的通訊模式上放寬了這種限制。 使用累加器能夠很簡便地對各個worker返回給driver的值進行聚合。java

因爲對於worker節點來講,累加器的值是不可訪問的,全部對於worker上的task,累加器是write-only的。這使得累加器能夠被更高效的實現,而不須要在每次更新時都進行通訊。數據庫

Spark保證:在終止操做中對累加器的操做只執行一次,而轉化操做中則可能屢次執行。apache

 

累加器的操做能夠多種,好比算術加法 MAX,只要這些操做符合交換律和結合律。api

 

   累加器會屢次傳遞給Executor節點(而廣播只一次)。數組

 

1. Accumulator

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;
public class AccumulatorDemo {
    public static void main(String[] xx){
    	SparkConf conf = new SparkConf();
    	conf.setMaster("local[4]");
    	conf.setAppName("WordCounter");
    	conf.set("spark.default.parallelism", "4");
    	conf.set("spark.testing.memory", "2147480000");
    	JavaSparkContext ctx = new JavaSparkContext(conf);    	
    	Person[] persons = new Person[10000];
    	Broadcast<Person []> persons_br =  ctx.broadcast(persons);    	
    	Accumulator<Integer> count = ctx.accumulator(0);
    	List<String> data1 = new ArrayList<String>();
    	data1.add("Cake");
    	data1.add("Bread");
    	data1.add("");
    	data1.add("Cheese");
    	data1.add("Milk");    	
    	data1.add("Toast");
    	data1.add("Bread");
    	data1.add("");
    	data1.add("Egg");
    	data1.add("");
    	JavaRDD<String> rdd1 = ctx.parallelize(data1, 2);
    	System.out.println(rdd1.glom().collect());
    	
    	rdd1.mapToPair(new PairFunction<String, String, Integer>() {
			@Override 
            public Tuple2<String, Integer> call(String s) throws Exception {
				long id = Thread.currentThread().getId();
				System.out.println("s:" + s + " in thread:" + id);
				if(s.equals("")){
					count.add(1);
				}
                return new Tuple2<String, Integer>(s, 1);  
            }
		}).collect();
    	System.out.println(count.value());
    	
    	rdd1.mapToPair(new PairFunction<String, String, Integer>() {
			@Override 
            public Tuple2<String, Integer> call(String s) throws Exception {
				long id = Thread.currentThread().getId();
				System.out.println("s:" + s + " in thread:" + id);
				if(s.equals("")){
					count.add(1);
//			    	System.out.println("c:"+count.value());
				}
				System.out.println(persons_br.value().length);
                return new Tuple2<String, Integer>(s, 1);  
            }
		}).collect();

    	System.out.println(count.value());

    	ctx.stop();
    }
}
class Person{}

2.BroadCast

import java.util.Arrays;
import java.util.List;

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

/**
 * 實例:利用廣播進行黑名單過濾! 檢查新的數據 根據是否在廣播變量-黑名單內,從而實現過濾數據。
 */
public class BroadCastDemo {
	/**
	 * 建立一個List的廣播變量
	 *
	 */
	private static volatile Broadcast<List<String>> broadcastList = null;
	/**
	 * 計數器!
	 */
	private static volatile Accumulator<Integer> accumulator = null;

	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WordCountOnlineBroadcast");
		    	conf.set("spark.testing.memory", "2147480000");
		JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));

		/**
		 * 注意:分發廣播須要一個action操做觸發。 注意:廣播的是Arrays的asList
		 * 而非對象的引用。廣播Array數組的對象引用會出錯。 使用broadcast廣播黑名單到每一個Executor中!
		 */
		broadcastList = jsc.sparkContext().broadcast(Arrays.asList("Hadoop", "Mahout", "Hive"));
		/**
		 * 累加器做爲全局計數器!用於統計在線過濾了多少個黑名單! 在這裏實例化。
		 */
		accumulator = jsc.sparkContext().accumulator(0, "OnlineBlackListCounter");

		JavaReceiverInputDStream<String> lines = jsc.socketTextStream("Master", 9999);

		/**
		 * 這裏省去flatmap由於名單是一個個的!
		 */
		JavaPairDStream<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
			@Override
			public Tuple2<String, Integer> call(String word) {
				return new Tuple2<String, Integer>(word, 1);
			}
		});
		JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
			@Override
			public Integer call(Integer v1, Integer v2) {
				return v1 + v2;
			}
		});
		/**
		 * Funtion裏面 前幾個參數是 入參。 後面的出參。 體如今call方法裏面!
		 *
		 */
//		wordsCount.foreach(new Function2<JavaPairRDD<String, Integer>,Time,boolean>() {
//			@Override
//			public boolean call(JavaPairRDD<String, Integer> rdd, Time time) throws Exception {
//				rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
//					@Override
//					public Boolean call(Tuple2<String, Integer> wordPair) throws Exception {
//						if (broadcastList.value().contains(wordPair._1)) {
//							/**
//							 * accumulator不單單用來計數。 能夠同時寫進數據庫或者緩存中。
//							 */
//							accumulator.add(wordPair._2);
//							return false;
//						} else {
//							return true;
//						}
//					};
//					/**
//					 * 廣播和計數器的執行,須要進行一個action操做!
//					 */
//				}).collect();
//				System.out.println("廣播器裏面的值" + broadcastList.value());
//				System.out.println("計時器裏面的值" + accumulator.value());
//				return null;
//			}
//		});

		jsc.start();
		try {
			jsc.awaitTermination();
		} catch (InterruptedException e) {
			// TODO 自動生成的 catch 塊
			e.printStackTrace();
		}
		jsc.close();
	}
}
相關文章
相關標籤/搜索