一,概述java
二,廣播變量broadcast variable算法
2.1 定義廣播變量的緣由apache
2.2 圖解廣播變量機器學習
2.3 定義廣播變量分佈式
2.4 還原廣播變量函數
2.5 定義注意事項工具
三,累加器學習
3.1 爲何要將一個變量定義爲一個累加器spa
3.2 圖解累加器scala
3.3 定義累加器
3.4 還原累加器
3.5 定義注意事項
在spark程序中,當一個傳遞給Spark操做(例如map和reduce)的函數在遠程節點上面運行時,Spark操做實際上操做的是這個函數所用變量的一個獨立副本。這些變量會被複制到每臺機器上,而且這些變量在遠程機器上的全部更新都不會傳遞迴驅動程序。一般跨任務的讀寫變量是低效的,可是,Spark仍是爲兩種常見的使用模式提供了兩種有限的共享變量:廣播變(broadcast variable)和累加器(accumulator)
廣播變量用來高效分發較大的對象。向全部工做節點發送一個 較大的只讀值,以供一個或多個 Spark 操做使用。好比,若是你的應用須要向全部節點發 送一個較大的只讀查詢表,甚至是機器學習算法中的一個很大的特徵向量,廣播變量用起 來都很順手。
val a = 3
val broadcast = sc.broadcast(a)
val c = broadcast.value
一、能不能將一個RDD使用廣播變量廣播出去?
不能,由於RDD是不存儲數據的。能夠將RDD的結果廣播出去。
二、 廣播變量只能在Driver端定義,不能在Executor端定義。
三、 在Driver端能夠修改廣播變量的值,在Executor端沒法修改廣播變量的值。
四、若是executor端用到了Driver的變量,若是不使用廣播變量在Executor有多少task就有多少Driver端的變量副本。
五、若是Executor端用到了Driver的變量,若是使用廣播變量在每一個Executor中只有一份Driver端的變量副本。
六、變量一旦被定義爲一個廣播變量,那麼這個變量只能讀,不能修改
2.6 廣播變量應用實例
實例描述:有一些訪問數據,須要根據訪問的IP獲取對於IP的訪問地址,同時求出每一個訪問地址的數量。由於每個IP段會對應一個區域地址。也就是映射關係。咱們就能夠經過IP還對應這些訪問地址。
工具類:
package cn.edu360.sparkThree import scala.io.{BufferedSource, Source} object MyUtils { // 解析IP段對應的省份,將這些省份隱式到IP段 def readRules(path: String): Array[(Long, Long, String)]={ val bf: BufferedSource = Source.fromFile(path) val lines: Iterator[String] = bf.getLines() val rules: Array[(Long, Long, String)] = lines.map(line => { val fields: Array[String] = line.split("[|]") val startNum: Long = fields(2).toLong val endNum: Long = fields(3).toLong val privince: String = fields(6) (startNum, endNum, privince) }).toArray rules } // 將IP地址轉換成長整形 def ipToLong(ip: String)={ val fragments: Array[String] = ip.split("[.]") var ipNum = 0L for(i <- 0 until fragments.length){ ipNum = fragments(i).toLong | ipNum << 8L } ipNum } // 利用二分法的形式返回IP地址對應的IP段 def binarySearch(lines: Array[(Long, Long, String)], ip: Long ): Int={ var low = 0 var high = lines.length - 1 while (low <= high){ val middle = (low + high) / 2 if((ip >= lines(middle)._1) && (ip <=lines(middle)._2)){ return middle } if(ip < lines(middle)._1){ high = middle - 1 }else{ low = middle + 1 } } -1 } }
利用Spark程序解析IP:
package cn.edu360.sparkThree import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object IpLocationOne { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("IpManage").setMaster("local[4]") val sc = new SparkContext(conf) // 獲取IP映射庫,同時將其解析 val rules: Array[(Long, Long, String)] = MyUtils.readRules("C:\\Users\\Administrator\\Desktop\\java\\spark4\\課件與代碼\\ip\\ip.txt") // 將IP映射規則廣播到每個Worker的executer val broudcastRef: Broadcast[Array[(Long, Long, String)]] = sc.broadcast(rules) // 獲取訪問日誌 val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/ip/log/") // 定義一個解析訪問日誌的函數 val func =(line: String) =>{ val fields: Array[String] = line.split("[|]") val ip: String = fields(1) val ipNum: Long = MyUtils.ipToLong(ip) val rulesInExecutor: Array[(Long, Long, String)] = broudcastRef.value var province = "未知" val index: Int = MyUtils.binarySearch(rulesInExecutor, ipNum) if(index != -1){ province = rulesInExecutor(index)._3 } (province, 1) } // 解析數據 val provinceOne: RDD[(String, Int)] = lines.map(func) // 聚合數據 val reduced: RDD[(String, Int)] = provinceOne.reduceByKey(_+_) val result: Array[(String, Int)] = reduced.collect() print(result.toBuffer) } }
累加器是經過交替的操做能夠增長的變量,而且能夠運行在並行的狀況下。能夠用來實現一個計數器(和 MapReduce中的同樣)或者求和。Spark自然支持數字類型的累加器,開發人員能夠添加新類型的支持。
在spark應用程序中,咱們常常會有這樣的需求,如異常監控,調試,記錄符合某特性的數據的數目,這種需求都須要用到計數器,若是一個變量不被聲明爲一個累加器,那麼它將在被改變時不會再driver端進行全局彙總,即在分佈式運行時每一個task運行的只是原始變量的一個副本,並不能改變原始變量的值,可是當這個變量被聲明爲累加器後,該變量就會有分佈式計數的功能。
val a = sc.accumulator(0)
val b = a.value
1、累加器在Driver端定義賦初始值,累加器只能在Driver端讀取最後的值,在Excutor端更新。
二、累加器不是一個調優的操做,由於若是不這樣作,結果是錯的