Spark 學習(五)廣播變量和累加器

一,概述java

二,廣播變量broadcast variable算法

  2.1 定義廣播變量的緣由apache

  2.2 圖解廣播變量機器學習

  2.3 定義廣播變量分佈式

  2.4 還原廣播變量函數

  2.5 定義注意事項工具

三,累加器學習

  3.1 爲何要將一個變量定義爲一個累加器spa

  3.2 圖解累加器scala

  3.3 定義累加器

  3.4 還原累加器

  3.5 定義注意事項

 

 

 

 

正文

一,概述

  在spark程序中,當一個傳遞給Spark操做(例如map和reduce)的函數在遠程節點上面運行時,Spark操做實際上操做的是這個函數所用變量的一個獨立副本。這些變量會被複制到每臺機器上,而且這些變量在遠程機器上的全部更新都不會傳遞迴驅動程序。一般跨任務的讀寫變量是低效的,可是,Spark仍是爲兩種常見的使用模式提供了兩種有限的共享變量:廣播變(broadcast variable)和累加器(accumulator)

二,廣播變量broadcast variable

  2.1 定義廣播變量的緣由

  廣播變量用來高效分發較大的對象。向全部工做節點發送一個 較大的只讀值,以供一個或多個 Spark 操做使用。好比,若是你的應用須要向全部節點發 送一個較大的只讀查詢表,甚至是機器學習算法中的一個很大的特徵向量,廣播變量用起 來都很順手。

  2.2 圖解廣播變量

  2.3 定義廣播變量

val a = 3
val broadcast = sc.broadcast(a)

  2.4 還原廣播變量

val c = broadcast.value

  2.5 定義注意事項

  一、能不能將一個RDD使用廣播變量廣播出去?

         不能,由於RDD是不存儲數據的。能夠將RDD的結果廣播出去。

  二、 廣播變量只能在Driver端定義,不能在Executor端定義。

  三、 在Driver端能夠修改廣播變量的值,在Executor端沒法修改廣播變量的值。

  四、若是executor端用到了Driver的變量,若是不使用廣播變量在Executor有多少task就有多少Driver端的變量副本。

  五、若是Executor端用到了Driver的變量,若是使用廣播變量在每一個Executor中只有一份Driver端的變量副本。

  六、變量一旦被定義爲一個廣播變量,那麼這個變量只能讀,不能修改

  2.6 廣播變量應用實例

  實例描述:有一些訪問數據,須要根據訪問的IP獲取對於IP的訪問地址,同時求出每一個訪問地址的數量。由於每個IP段會對應一個區域地址。也就是映射關係。咱們就能夠經過IP還對應這些訪問地址。

  工具類:

package cn.edu360.sparkThree

import scala.io.{BufferedSource, Source}

object MyUtils {
    // 解析IP段對應的省份,將這些省份隱式到IP段
    def readRules(path: String): Array[(Long, Long, String)]={
        val bf: BufferedSource = Source.fromFile(path)
        val lines: Iterator[String] = bf.getLines()
        val rules: Array[(Long, Long, String)] = lines.map(line => {
            val fields: Array[String] = line.split("[|]")
            val startNum: Long = fields(2).toLong
            val endNum: Long = fields(3).toLong
            val privince: String = fields(6)
            (startNum, endNum, privince)
        }).toArray
        rules
    }
    // 將IP地址轉換成長整形
    def ipToLong(ip: String)={
        val fragments: Array[String] = ip.split("[.]")
        var ipNum = 0L
        for(i <- 0 until fragments.length){
            ipNum = fragments(i).toLong | ipNum << 8L
        }
        ipNum
    }
    // 利用二分法的形式返回IP地址對應的IP段
    def binarySearch(lines: Array[(Long, Long, String)], ip: Long ): Int={
        var low = 0
        var high = lines.length - 1
        while (low <= high){
            val middle = (low + high) / 2
            if((ip >= lines(middle)._1) && (ip <=lines(middle)._2)){
                return middle
            }
            if(ip < lines(middle)._1){
                high = middle - 1
            }else{
                low = middle + 1
            }
        }
        -1
    }
}

  利用Spark程序解析IP:

package cn.edu360.sparkThree

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object IpLocationOne {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("IpManage").setMaster("local[4]")
        val sc = new SparkContext(conf)
        // 獲取IP映射庫,同時將其解析
        val rules: Array[(Long, Long, String)] = MyUtils.readRules("C:\\Users\\Administrator\\Desktop\\java\\spark4\\課件與代碼\\ip\\ip.txt")
        // 將IP映射規則廣播到每個Worker的executer
        val broudcastRef: Broadcast[Array[(Long, Long, String)]] = sc.broadcast(rules)
        // 獲取訪問日誌
        val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/ip/log/")
        // 定義一個解析訪問日誌的函數
        val func =(line: String) =>{
            val fields: Array[String] = line.split("[|]")
            val ip: String = fields(1)
            val ipNum: Long = MyUtils.ipToLong(ip)
            val rulesInExecutor: Array[(Long, Long, String)] = broudcastRef.value
            var province = "未知"
            val index: Int = MyUtils.binarySearch(rulesInExecutor, ipNum)
            if(index != -1){
                province = rulesInExecutor(index)._3
            }
            (province, 1)
        }
        // 解析數據
        val provinceOne: RDD[(String, Int)] = lines.map(func)
        // 聚合數據
        val reduced: RDD[(String, Int)] = provinceOne.reduceByKey(_+_)
        val result: Array[(String, Int)] = reduced.collect()
        print(result.toBuffer)
    }
}

三,累加器

  3.1 什麼是累加器

  累加器是經過交替的操做能夠增長的變量,而且能夠運行在並行的狀況下。能夠用來實現一個計數器(和 MapReduce中的同樣)或者求和。Spark自然支持數字類型的累加器,開發人員能夠添加新類型的支持。

  3.2 累加器的做用

  在spark應用程序中,咱們常常會有這樣的需求,如異常監控,調試,記錄符合某特性的數據的數目,這種需求都須要用到計數器,若是一個變量不被聲明爲一個累加器,那麼它將在被改變時不會再driver端進行全局彙總,即在分佈式運行時每一個task運行的只是原始變量的一個副本,並不能改變原始變量的值,可是當這個變量被聲明爲累加器後,該變量就會有分佈式計數的功能。

  3.3 圖解累加器

  

  3.4 定義累加器

val a = sc.accumulator(0)

  3.5 還原累加器

val b = a.value

  3.6 定義注意事項

1、累加器在Driver端定義賦初始值,累加器只能在Driver端讀取最後的值,在Excutor端更新。

二、累加器不是一個調優的操做,由於若是不這樣作,結果是錯的
相關文章
相關標籤/搜索