Broadcast variables(廣播變量)容許程序員將一個 read-only(只讀的)變量緩存到每臺機器上,而不是給任務傳遞一個副本。它們是如何來使用呢,例如,廣播變量能夠用一種高效的方式給每一個節點傳遞一份比較大的 input dataset(輸入數據集)副本。在使用廣播變量時,Spark 也嘗試使用高效廣播算法分發 broadcast variables(廣播變量)以下降通訊成本。程序員
Spark 的 action(動做)操做是經過一系列的 stage(階段)進行執行的,這些 stage(階段)是經過分佈式的 "shuffle" 操做進行拆分的。Spark 會自動廣播出每一個 stage(階段)內任務所須要的公共數據。這種狀況下廣播的數據使用序列化的形式進行緩存,並在每一個任務運行前進行反序列化。這也就意味着,只有在跨越多個 stage(階段)的多個任務會使用相同的數據,或者在使用反序列化形式的數據特別重要的狀況下,使用廣播變量會有比較好的效果。(來自官網描述)算法
一、若是廣播一個rdd的值的話,能夠把rdd.collect 彙總到driver端,而後經過sc廣播到excutorsql
二、可是若是廣播一個表(datafram),上面的那種方式就不能這樣作了緩存
例如:我廣播一個手機號碼歸屬地的表,結構是(1353763,深圳市)分佈式
代碼以下ui
val df_operate = sqlContext.sparkContext .textFile(inpath_hdfs) .map(t => { val arr = t.split("\001") arr }).filter(t => { t.length == 7 }).map(t => { (t(1), t(6)) }).toDF("cell_seg", "cell_city") // 進行廣播 val broadcast_df_standard = sc.broadcast(df_operate.as("t_base_phone_to_operate"))
sparkDS 的代碼以下url
contact_DStream.foreachRDD { rdd => if (!rdd.isEmpty) { // val sqlContext = SQLContext.getOrCreate(rdd.sparkContext) val sparkSession: SparkSession = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate() import sparkSession.implicits._ // val logger = Logger.getLogger(Contact.getClass.getName) val es_url = broadcast_es_uri1.value val es_port = broadcast_es_port.value val es_index = broadcast_es_index.value val es_type = broadcast_es_type.value //廣播變量的值 val url_config = broadcast_url_config.value val user_config = broadcast_user_config.value val passwd_config = broadcast_passwd_config.value val table_config_name = broadcast_table_config.value val index2kinesisStreamName = broadcast_index2kinesis.value val df_standard = broadcast_df_standard.value //外部數據歸屬地 // df_standard.createGlobalTempView("t_base_phone_to_operate") df_standard.createOrReplaceTempView("t_base_phone_to_operate") // 數據cache到內存裏,防止大量的oom sparkSession.catalog.cacheTable("t_base_phone_to_operate") // cache後的數據記得要釋放,否則太佔內存了 sparkSession.catalog.clearCache() } } ssc.start() ssc.awaitTermination() }