spark 廣播變量

Spark廣播變量java

使用廣播變量來優化,廣播變量的原理是:apache

在每個Executor中保存一份全局變量,task在執行的時候須要使用和這一份變量就能夠,極大的減小了Executor的內存開銷。api

Executor中task在執行的時候若是使用到了廣播變量,會找Executor裏面的BlockManager來獲取廣播變量。ide

若是BlockManager中沒有這個關閉變量,會從driver端拉取關閉變量。優化

在Driver端也有一個blockManagerMaster,其餘的task執行的時候直接使用blockmanager中的廣播變量就能夠。spa

package SparkStreaming; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.broadcast.Broadcast; import java.util.Arrays; import java.util.List; public class BroadCast { public static void main(String[] args) { SparkConf conf = new SparkConf() .setMaster("local") .setAppName("BroadCast"); JavaSparkContext sc = new JavaSparkContext(conf); /* * 使用廣播變量,廣播變量的定義必須在driver端,由於sc沒有被序列化不能被髮送到Executor端 * */ Broadcast<String> blackname = sc.broadcast("dwj3"); List<String> name = Arrays.asList( "dwj1", "dwj2", "dwj3"); //String blackName = "dwj3"; JavaRDD<String> nameRDD = sc.parallelize(name); JavaRDD<String> namefilter = nameRDD.filter(new Function<String, Boolean>() { @Override public Boolean call(String s) throws Exception { String blacknames = blackname.getValue(); return !blacknames.equals(s); } }); List<String> lastname = namefilter.collect(); for(String str:lastname){ System.out.println(str); } } }

注意:在聲明廣播變量的時候,必須在driver端,由於sc沒有被序列化,是不能被髮送到Executor端的。code

相關文章
相關標籤/搜索