sparkStreaming基本概念

概述

Spark Streaming 是 Spark Core API 的擴展, 它支持彈性的, 高吞吐的, 容錯的實時數據流的處理. 數據能夠經過多種數據源獲取, 例如 Kafka, Flume, Kinesis 以及 TCP sockets, 也能夠經過例如 map, reduce, join, window 等的高級函數組成的複雜算法處理. 最終, 處理後的數據能夠輸出到文件系統, 數據庫以及實時儀表盤中. 事實上, 你還能夠在 data streams(數據流)上使用 機器學習 以及 圖計算算法.html

運行原理算法

1542184745858

  1. sparkStreaming不斷的從Kafka等數據源獲取數據(連續的數據流),並將這些數據按照週期劃分紅爲batch
  2. 將每一個batch的數據提交給SparkEngine來處理(每一個batch的處理實際上仍是批處理,只不過批量很小,幾乎解決了實時處理)
  3. 整個過程是持續的,即不斷的接收數據並處理數據和輸出結果

DStream

  1. DStream : Discretized Stream 離散流
  2. 爲了便於理解,Spark Straming提出了DStream對象,表明一個接二連三的輸入流
  3. DStream是一個持續的RDD序列,每一個RDD表明一個計算週期(DStream裏面有多個RDD)
  4. 全部應用在DStream上的操做,都會被映射爲對DStream內部的RDD上的操做
  5. DStream本質上是一個以時間爲鍵,RDD爲值的哈希表,保存了按時間順序產生的RDD,。Spark Streaming每次將新產生的RDD添加到哈希表中,而對於已經再也不須要的RDD則會從這個哈希表中刪除,因此DStream也能夠簡單地理解爲以時間爲鍵的RDD的動態序列,。設批處理時間間隔爲1s,下圖爲4s內產生的DStream示意圖。streaming-dstream

初始化注意點:數據庫

  • 一旦一個 context 已經啓動,將不會有新的數據流的計算能夠被建立或者添加到它。.
  • 一旦一個 context 已經中止,它不會被從新啓動.
  • 同一時間內在 JVM 中只有一個 StreamingContext 能夠被激活.
  • 在 StreamingContext 上的 stop() 一樣也中止了 SparkContext 。爲了只中止 StreamingContext ,設置 stop() 的可選參數,名叫 stopSparkContext 爲 false.
  • 一個 SparkContext 就能夠被重用以建立多個 StreamingContexts,只要前一個 StreamingContext 在下一個StreamingContext 被建立以前中止(不中止 SparkContext).

DStream輸入源

  • Basic sources
    • file systems: sparkContext.textFileStream(dir)
      • 只監控指定文件夾中的文件,不監控裏面的文件夾
      • 以文件的修改時間爲準
      • 一旦開始處理,對文件的修改在當前窗口不會被讀取
      • 文件夾下面文件越多掃描時間越長(和文件是否修改無關)
      • hdfs在打開輸出流的時候就設置了更新時間,這個時候write操做還未完成就被讀,能夠先將文件寫到一個未被監控的文件夾,待write 操做完成後,再移入監控的文件夾中
    • socket connections:sparkContext.socketTextStream()
    • Akka actors
  • Advanced sources
    • Kafka: KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)
    • Flume
    • Kinesis
    • Twitter
  • multiple input stream
    • ssc.union(Seq(stream1,stream2,...))
    • stream1.union(stream2)

Batch durationapache

對於源源不斷的數據,Spark Streaming是經過切分的方式,先將連續的數據流進行離散化處理。數據流每被切分一次,對應生成一個RDD,每一個RDD都包含了一個時間間隔內所獲取到的全部數據。數組

批處理時間間隔的設置會伴隨Spark Streaming應用程序的整個生命週期,沒法在程序運行期間動態修改網絡

  1. duration設置:new StreamingContext(sparkConf,Seconds(1))
  2. Spark Streaming按照設置的batch duration來積累數據,週期結束時把週期內的數據做爲一個RDD,並添加任務給Spark Engine
  3. batch duration的大小決定了Spark Streaming提交做業的頻率和處理延遲
  4. batch duration大小設定取決於用戶需求,通常不會太大

Receiver接收器

1542187582639

  1. 除了FileInputDStream,其他輸入源都會關聯一個Receiver。
  2. receiver以任務的形式運行在應用的執行器進程中,從輸入源收集數據並保存爲RDD。
  3. receiver會將接收到的數據複製到另外一個工做節點上進行加工處理。
  4. core的最小數量是2,一個負責接收,一個負責處理(fileSysInput除外)
  5. 分配給處理數據的cores應該多餘分配給receivers的數量

轉換操做app

  • 無狀態操做
    • 和spark core語義一致
    • 對DStream的transform操做,實際做用於DStream中的每個RDD
    • 若是DStream沒有提供RDD操做,可經過transform函數實現,dstream.transform(fun)
    • 不能跨多個batch中的RDD執行
  • 有狀態操做
    • updateStateByKey :定一個一個狀態和更新函數,返回新的狀態,updateStateByKey必須配置檢查點
    • window: 流式計算是週期性進行的,有時處理處理當前週期的數據,還須要處理最近幾個週期的數據,這時候就須要窗口操做方法了。咱們能夠設置數據滑動窗口,將數個原始Dstream合併成一個窗口DStream。window操做默認執行persist in mermory
    • streaming-dstream-window
      • windowDuration: 窗口時間間隔又稱爲窗口長度,它是一個抽象的時間概念,決定了Spark Streaming對RDD序列進行處理的範圍與粒度,即用戶能夠經過設置窗口長度來對必定時間範圍內的數據進行統計和分析
      • bathDuration: batch大小
      • 每次計算的batch數:windowDuration/batchDuration
      • slideDuration: 滑動時間間隔,控制多長時間計算一次默認和batchDuration相等

操做規約機器學習

普通規約是每次把window裏面每一個RDD都計算一遍,增量規約是每次只計算新進入window的數據,而後減去離開window的數據,獲得的就是window數據的大小,在使用上,增量規約須要提供一個規約函數的逆函數,好比+對應的逆函數爲-socket

  • 普通規約:val wordCounts=words.map(x=>(x,1)).reduceByKeyAndWindow(_+_,Seconds(5s),seconds(1))ide

  • 增量規約:val wordCounts=words.map(x=>(x+1)).reduceByKeyAndWindow(_+_,_-_,Seconds(5s),seconds(1))

DStream輸出

  1. 輸出操做:print,foreachRDD,saveAsObjectFiles,saveAsTextFiles,saveAsHadoopFiles
  2. 碰到輸出操做時開始計算求值
  3. 輸出操做特色:惰性求值
  4. 最佳創建連接的方式
// 1. con't not create before foreachPartition function(cont't create in driver)
// 2. use foreachPartition instead of foreach
// 3. use connect pool instead of create connect every time
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

DSream持久化

  1. 默認持久化:MEMORY_ONLY_SER
  2. 對於來源於網絡的數據源(kafka,flume等): MEMORY_AND_DISK_SER_2
  3. 對於window操做默認進行MEMORY_ONLY持久化

checkpoint容錯

sparkStreaming 週期性的把應用數據存儲到HDFS等可靠的存儲系統中能夠供回覆時使用的機制叫作檢查點機制,

做用:

  1. 控制發生失敗時須要計算的狀態數:經過lineage重算,檢查點機制能夠控制須要在Lineage中回溯多遠
  2. 提供驅動器程序(driver)的容錯:能夠從新啓動驅動程序,並讓驅動程序從檢查點恢復,這樣spark streaming就能夠讀取以前運行的程序處理數據的進度,並從哪裏開始繼續。

數據類型:

  • Metadata(元數據): streaming計算邏輯,主要來恢復driver。

    • Configuration:配置文件,用於建立該streaming application的全部配置

    • DStream operations:對DStream進行轉換的操做集合

    • Incomplete batches:未完成batchs,那些提交了job在隊列等待還沒有完成的job信息。

  • Data checkpointing: 已經生成的RDD但還未保存到HDFS或者會影響後續RDD的生成。

注意點

  1. 對於window和stateful操做必須指定checkpint
  2. 默認按照batch duration來作checkpoint

Checkpoint類

checkpoint的形式是將類CheckPoint的實例序列化後寫入外部內存

1542198071413

缺點

SparkStreaming 的checkpoint機制是對CheckPoint對象進行序列化後的數據進行存儲,那麼SparkStreaming Application從新編譯後,再去反序列化checkpoint數據就會失敗,這個時候必須新建StreamingContext

針對這種狀況,在結合SparkStreaming+kafka的應用中,須要自行維護消費offsets,這樣即便從新編譯了application,仍是能夠從須要的offsets來消費數據。對於其餘狀況須要結合實際的需求進行處理。

使用

checkpoint的時間間隔正常狀況下應該是sliding interval的5-10倍,可經過dstream.checkpoint(checkpointInterval)配置每一個流的interval。

若是想要application能從driver失敗中恢復,則application須要知足

  • 若application首次重啓,將建立一個新的StreamContext實例
  • 若application從失敗中重啓,將會從chekcpoint目錄導入chekpoint數據來從新建立StreamingContext實例
def createStreamingContext()={
    ...
    val sparkConf=new SparkConf().setAppName("xxx")
    val ssc=new StreamingContext(sparkConf,Seconds(1))
    ssc.checkpoint(checkpointDir)
}
...
val ssc=StreamingContext.getOrCreate(checkpointDir,createSreamingContext _)

Accumulators, Broadcast Variables, and Checkpoints

在sparkStreaming中累加器和廣播變量不可以在checkpoints中恢復,廣播變量是在driver上執行的,可是當driver重啓後並無執行廣播,當slaves調用廣播變量時報Exception: (Exception("Broadcast variable '0' not loaded!",)

能夠爲累加器和廣播變量建立延遲實例化的單例實例,以便在驅動程序從新啓動失敗後從新實例化它們

問題參考:https://issues.apache.org/jira/browse/SPARK-5206

容錯

系統的容錯主要從三個方面,接收數據,數據處理和輸出數據,在sparkStreaming中,接收數據和數據來源有關係,處理數據能夠保證exactly once,輸出數據能夠保證at least once。

輸入容錯

sparStreaming並不能徹底的像RDD那樣實現lineage,由於其有的數據源是經過網絡傳輸的,不可以重複獲取。

接收數據根據數據源不一樣容錯級別不一樣

  • with file:經過hdfs等文件系統中讀取數據時能夠保證exactly-once
  • with reciever-base-source:
    • reliable reciever:當reciever接收失敗時不給數據源答覆接收成功,在reciever重啓後繼續接收
    • unreliable reciever:接收數據後不給數據源返回接收結果,則數據源也不會再次下發數據

sparkStreaming經過write-ahead-logs 提供了at least once的保證。在spark1.3版本以後,針對kafka數據源,能夠作到exactly once ,更多內容

輸出容錯

相似於foreachRdd操做,能夠保證at least once,若是輸出時想實現exactly once可經過如下兩種方式:

  • Idempotent updates:冪等更新,屢次嘗試將數據寫入同一個文件
  • Transactional updates:事物更新,實現方式:經過batch time和the index of rdd實現RDD的惟一標識,經過惟一標識去更新外部系統,即若是已經存在則跳過更新,若是不存在則更新。eg:
dstream.foreachRDD { (rdd, time) =>
    rdd.foreachPartition { partitionIterator =>
      val partitionId = TaskContext.get.partitionId()
      val uniqueId = generateUniqueId(time.milliseconds, partitionId)
      // use this uniqueId to transactionally commit the data in partitionIterator
    }
  }

調優

sparkStreaming調優主要從兩方面進行:開源節流——提升處理速度和減小輸入數據。

  • 行時間優化
    • 設置合理的批處理時間和窗口大小
    • 提升並行度
      • 增長接收器數目
      • 將接收到數據從新分區
      • 提升聚合計算的並行度,例如對reduceByKey等shuffle操做設置跟高的並行度
  • 內存使用與垃圾回收
    • 控制批處理時間間隔內的數據量
    • 及時清理再也不使用的數據
    • 減小序列化和反序列化負擔

詳情參考:http://spark.apache.org/docs/latest/tuning.html#level-of-parallelism

原文:streaming-programing-guide

相關文章
相關標籤/搜索