Spark 廣播變量BroadCast

 

1、 廣播變量

 

廣播變量容許程序員將一個只讀的變量緩存在每臺機器上,而不用在任務之間傳遞變量。廣播變量可被用於有效地給每一個節點一個大輸入數據集的副本。Spark還嘗試使用高效地廣播算法來分發變量,進而減小通訊的開銷。 Spark的動做經過一系列的步驟執行,這些步驟由分佈式的洗牌操做分開。Spark自動地廣播每一個步驟每一個任務須要的通用數據。這些廣播數據被序列化地緩存,在運行任務以前被反序列化出來。這意味着當咱們須要在多個階段的任務之間使用相同的數據,或者以反序列化形式緩存數據是十分重要的時候,顯式地建立廣播變量纔有用。html

 

2、爲何使用廣播變量

假如咱們要共享的變量map,1M
在默認的,task執行的算子中,使用了外部的變量,每一個task都會獲取一份變量的副本
在什麼狀況下,會出現性能上的惡劣的影響呢?
1000個task。大量task的確都在並行運行。這些task裏面都用到了佔用1M內存的map,那麼首先,map會拷貝1000份副本,經過網絡傳輸到各個task中去,給task使用。總計有1G的數據,會經過網絡傳輸。網絡傳輸的開銷,不容樂觀啊!!!網絡傳輸,也許就會消耗掉你的spark做業運行的總時間的一小部分。
map副本,傳輸到了各個task上以後,是要佔用內存的。1個map的確不大,1M;1000個map分佈在你的集羣中,一會兒就耗費掉1G的內存。對性能會有什麼影響呢?沒必要要的內存的消耗和佔用,就致使了,你在進行RDD持久化到內存,也許就無法徹底在內存中放下;就只能寫入磁盤,最後致使後續的操做在磁盤IO上消耗性能;
你的task在建立對象的時候,也許會發現堆內存放不下全部對象,也許就會致使頻繁的垃圾回收器的回收,GC。GC的時候,必定是會致使工做線程中止,也就是致使Spark暫停工做那麼一點時間。頻繁GC的話,對Spark做業的運行的速度會有至關可觀的影響。
 
若是說,task使用大變量(1m~100m),明知道會致使性能出現惡劣的影響。那麼咱們怎麼來解決呢?
廣播,Broadcast,將大變量廣播出去。而不是直接使用。
 
廣播變量的好處,不是每一個task一份變量副本,而是變成每一個節點的executor才一份副本。這樣的話,就可讓變量產生的副本大大減小。
廣播變量,初始的時候,就在Drvier上有一份副本。task在運行的時候,想要使用廣播變量中的數據,此時首先會在本身本地的Executor對應的
BlockManager中,嘗試獲取變量副本;若是本地沒有,BlockManager,也許會從遠程的Driver上面去獲取變量副本;也有可能從距離比較近的其餘
節點的Executor的BlockManager上去獲取,並保存在本地的BlockManager中;BlockManager負責管理某個Executor對應的內存和磁盤上的數據,
此後這個executor上的task,都會直接使用本地的BlockManager中的副本。

優勢:
    不是每一個task一份副本,而是變成每一個節點Executor上一個副本。

 

1.舉例來講:

50個Executor 1000個task。 
一個map10M 

默認狀況下,1000個task 1000個副本java

1000 * 10M = 10 000M = 10 G程序員

10G的數據,網絡傳輸,在集羣中,耗費10G的內存資源算法

若是使用 廣播變量,apache

50個Executor ,50個副本,10M*50 = 500M的數據json

網絡傳輸,並且不必定是從Drver傳輸到各個節點,還多是從就近的節點 
的Executor的BlockManager上獲取變量副本,網絡傳輸速度大大增長。緩存

以前 10000M 如今 500M網絡

20倍網絡傳輸性能的消耗。20倍內存消耗的減小。

3、如何使用

開始使用broadcast變量,使用完後,程序結束記得釋放app

  sc = SparkContext(appName=AppNames.JOURNEY_AGGREGATOR_APP_NAME)
    broadCastForLog = None
    try:
        broadCastForLog = ELogForDistributedApp.setLogConf2BroadCast(sc)
        elogging.initLogFromDict(broadCastForLog.value)
    except StandardError:
        pass

.......
    #執行完程序邏輯,記得釋放該變量

    if broadCastForLog is not None:
        broadCastForLog.unpersist(False)

#獲取要被共享的大變量,這裏是log配置jvm

 

class ELogForDistributedApp(object):

    LOGHDFSPATH = "/user/hdfs/test/logging/logging_hdfs.json"
    @staticmethod
    def setLogConf2BroadCast(sc):
        logFilePath = ELogForDistributedApp.LOGHDFSPATH
        if sc is not None:
            configDict = HDFSOperation.getConfigFromHDFS(logFilePath,sc)
            broadCast = sc.broadcast(configDict)
            #globals()['broadCast'] = broadCast
            #elogging.initLogFromDict(broadCast.value)
            return broadCast
            #print broadCast.value
        else:
            return None

 

    def initLogFromDict(self):
        elogging.initLogFromDict(self.eloggingConfig)

 

從hdfs中找到相應配置文件

class HDFSOperation(object):

    @staticmethod
    def getConfigFromHDFS(hdfsPath,sc):
        if sc is not None:
            filesystem_class = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
            hadoop_configuration = sc._jsc.hadoopConfiguration()
            fs =filesystem_class.get(hadoop_configuration)
            path_class = sc._gateway.jvm.org.apache.hadoop.fs.Path
            pathObj = path_class(hdfsPath)
            try:
                hdfsInStream = fs.open(pathObj)
                bufferedReader_class = sc._gateway.jvm.java.io.BufferedReader
                inputStreamReader_class = sc._gateway.jvm.java.io.InputStreamReader
                bufferedReader = bufferedReader_class(inputStreamReader_class(hdfsInStream))
            except IOError,msg:
                print str(msg)
                return None

        else:
            return None
        configStr = ''
        while True:
            tmpStr = bufferedReader.readLine()
            if tmpStr == None:
                break
            configStr += tmpStr
        try:
            confDict = json.loads(configStr)
        except IOError,msg:
            print str(msg)
            return None
        return confDict

 

參考文檔

  1. Spark Programming Guide1.6.3
  2. How can I update a broadcast variable in spark streaming?
  3. Spark踩坑記——共享變量

相關文章
相關標籤/搜索