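When Spark runs an operation, the functions it executes can use variables defined in the driver program, but this default mechanism is not always a good fit. First, a large read-only value such as a lookup table is sent again with the tasks of every operation that uses it. Second, any updates that tasks make to such a variable happen on their own copies and are never propagated back to the driver.
To solve these two problems, Spark provides two kinds of shared variables: broadcast variables and accumulators.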
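The two problems can be reproduced without a cluster. The following is a simplified, single-process Python model, not Spark code. `run_tasks`, `count_blanks`, and the sample data are hypothetical, and `copy.deepcopy` stands in for Spark serializing the variables a task's closure captures and giving each task its own deserialized copy.

```python
import copy


def run_tasks(partitions, func, captured):
    """Simplified model of a Spark job (not Spark's implementation): every task
    works on its own copy of the driver-side variables its closure captures.
    Returns the task results and how many copies of the captured state were made."""
    results, copies = [], 0
    for part in partitions:
        task_copy = copy.deepcopy(captured)  # stand-in for serialize + ship + deserialize
        copies += 1
        results.append(func(part, task_copy))
    return results, copies


# Driver-side variables captured by the task function (hypothetical data).
driver_state = {
    "prefixes": {"W": "USA", "VE": "Canada", "JA": "Japan"},  # read-only lookup table
    "blank_lines": 0,                                           # counter we try to update
}


def count_blanks(partition, state):
    for line in partition:
        if line == "":
            state["blank_lines"] += 1  # only this task's private copy changes
    return len(partition)


partitions = [["W1ABC", ""], ["", "", "JA1XYZ"], ["VE3AAA"]]
results, copies = run_tasks(partitions, count_blanks, driver_state)

print(copies)                       # 3: the lookup table was copied once per task
print(driver_state["blank_lines"])  # 0: the increments never reached the driver
```

In this model, broadcast variables address the first count (one cached copy per executor instead of one per task), and accumulators address the second (task updates merged back into a value the driver can read).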
Broadcast variable example: totaling the call-sign contact counts for each country
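```python
# Broadcast the call-sign prefix (country code) table to the executors.
# loadCallSignTable(), lookupCountry(), contactCounts and outputDir are defined elsewhere.
signPrefixes = sc.broadcast(loadCallSignTable())

def processSignCount(sign_count):
    # sign_count is a (callSign, count) pair; the broadcast table is read through .value
    country = lookupCountry(sign_count[0], signPrefixes.value)
    count = sign_count[1]
    return (country, count)

countryContactCounts = (contactCounts
                        .map(processSignCount)
                        .reduceByKey(lambda x, y: x + y))
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
```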
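The Python version relies on `loadCallSignTable()` and `lookupCountry()`, which the post does not show. Below is one plausible sketch, runnable without Spark. `load_call_sign_table` and `lookup_country` are hypothetical names, and both the prefix table contents and the longest-prefix matching rule are assumptions. The post's Java version types the table as `String[]`, so the real helper probably searches an array. A dict is used here only for brevity. The loop at the end imitates `map(processSignCount)` followed by `reduceByKey` locally on made-up sample data.

```python
def load_call_sign_table():
    """Hypothetical stand-in for loadCallSignTable(): call-sign prefix -> country.
    A real table would be loaded from a file and hold many more prefixes."""
    return {"W": "USA", "K": "USA", "VE": "Canada", "JA": "Japan", "G": "United Kingdom"}


def lookup_country(sign, prefixes):
    """Hypothetical stand-in for lookupCountry(): return the country of the
    longest prefix of `sign` found in `prefixes`, or "Unknown" if none matches."""
    for end in range(len(sign), 0, -1):
        country = prefixes.get(sign[:end])
        if country is not None:
            return country
    return "Unknown"


# Local, Spark-free imitation of map(processSignCount) followed by reduceByKey.
prefixes = load_call_sign_table()
contact_counts = [("W1ABC", 3), ("VE3XYZ", 2), ("K2DEF", 4), ("JA1AAA", 1), ("ZZ9", 5)]
country_counts = {}
for sign, count in contact_counts:
    country = lookup_country(sign, prefixes)
    country_counts[country] = country_counts.get(country, 0) + count

print(country_counts)  # {'USA': 7, 'Canada': 2, 'Japan': 1, 'Unknown': 5}
```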
```scala
// Broadcast the call-sign prefix (country code) table to the executors
val signPrefixes = sc.broadcast(loadCallSignTable())
val countryContactCounts = contactCounts.map { case (sign, count) =>
  // Read the broadcast table through .value inside the task
  val country = lookupInArray(sign, signPrefixes.value)
  (country, count)
}.reduceByKey((x, y) => x + y)
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
```
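```java
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;

// Broadcast the call-sign prefix (country code) table to the executors
final Broadcast<String[]> signPrefixes = sc.broadcast(loadCallSignTable());
JavaPairRDD<String, Integer> countryContactCounts = contactCounts.mapToPair(
    new PairFunction<Tuple2<String, Integer>, String, Integer>() {
      public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) {
        String sign = callSignCount._1();
        // Read the broadcast table through value() inside the task
        String country = lookupCountry(sign, signPrefixes.value());
        return new Tuple2<String, Integer>(country, callSignCount._2());
      }
    }).reduceByKey(new SumInts()); // SumInts: a Function2<Integer, Integer, Integer> defined elsewhere
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt");
```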
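What `sc.broadcast` changes can be sketched with another simplified Python model. It is not Spark's implementation, and `ModelBroadcast`, `value_on`, and the executor names are hypothetical. The driver registers the value once, and each executor fetches a single copy on first use and caches it for every task it runs. This contrasts with the per-task copies that a plain captured variable gets.

```python
import copy


class ModelBroadcast:
    """Simplified model of a broadcast variable (not Spark's implementation):
    the driver holds the value, and each executor fetches one copy on first
    use and caches it for all of the tasks it runs."""

    def __init__(self, value):
        self._driver_value = value
        self._executor_cache = {}  # executor id -> that executor's cached copy
        self.fetches = 0           # number of copies shipped from the driver

    def value_on(self, executor_id):
        # Stand-in for reading signPrefixes.value inside a task on executor_id.
        if executor_id not in self._executor_cache:
            self._executor_cache[executor_id] = copy.deepcopy(self._driver_value)
            self.fetches += 1
        return self._executor_cache[executor_id]


sign_prefixes = ModelBroadcast({"W": "USA", "VE": "Canada", "JA": "Japan"})

# Six hypothetical tasks spread round-robin over two hypothetical executors.
for task_id in range(6):
    executor = f"exec-{task_id % 2}"
    table = sign_prefixes.value_on(executor)
    assert table["VE"] == "Canada"

print(sign_prefixes.fetches)  # 2: one copy per executor, not one per task
# Tasks on the same executor share one object, so treat it as read-only.
print(sign_prefixes.value_on("exec-0") is sign_prefixes.value_on("exec-0"))  # True
```

Sharing one object per executor is also why the Spark programming guide asks that a broadcast value not be modified after it is broadcast, so that all nodes see the same value.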
Accumulator example: counting blank lines
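```python
file = sc.textFile(inputFile)
# Create an Accumulator[Int] initialized to 0
blankLines = sc.accumulator(0)

def extractCallSigns(line):
    global blankLines  # += rebinds the name, so declare the module-level accumulator
    if line == "":
        blankLines += 1  # add 1 to the accumulator
    return line.split(" ")

callSigns = file.flatMap(extractCallSigns)
callSigns.saveAsTextFile(outputDir + "/callsigns")
# Read the value on the driver, after the action above has run
print("Blank lines: %d" % blankLines.value)
```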
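```scala
val file = sc.textFile("file.txt")
// Create an Accumulator[Int] initialized to 0 (Spark 1.x API, deprecated since Spark 2.0)
val blankLines = sc.accumulator(0)
val callSigns = file.flatMap { line =>
  if (line == "") {
    blankLines += 1 // add 1 to the accumulator
  }
  line.split(" ")
}
callSigns.saveAsTextFile("output.txt")
// Read the value on the driver, after the action above has run
println("Blank lines: " + blankLines.value)
```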
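```java
import java.util.Arrays;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;

JavaRDD<String> rdd = sc.textFile(args[1]);
// Create an Accumulator<Integer> initialized to 0 (Spark 1.x API, deprecated since Spark 2.0)
final Accumulator<Integer> blankLines = sc.accumulator(0);
JavaRDD<String> callSigns = rdd.flatMap(new FlatMapFunction<String, String>() {
  // Spark 1.x signature; since Spark 2.0, call() returns an Iterator instead
  public Iterable<String> call(String line) {
    if ("".equals(line)) {
      blankLines.add(1); // add 1 to the accumulator
    }
    return Arrays.asList(line.split(" "));
  }
});
callSigns.saveAsTextFile("output.text");
// Read the value on the driver, after the action above has run
System.out.println("Blank lines: " + blankLines.value());
```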
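How the blank-line count gets back to the driver can be sketched with a simplified, Spark-free Python model. `ModelAccumulator`, `extract_call_signs`, and `run_job` are hypothetical names. Each task adds to its own partial sum, and the driver merges that partial sum when the task finishes. This is why `blankLines.value` is read on the driver only after `saveAsTextFile` has run. The `rerun` option illustrates a caveat from the Spark programming guide: updates made inside a transformation such as `flatMap` may be applied more than once if a task or stage is re-executed. The once-per-task guarantee covers only updates performed inside actions.

```python
class ModelAccumulator:
    """Simplified model of an accumulator (not Spark's implementation): tasks
    only add to a task-local partial sum, and the driver merges each finished
    task's partial sum into `value`, which only the driver reads."""

    def __init__(self, zero=0):
        self.value = zero

    def merge(self, partial):
        self.value += partial


def extract_call_signs(partition):
    """Task body mirroring the flatMap above: split each line on spaces and
    count blank lines in a task-local partial sum."""
    blank, signs = 0, []
    for line in partition:
        if line == "":
            blank += 1
        signs.extend(line.split(" "))
    return signs, blank


def run_job(partitions, acc, rerun=()):
    """Run one task per partition and merge its partial sum on the driver.
    Partition indexes in `rerun` execute twice, imitating a re-executed task."""
    output = []
    for i, part in enumerate(partitions):
        for _ in range(2 if i in rerun else 1):
            signs, blank = extract_call_signs(part)
            acc.merge(blank)  # merged once per execution of the task
        output.extend(signs)
    return output


partitions = [["W1ABC K2DEF", ""], ["", "VE3XYZ"]]

blank_lines = ModelAccumulator(0)
call_signs = run_job(partitions, blank_lines)
print(blank_lines.value)  # 2

recounted = ModelAccumulator(0)
run_job(partitions, recounted, rerun={0})
print(recounted.value)    # 3: partition 0's blank line was counted twice
```

Because the blank-line counter in the examples above is updated inside `flatMap`, its value can overcount if tasks are retried. Updating it inside an action such as `foreach` avoids the double counting.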
Committed to technology and to sharing. Follow the WeChat official account java大數據編程 for more technical content.