spark 廣播變量 之廣播表(dataframe)

Broadcast variables(廣播變量)容許程序員將一個 read-only(只讀的)變量緩存到每臺機器上,而不是給任務傳遞一個副本。它們是如何來使用呢,例如,廣播變量能夠用一種高效的方式給每一個節點傳遞一份比較大的 input dataset(輸入數據集)副本。在使用廣播變量時,Spark 也嘗試使用高效廣播算法分發 broadcast variables(廣播變量)以下降通訊成本。程序員

Spark 的 action(動做)操做是經過一系列的 stage(階段)進行執行的,這些 stage(階段)是經過分佈式的 "shuffle" 操做進行拆分的。Spark 會自動廣播出每一個 stage(階段)內任務所須要的公共數據。這種狀況下廣播的數據使用序列化的形式進行緩存,並在每一個任務運行前進行反序列化。這也就意味着,只有在跨越多個 stage(階段)的多個任務會使用相同的數據,或者在使用反序列化形式的數據特別重要的狀況下,使用廣播變量會有比較好的效果。(來自官網描述)算法

一、若是廣播一個rdd的值的話,能夠把rdd.collect 彙總到driver端,而後經過sc廣播到excutorsql

二、可是若是廣播一個表(datafram),上面的那種方式就不能這樣作了緩存

例如:我廣播一個手機號碼歸屬地的表,結構是(1353763,深圳市)分佈式

代碼以下ui

val df_operate = sqlContext.sparkContext
      .textFile(inpath_hdfs)
      .map(t => {
        val arr = t.split("\001")
        arr
      }).filter(t => {
      t.length == 7
    }).map(t => {
      (t(1), t(6))
    }).toDF("cell_seg", "cell_city")
    
//   進行廣播
    val broadcast_df_standard = sc.broadcast(df_operate.as("t_base_phone_to_operate"))

 

sparkDS 的代碼以下url

contact_DStream.foreachRDD { rdd =>

      if (!rdd.isEmpty) {
        //        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)

        val sparkSession: SparkSession = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
        import sparkSession.implicits._

        //        val logger = Logger.getLogger(Contact.getClass.getName)
        val es_url = broadcast_es_uri1.value
        val es_port = broadcast_es_port.value
        val es_index = broadcast_es_index.value
        val es_type = broadcast_es_type.value

        //廣播變量的值
        val url_config = broadcast_url_config.value
        val user_config = broadcast_user_config.value
        val passwd_config = broadcast_passwd_config.value
        val table_config_name = broadcast_table_config.value


        val index2kinesisStreamName = broadcast_index2kinesis.value


        val df_standard = broadcast_df_standard.value

        //外部數據歸屬地
//        df_standard.createGlobalTempView("t_base_phone_to_operate")
        df_standard.createOrReplaceTempView("t_base_phone_to_operate")
//      數據cache到內存裏,防止大量的oom
        sparkSession.catalog.cacheTable("t_base_phone_to_operate")

       
//      cache後的數據記得要釋放,否則太佔內存了       
        sparkSession.catalog.clearCache()

      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
相關文章
相關標籤/搜索