基於Spark2.X系列的累加器和Streaming基礎 Spark學習之編程進階總結(一) Spark學習之Spark Streaming

1、累加器API

  關於累加器,前面我也寫了一篇博客,順便粘貼這兒,對比學習,Spark學習之編程進階總結(一)。Spark 2.0系列引入了一個更加簡單和更高性能的累加器API,如在1.X版本中能夠這樣使用累加器:html

    val sparkSession = SparkSession.builder().master("local").appName("wordcount").getOrCreate()
    val sc = sparkSession.sparkContext
    sc.setLogLevel("WARN")
    
    // Spark1.x版本 定義累加器,這裏直接使用SparkContext內置的累加器
    val accum = sc.accumulator(0, "My Accumulator")
    sc.parallelize(Array(1,2,3,4)).foreach(x=>accum+=x)
    // 獲取累加器的值,Executor上面只能對累加器進行累加操做,只要Driver才能讀取累加器的值
    // Driver讀取值的時候會把各個Executor上存儲的本地累加器的值加起來,這裏的結果是10
    println(accum) // 輸出10

  在Spark 2.X版本里使用SparkContext裏內置的累加器:sql

    // 與Spark1.x不一樣的是,須要指定累加器的類型,目前SparkContext有Long類型和Double類型
    // 能夠直接使用,不須要指定初始值
    val accum1 = sc.longAccumulator("My Accumulator")
    sc.parallelize(Array(1,2,3,4)).foreach(x=>accum1.add(x))
    println(accum1.value) // 輸出10

  只使用SparkContext裏內置的累加器功能確定不能知足略微複雜的業務類型,此時咱們就能夠自定義累加器。這裏給出Spark2.X自定義累加器的參考代碼apache

  在Scala中有一種方式來自定義累加器,用戶只須要繼承Accumulable,就能夠把元素和返回值定義爲不一樣的類型,這樣咱們就能夠完成添加操做(如往Int類型的List裏添加整數,此時元素爲Int類型,而返回類型爲List)。編程

  在Spark 2.X中加入一個新的抽象類—AccumulatorV2,繼承這個類要實現如下幾種方法:
   add方法:指定元素相加操做。
   copy方法:指定對自定義累加器的複製操做。
   isZero方法:返回該累加器的值是否爲「零」。
   merge方法:合併兩個相同類型的累加器。
   reset方法:重置累加器。
   value方法:返回累加器當前的值。
  重寫這幾種方法以後,只需實例化自定義累加器,並連同累加器名字一塊兒傳給sparkContext.register方法。
  下面簡單實現一個把字符串合併爲數組的累加器:數組

// 首先要繼承AccumulatorV2,並指定輸入爲String類型,輸出爲ArrayBuffer[String]
class MyAccumulator extends AccumulatorV2[String,ArrayBuffer[String]]{
  // 設置累加器的結果,類型爲ArrayBuffer[String]
  private var result = ArrayBuffer[String]()
  
  // 判斷累加器當前值是否爲零值,這裏咱們指定若是result的size爲0,則累加器當前值是零值
  override def isZero:Boolean = this.result.size == 0
  
  // copy方法設置爲新建本累加器,並把result賦給新的累加器
  override def copy():AccumulatorV2[String,ArrayBuffer[String]] = {
    val newAccum = new MyAccumulator
    newAccum.result = this.result
    newAccum
  }
  
  // reset 方法設置爲把result設置爲新的ArrayBuffer
  override def reset():Unit = this.result = new ArrayBuffer[String]()
  
  // add 方法把傳進來的字符串添加到result中
  override def add(v:String):Unit = this.result+=v
  
  // merge 方法把兩個累加器的result合併起來
  override def merge(other:AccumulatorV2[String,ArrayBuffer[String]]):Unit={
    result.++=:(other.value)
  }
  
  // value 方法返回result
  override def value:ArrayBuffer[String] = this.result
}
    // 接着在main方法裏使用累加器
    val Myaccum = new MyAccumulator()
    // 向SparkContext註冊累加器
    sc.register(Myaccum)
    // 把 a b c d添加進累加器的result數組並打印出來
    sc.parallelize(Array("a","b","c","d")).foreach(x=>Myaccum.add(x))
    println(Myaccum.value)  // 輸出ArrayBuffer(a, b, c, d)

2、Spark2.X Streaming

  關於Streaming我之前也有一篇博客,這裏粘貼出來對比學習。Spark學習之Spark Streaming。這裏寫一個程序來監聽網絡端口發來的內容,而後進行WordCount。網絡

   第一步:建立程序入口SparkSession,並引入spark.implicits來容許Scalaobject隱式轉換爲DataFrame。
   第二步:建立流。配置從socket讀取流數據,地址和端口爲localhost:9999
   第三步:進行單詞統計。這裏lines是DataFrame,使用as[String]給它定義類型轉換爲DataSet。以後在DataSet裏進行單詞統計。  
   第四步:建立查詢句柄,定義打印結果方式並啓動程序。這裏使用writeStream方法,輸出模式爲所有輸出到控制檯。app

import org.apache.spark.sql.SparkSession

object StreamingTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("StructuredNetworkCount").getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("WARN")
    // 上面變量必定也得爲spark
    import spark.implicits._
    val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
    val words = lines.as[String].flatMap(_.split(" "))
    val wordcount = words.groupBy("value").count()
    val query = wordcount.writeStream.outputMode("complete").format("console").start()
    // 調用awaitTermination方法來防止程序在處理數據時中止
    query.awaitTermination()
  }
}

  運行上面的程序首先nc軟件要先運行起來,鏈接成功後再輸入下面的字符串。socket

  

  而後控制檯打印的結果爲:ide

  

  Streaming的關鍵思想如圖所示:把數據流視做一張數據不斷增長的表,這樣用戶就能夠基於這張表進行數據處理,就好像使用批處理來處理靜態數據同樣,但實際上Spark底層是把新數據不斷地增量添加到這張無界的表的下一行中。post

  

  Structured Streaming共有3種輸出模式,這3種模式都只適用於某些類型的查詢:

  (1)CompleteMode:完整模式。整個更新的結果表將被寫入外部存儲器。由存儲鏈接器決定如何處理整張表的寫入。聚合操做以及聚合以後的排序操做支持這種模式。
  (2)AppendMode:附加模式。只有自上次觸發執行後在結果表中附加的新行會被寫入外部存儲器。這僅適用於結果表中的現有行不會更改的查詢,如select、where、map、flatMap、filter、join等操做支持這種模式。
  (3)UpdateMode:更新模式(這個模式將在之後的版本中實現)。只有自上次觸發執行後在結果表中更新的行將被寫入外部存儲器(不輸出未更改的行)。

   這篇博文主要來自《Spark大數據商業實戰三部曲》這本書裏面的第二章,內容有刪減,還有本書的一些代碼的實驗結果。

相關文章
相關標籤/搜索