9.spark core之共享變量

簡介

  spark執行操做時,能夠使用驅動器程序Driver中定義的變量,但有時這種默認的使用方式卻並不理想。java

  • 集羣中運行的每一個任務都會鏈接驅動器獲取變量。若是獲取的變量比較大,執行效率會很是低下。
  • 每一個任務都會獲得這些變量的一份新的副本,更新這些副本的值不會影響驅動器中的對應變量。若是驅動器須要獲取變量的結果值,這種方式是不可行的。

  spark爲了解決這兩個問題,提供了兩種類型的共享變量:廣播變量(broadcast variable)和累加器(accumulator)。python

  • 廣播變量用於高效分發較大的對象。會在每一個執行器本地緩存一份大對象,而避免每次都鏈接驅動器獲取。
  • 累加器用於在驅動器中對數據結果進行聚合。

廣播變量

原理

廣播變量.png

  • 廣播變量只能在Driver端定義,不能在Executor端定義。
  • 在Driver端能夠修改廣播變量的值,在Executor端沒法修改廣播變量的值。
  • 若是不使用廣播變量在Executor有多少task就有多少Driver端的變量副本;若是使用廣播變量在每一個Executor中只有一份Driver端的變量副本。

用法

  • 經過對一個類型T的對象調用SparkContext.broadcast建立出一個BroadCast[T]對象,任何可序列化的類型均可以這麼實現。
  • 經過value屬性訪問該對象的值
  • 變量只會被髮到各個節點一次,應做爲只讀值處理。(修改這個值不會影響到別的節點)

    實例

      查詢每一個國家的呼號個數apache

    python

# 將呼號前綴(國家代碼)做爲廣播變量
signPrefixes = sc.broadcast(loadCallSignTable())

def processSignCount(sign_count, signPrefixes):
    country = lookupCountry(sign_count[0], signPrefixes.value)
    count = sign_count[1]
    return (country, count)

countryContactCounts = (contactCounts.map(processSignCount).reduceByKey((lambda x, y: x+y)))

countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")

scala

// 將呼號前綴(國家代碼)做爲廣播變量
val signPrefixes = sc.broadcast(loadCallSignTable())

def processSignCount(sign_count, signPrefixes):
    country = lookupCountry(sign_count[0], signPrefixes.value)
    count = sign_count[1]
    return (country, count)

val countryContactCounts = contactCounts.map{case (sign, count) => {
    val country = lookupInArray(sign, signPrefixes.value)
    (country, count)
    }}.reduceByKey((x, y) => x+y)

countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")

java

// 將呼號前綴(國家代碼)做爲廣播變量
final Broadcast<String[]> signPrefixes = sc.broadcast(loadCallSignTable());

JavaPairRDD<String, Integer> countryContactCounts = contactCounts.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
    public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) {
        String sign = callSignCount._1();
        String country = lookupCountry(sign, signPrefixes.value());
        return new Tuple2(country, callSignCount._2()); 
    }
}).reduceByKey(new SumInts());

countryContactCounts.saveAsTextFile(outputDir + "/countries.txt");

累加器

原理

累加器.png

  • 累加器在Driver端定義賦初始值。
  • 累加器只能在Driver端讀取最後的值,在Excutor端更新。

用法

  • 經過調用sc.accumulator(initivalValue)方法,建立出存有初始值的累加器。返回值爲org.apache.spark.Accumulator[T]對象,其中T是初始值initialValue的類型。
  • Spark閉包裏的執行器代碼能夠使用累加器的+=方法增長累加器的值
  • 驅動器程序能夠調用累加器的value屬性來訪問累加器的值

實例

  累加空行編程

python

file = sc.textFile(inputFile)
# 建立Accumulator[Int]並初始化爲0
blankLines = sc.accumulator(0)

def extractCallSigns(line):
    global blankLines # 訪問全局變量
    if (line == ""):
        blankLines += 1
    return line.split(" ")

callSigns = file.flatMap(extractCallSigns)
callSigns.saveAsTextFile(outputDir + "/callsigns")
print "Blank lines: %d" % blankLines.value

scala

val file = sc.textFile("file.txt")
val blankLines = sc.accumulator(0) //建立Accumulator[Int]並初始化爲0

val callSigns = file.flatMap(line => {
    if (line == "") {
        blankLines += 1 //累加器加1
    }
    line.split(" ")
})

callSigns.saveAsTextFile("output.txt")
println("Blank lines:" + blankLines.value)

java

JavaRDD<String> rdd = sc.textFile(args[1]);

final Accumulator<Integer> blankLines = sc.accumulator(0);

JavaRDD<String> callSigns = rdd.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String line) {
        if ("".equals(line)) {
            blankLines.add(1);
        }
        return Arrays.asList(line.split(" "));
    }
});

callSigns.saveAsTextFile("output.text");
System.out.println("Blank lines:" + blankLines.value());

忠於技術,熱愛分享。歡迎關注公衆號:java大數據編程,瞭解更多技術內容。緩存

這裏寫圖片描述

相關文章
相關標籤/搜索