Spark編程進階篇

                Spark編程進階篇html

                                     做者:尹正傑java

版權聲明:原創做品,謝絕轉載!不然將追究法律責任。算法

 

 

 

 

一.spark三大數據結構apache

  Spark有三大數據結構,分別爲RDD,廣播變量和累加器。
    RDD:
      RDD全稱爲"Resilient Distributed Dataset",叫作彈性分佈式數據集,是Spark中最基本的數據抽象。
      簡單的理解RDD就是包含數據的計算。不少時候數據須要必定的訪問規則,這個時候使用RDD來作就不太合適啦。

    廣播變量:
      此處能夠簡單理解爲分佈式只讀共享變量。

    累加器:
      此處能夠簡單理解爲分佈式只寫共享變量。

  博主推薦閱讀:
    https://www.cnblogs.com/yinzhengjie2020/p/13155362.html

 

二.廣播變量編程

1>.廣播變量概述數據結構

  廣播變量用來高效分發較大的對象。向全部工做節點發送一個較大的只讀值,以供一個或多個Spark操做使用。

  好比,若是你的應用須要向全部節點發送一個較大的只讀查詢表,甚至是機器學習算法中的一個很大的特徵向量,廣播變量用起來都很順手。 在多個並行操做中使用同一個變量,可是 Spark會爲每一個任務分別發送。   使用廣播變量的過程以下:     (
1)經過對一個類型 T 的對象調用 SparkContext.broadcast 建立出一個 Broadcast[T] 對象。 任何可序列化的類型均可以這麼實現。     (2)經過 value 屬性訪問該對象的值(在 Java 中爲 value() 方法)。     (3)變量只會被髮到各個節點一次,應做爲只讀值處理(修改這個值不會影響到別的節點)。

2>.廣播變量應用案例閉包

package com.yinzhengjie.bigdata.spark.rdd

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastVariable {
  def main(args: Array[String]): Unit = {
    //建立spark配置信息
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD")

    //建立SparkContext
    val sc = new SparkContext(sparkConf)

    val listRDD:RDD[(Int,String)] = sc.parallelize(List((1,"A"),(2,"B"),(3,"C")))

    /**
      *     若是想要將2個RDD基於Key進行關聯,使用join算子(設計到笛卡爾積,即會增長數據)是能夠實現的;
      *
      *     但效率較低,由於使用join算子有一個弊端就是涉及到shuffle過程
      */
//    val listRDD2:RDD[(Int,Int)] = sc.makeRDD(List((1,1),(2,2),(3,3)))
//    val joinRDD:RDD[(Int,(String,Int))] = listRDD.join(listRDD2)
//    joinRDD.foreach(println)

    val list = List((1,1),(2,2),(3,3))

    //將list變量構建爲廣播變量(使用廣播變量減小數據的傳輸,即無需每一個Executor都傳輸一個list變量,而是spark集羣共享同一個只讀變量)
    val boradcast:Broadcast[List[(Int,Int)]] = sc.broadcast(list)

    /**
      *   使用map算子來替代join算子的好處就是map不涉及到shuffle過程,所以咱們說廣播變量是一種調優策略。
      */
    val mapRDD:RDD[(Int,(String,Any))] = listRDD.map{
      case (key,value) => {
        var v2:Any = null
        //使用廣播變量
        for (t <- boradcast.value){
          if (key == t._1){
            v2 = t._2
          }
        }
        (key,(value,v2))
      }
    }
    mapRDD.foreach(println)

    //釋放資源
    sc.stop()
  }
}

 

三.累加器機器學習

1>.累加器概述分佈式

  累加器用來對信息進行聚合,一般在向 Spark傳遞函數時,好比使用map()函數或者用filter()傳條件時,可使用驅動器程序中定義的變量,可是集羣中運行的每一個任務都會獲得這些變量的一份新的副本,更新這些副本的值也不會影響驅動器中的對應變量。

  若是咱們想實現全部分片處理時更新共享變量的功能,那麼累加器能夠實現咱們想要的效果。

  累加器的用法以下所示。
    經過在驅動器中調用SparkContext.accumulator(initialValue)方法,建立出存有初始值的累加器;
    返回值爲 org.apache.spark.Accumulator[T] 對象,其中 T 是初始值 initialValue 的類型;
    Spark閉包裏的執行器代碼可使用累加器的"+=」方法(在Java中是add)增長累加器的值;
    驅動器程序能夠調用累加器的value屬性(在Java中使用value()或setValue())來訪問累加器的值。

  自定義累加器
    自定義累加器類型的功能在1.X版本中就已經提供了,可是使用起來比較麻煩,在2.0版本後,累加器的易用性有了較大的改進,並且官方還提供了一個新的抽象類:AccumulatorV2來提供更加友好的自定義類型累加器的實現方式。
    實現自定義類型累加器須要繼承AccumulatorV2並至少覆寫相應的方法,累加器能夠用於在程序運行過程當中收集一些文本類信息,最終以Set[String]的形式返回。

  舒適提示:
    工做節點上的任務不能訪問累加器的值。從這些任務的角度來看,累加器是一個只寫變量。
    對於要在行動操做中使用的累加器,Spark只會把每一個任務對各累加器的修改應用一次。所以,若是想要一個不管在失敗仍是重複計算時都絕對可靠的累加器,咱們必須把它放在foreach()這樣的行動操做中。轉化操做中累加器可能會發生不止一次更新。

2>.系統累加器 ide

package com.yinzhengjie.bigdata.spark.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

object ShareData {
  def main(args: Array[String]): Unit = {
    //建立spark配置信息
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD")

    //建立SparkContext
    val sc = new SparkContext(sparkConf)

    val dataRDD:RDD[Int] = sc.parallelize(List(10,20,30,40,60,70,80,90),3)

    /**
      *   使用reduce算子計算dataRDD各個元素之和,其工做原理以下:
      *     1>.將Driver中的dataRDD中3個不一樣分區數據發送給不一樣的Executor;
      *     2>.各個Executor計算相應分區的數據之和;
      *     3>.最後,每一個Executor的數據再相加獲得最終的數據。
      */
//    val sum1 = dataRDD.reduce(_+_)
//    println("sum1 = " + sum1)

    /**
      *   咱們使用foreach算子直接遍歷dataRDD中的每一個元素使他們相加,但打印sum2的結果爲"0",其緣由以下:
      *     1>.將Driver中的dataRDD中3個不一樣分區數據發送給不一樣的Executor;
      *     2>.sum2屬於Driver中的變量,所以他要序列化sum2變量並其傳遞給各個Executor;
      *     3>.各個Executor使用Driver傳遞過來的變量進行計算,最終各個Exector算出sum2的結果,但此時各個Executor並無將計算結果發送給Driver;
      *
      *    綜上所述,各個Executor均有對應的sum2數據,但並無將計算結果發送給Driver進行累加,所以咱們看到的sum2始終是"0".
      *
      */
//    var sum2:Int = 0
//    dataRDD.foreach(data => sum2 += data)
//    println("sum2 = " + sum2)

    /**
      *   使用累加器來共享變量,用來累加數據,其工做原理以下:
      *     1>.將Driver中的dataRDD中3個不一樣分區數據發送給不一樣的Executor;
      *     2>.sum3屬於Driver中的變量,所以他要序列化sum3變量並其傳遞給各個Executor;
      *     3>.各個Executor使用Driver傳遞過來的變量進行計算,最終各個Executor算出sum3的結果;
      *     4>.最後spark發現sum3是一個累加器(Accumulator)變量,所以會將各個Executor的sum3的結果返回給Driver,由Driver將最終結果進行累加。
      */
    val sum3:LongAccumulator = sc.longAccumulator  //建立累加器對象sum3

    dataRDD.foreach{
      case data => {
        sum3.add(data)   //將每個元素和累加器相加
      }
    }
    println("sum3 = " + sum3.value)

    //釋放資源
    sc.stop()
  }
}

3>.自定義累加器

package com.yinzhengjie.bigdata.spark.rdd

import java.util

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2

/**
  *   自定義累加器步驟以下:
  *     1>.繼承AccumulatorV2;
  *     2>.實現抽象方法;
  *     3>.建立累加器
  */
class WordAccumulator extends AccumulatorV2[String, util.ArrayList[String]] {

  //定義一個集合,用於返回結果
  val list = new util.ArrayList[String]()

  //當前的累加器是否爲初始化狀態
  override def isZero: Boolean = {
    list.isEmpty
  }

  //複製累加器對象,根據你得需求自行實現
  override def copy(): AccumulatorV2[String, util.ArrayList[String]] = {
    new WordAccumulator()
  }

  //重置累加器對象
  override def reset(): Unit = {
    list.clear()
  }

  //往累加器中增長數據
  override def add(v: String): Unit = {
    if (v.contains("o")){
      list.add(v)   //判斷字符串是否包含字母"o",若是爲真則寫入累加器
    }
  }

  //合併累加器,即Driverd端將各個Executor返回的list集合的數據進行累加操做
  override def merge(other: AccumulatorV2[String, util.ArrayList[String]]): Unit = {
    list.addAll(other.value)
  }

  //獲取累加器的結果,咱們直接將定義的list集合返回
  override def value: util.ArrayList[String] = {
    list
  }
}


object CustomAccumulator {
  def main(args: Array[String]): Unit = {
    //建立spark配置信息
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD")

    //建立SparkContext
    val sc = new SparkContext(sparkConf)

    val dataRDD:RDD[String] = sc.makeRDD(Array("Hadoop","Spark","Flume","HBase","Sqoop","Flink","Storm","Hive"),3)

    //建立我們自定義的累加器
    val wordAccumulator = new WordAccumulator
    //註冊自定義累加器,目的是讓Spark知曉
    sc.register(wordAccumulator)
    //使用我們的累加器變量
    dataRDD.foreach{
      case data => {
        wordAccumulator.add(data)   //將每個元素和累加器相加
      }
    }

    //獲取累加器的值
    println("words = " + wordAccumulator.value)

    //釋放資源
    sc.stop()
  }
}
相關文章
相關標籤/搜索