廣播變量容許程序員將一個只讀的變量緩存在每臺機器上,而不用在任務之間傳遞變量。廣播變量可被用於有效地給每一個節點一個大輸入數據集的副本。Spark還嘗試使用高效地廣播算法來分發變量,進而減小通訊的開銷。 Spark的動做經過一系列的步驟執行,這些步驟由分佈式的洗牌操做分開。Spark自動地廣播每一個步驟每一個任務須要的通用數據。這些廣播數據被序列化地緩存,在運行任務以前被反序列化出來。這意味着當咱們須要在多個階段的任務之間使用相同的數據,或者以反序列化形式緩存數據是十分重要的時候,顯式地建立廣播變量纔有用。html
不是每一個task一份副本,而是變成每一個節點Executor上一個副本。
50個Executor 1000個task。
一個map10M
默認狀況下,1000個task 1000個副本java
1000 * 10M = 10 000M = 10 G程序員
10G的數據,網絡傳輸,在集羣中,耗費10G的內存資源。算法
若是使用 廣播變量,apache
50個Executor ,50個副本,10M*50 = 500M的數據。json
網絡傳輸,並且不必定是從Drver傳輸到各個節點,還多是從就近的節點
的Executor的BlockManager上獲取變量副本,網絡傳輸速度大大增長。緩存
以前 10000M 如今 500M。網絡
20倍網絡傳輸性能的消耗。20倍內存消耗的減小。
開始使用broadcast變量,使用完後,程序結束記得釋放app
sc = SparkContext(appName=AppNames.JOURNEY_AGGREGATOR_APP_NAME) broadCastForLog = None try: broadCastForLog = ELogForDistributedApp.setLogConf2BroadCast(sc) elogging.initLogFromDict(broadCastForLog.value) except StandardError: pass ....... #執行完程序邏輯,記得釋放該變量 if broadCastForLog is not None: broadCastForLog.unpersist(False)
#獲取要被共享的大變量,這裏是log配置jvm
class ELogForDistributedApp(object): LOGHDFSPATH = "/user/hdfs/test/logging/logging_hdfs.json" @staticmethod def setLogConf2BroadCast(sc): logFilePath = ELogForDistributedApp.LOGHDFSPATH if sc is not None: configDict = HDFSOperation.getConfigFromHDFS(logFilePath,sc) broadCast = sc.broadcast(configDict) #globals()['broadCast'] = broadCast #elogging.initLogFromDict(broadCast.value) return broadCast #print broadCast.value else: return None
def initLogFromDict(self):
elogging.initLogFromDict(self.eloggingConfig)
從hdfs中找到相應配置文件
class HDFSOperation(object): @staticmethod def getConfigFromHDFS(hdfsPath,sc): if sc is not None: filesystem_class = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem hadoop_configuration = sc._jsc.hadoopConfiguration() fs =filesystem_class.get(hadoop_configuration) path_class = sc._gateway.jvm.org.apache.hadoop.fs.Path pathObj = path_class(hdfsPath) try: hdfsInStream = fs.open(pathObj) bufferedReader_class = sc._gateway.jvm.java.io.BufferedReader inputStreamReader_class = sc._gateway.jvm.java.io.InputStreamReader bufferedReader = bufferedReader_class(inputStreamReader_class(hdfsInStream)) except IOError,msg: print str(msg) return None else: return None configStr = '' while True: tmpStr = bufferedReader.readLine() if tmpStr == None: break configStr += tmpStr try: confDict = json.loads(configStr) except IOError,msg: print str(msg) return None return confDict