Spark編程進階篇html
做者:尹正傑java
版權聲明:原創做品,謝絕轉載!不然將追究法律責任。算法
一.spark三大數據結構apache
Spark有三大數據結構,分別爲RDD,廣播變量和累加器。 RDD: RDD全稱爲"Resilient Distributed Dataset",叫作彈性分佈式數據集,是Spark中最基本的數據抽象。 簡單的理解RDD就是包含數據的計算。不少時候數據須要必定的訪問規則,這個時候使用RDD來作就不太合適啦。 廣播變量: 此處能夠簡單理解爲分佈式只讀共享變量。 累加器: 此處能夠簡單理解爲分佈式只寫共享變量。 博主推薦閱讀: https://www.cnblogs.com/yinzhengjie2020/p/13155362.html
二.廣播變量編程
1>.廣播變量概述數據結構
廣播變量用來高效分發較大的對象。向全部工做節點發送一個較大的只讀值,以供一個或多個Spark操做使用。
好比,若是你的應用須要向全部節點發送一個較大的只讀查詢表,甚至是機器學習算法中的一個很大的特徵向量,廣播變量用起來都很順手。 在多個並行操做中使用同一個變量,可是 Spark會爲每一個任務分別發送。 使用廣播變量的過程以下: (1)經過對一個類型 T 的對象調用 SparkContext.broadcast 建立出一個 Broadcast[T] 對象。 任何可序列化的類型均可以這麼實現。 (2)經過 value 屬性訪問該對象的值(在 Java 中爲 value() 方法)。 (3)變量只會被髮到各個節點一次,應做爲只讀值處理(修改這個值不會影響到別的節點)。
2>.廣播變量應用案例閉包
package com.yinzhengjie.bigdata.spark.rdd import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object BroadcastVariable { def main(args: Array[String]): Unit = { //建立spark配置信息 val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD") //建立SparkContext val sc = new SparkContext(sparkConf) val listRDD:RDD[(Int,String)] = sc.parallelize(List((1,"A"),(2,"B"),(3,"C"))) /** * 若是想要將2個RDD基於Key進行關聯,使用join算子(設計到笛卡爾積,即會增長數據)是能夠實現的; * * 但效率較低,由於使用join算子有一個弊端就是涉及到shuffle過程 */ // val listRDD2:RDD[(Int,Int)] = sc.makeRDD(List((1,1),(2,2),(3,3))) // val joinRDD:RDD[(Int,(String,Int))] = listRDD.join(listRDD2) // joinRDD.foreach(println) val list = List((1,1),(2,2),(3,3)) //將list變量構建爲廣播變量(使用廣播變量減小數據的傳輸,即無需每一個Executor都傳輸一個list變量,而是spark集羣共享同一個只讀變量) val boradcast:Broadcast[List[(Int,Int)]] = sc.broadcast(list) /** * 使用map算子來替代join算子的好處就是map不涉及到shuffle過程,所以咱們說廣播變量是一種調優策略。 */ val mapRDD:RDD[(Int,(String,Any))] = listRDD.map{ case (key,value) => { var v2:Any = null //使用廣播變量 for (t <- boradcast.value){ if (key == t._1){ v2 = t._2 } } (key,(value,v2)) } } mapRDD.foreach(println) //釋放資源 sc.stop() } }
三.累加器機器學習
1>.累加器概述分佈式
累加器用來對信息進行聚合,一般在向 Spark傳遞函數時,好比使用map()函數或者用filter()傳條件時,可使用驅動器程序中定義的變量,可是集羣中運行的每一個任務都會獲得這些變量的一份新的副本,更新這些副本的值也不會影響驅動器中的對應變量。 若是咱們想實現全部分片處理時更新共享變量的功能,那麼累加器能夠實現咱們想要的效果。 累加器的用法以下所示。 經過在驅動器中調用SparkContext.accumulator(initialValue)方法,建立出存有初始值的累加器; 返回值爲 org.apache.spark.Accumulator[T] 對象,其中 T 是初始值 initialValue 的類型; Spark閉包裏的執行器代碼可使用累加器的"+=」方法(在Java中是add)增長累加器的值; 驅動器程序能夠調用累加器的value屬性(在Java中使用value()或setValue())來訪問累加器的值。 自定義累加器 自定義累加器類型的功能在1.X版本中就已經提供了,可是使用起來比較麻煩,在2.0版本後,累加器的易用性有了較大的改進,並且官方還提供了一個新的抽象類:AccumulatorV2來提供更加友好的自定義類型累加器的實現方式。 實現自定義類型累加器須要繼承AccumulatorV2並至少覆寫相應的方法,累加器能夠用於在程序運行過程當中收集一些文本類信息,最終以Set[String]的形式返回。 舒適提示: 工做節點上的任務不能訪問累加器的值。從這些任務的角度來看,累加器是一個只寫變量。 對於要在行動操做中使用的累加器,Spark只會把每一個任務對各累加器的修改應用一次。所以,若是想要一個不管在失敗仍是重複計算時都絕對可靠的累加器,咱們必須把它放在foreach()這樣的行動操做中。轉化操做中累加器可能會發生不止一次更新。
2>.系統累加器 ide
package com.yinzhengjie.bigdata.spark.rdd import org.apache.spark.rdd.RDD import org.apache.spark.util.LongAccumulator import org.apache.spark.{SparkConf, SparkContext} object ShareData { def main(args: Array[String]): Unit = { //建立spark配置信息 val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD") //建立SparkContext val sc = new SparkContext(sparkConf) val dataRDD:RDD[Int] = sc.parallelize(List(10,20,30,40,60,70,80,90),3) /** * 使用reduce算子計算dataRDD各個元素之和,其工做原理以下: * 1>.將Driver中的dataRDD中3個不一樣分區數據發送給不一樣的Executor; * 2>.各個Executor計算相應分區的數據之和; * 3>.最後,每一個Executor的數據再相加獲得最終的數據。 */ // val sum1 = dataRDD.reduce(_+_) // println("sum1 = " + sum1) /** * 咱們使用foreach算子直接遍歷dataRDD中的每一個元素使他們相加,但打印sum2的結果爲"0",其緣由以下: * 1>.將Driver中的dataRDD中3個不一樣分區數據發送給不一樣的Executor; * 2>.sum2屬於Driver中的變量,所以他要序列化sum2變量並其傳遞給各個Executor; * 3>.各個Executor使用Driver傳遞過來的變量進行計算,最終各個Exector算出sum2的結果,但此時各個Executor並無將計算結果發送給Driver; * * 綜上所述,各個Executor均有對應的sum2數據,但並無將計算結果發送給Driver進行累加,所以咱們看到的sum2始終是"0". * */ // var sum2:Int = 0 // dataRDD.foreach(data => sum2 += data) // println("sum2 = " + sum2) /** * 使用累加器來共享變量,用來累加數據,其工做原理以下: * 1>.將Driver中的dataRDD中3個不一樣分區數據發送給不一樣的Executor; * 2>.sum3屬於Driver中的變量,所以他要序列化sum3變量並其傳遞給各個Executor; * 3>.各個Executor使用Driver傳遞過來的變量進行計算,最終各個Executor算出sum3的結果; * 4>.最後spark發現sum3是一個累加器(Accumulator)變量,所以會將各個Executor的sum3的結果返回給Driver,由Driver將最終結果進行累加。 */ val sum3:LongAccumulator = sc.longAccumulator //建立累加器對象sum3 dataRDD.foreach{ case data => { sum3.add(data) //將每個元素和累加器相加 } } println("sum3 = " + sum3.value) //釋放資源 sc.stop() } }
3>.自定義累加器
package com.yinzhengjie.bigdata.spark.rdd import java.util import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD import org.apache.spark.util.AccumulatorV2 /** * 自定義累加器步驟以下: * 1>.繼承AccumulatorV2; * 2>.實現抽象方法; * 3>.建立累加器 */ class WordAccumulator extends AccumulatorV2[String, util.ArrayList[String]] { //定義一個集合,用於返回結果 val list = new util.ArrayList[String]() //當前的累加器是否爲初始化狀態 override def isZero: Boolean = { list.isEmpty } //複製累加器對象,根據你得需求自行實現 override def copy(): AccumulatorV2[String, util.ArrayList[String]] = { new WordAccumulator() } //重置累加器對象 override def reset(): Unit = { list.clear() } //往累加器中增長數據 override def add(v: String): Unit = { if (v.contains("o")){ list.add(v) //判斷字符串是否包含字母"o",若是爲真則寫入累加器 } } //合併累加器,即Driverd端將各個Executor返回的list集合的數據進行累加操做 override def merge(other: AccumulatorV2[String, util.ArrayList[String]]): Unit = { list.addAll(other.value) } //獲取累加器的結果,咱們直接將定義的list集合返回 override def value: util.ArrayList[String] = { list } } object CustomAccumulator { def main(args: Array[String]): Unit = { //建立spark配置信息 val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD") //建立SparkContext val sc = new SparkContext(sparkConf) val dataRDD:RDD[String] = sc.makeRDD(Array("Hadoop","Spark","Flume","HBase","Sqoop","Flink","Storm","Hive"),3) //建立我們自定義的累加器 val wordAccumulator = new WordAccumulator //註冊自定義累加器,目的是讓Spark知曉 sc.register(wordAccumulator) //使用我們的累加器變量 dataRDD.foreach{ case data => { wordAccumulator.add(data) //將每個元素和累加器相加 } } //獲取累加器的值 println("words = " + wordAccumulator.value) //釋放資源 sc.stop() } }