第4課:Spark Streaming的Exactly-One的事務處理和不重複輸出完全掌握

本期內容:sql

1Exactly once 事務數據庫

什麼事Exactly once 事務?安全

數據僅處理一次而且僅輸出一次,這樣纔是完整的事務處理。微信

Spark在運行出錯時不能保證輸出也是事務級別的。在Task執行一半的時候出錯了,雖然在語義上作了事務處理,數據僅被處理一次,可是若是是輸出到數據庫中,那有空能將結果屢次保存到數據庫中。Spark在任務失敗時會進行重試,這樣會致使結果屢次保存到數據庫中。數據結構

以下圖,當運行在Executor上的Receiver接收到數據經過BlockManager寫入內存和磁盤,或者經過WAL機制寫記錄日誌,而後把metedata信息彙報給Driver。在Driver端按期進行checkpoint操做。Job的執行仍是基於Spark Core的調度模式在Executor上執行Task性能

Exactly once 事務的處理:大數據

1,數據零丟失:必須有可靠的數據來源和可靠的Receiver,且整個應用程序的metadata必須進行checkpoint,且經過WAL來保證數據安全。spa

咱們以數據來自Kafka爲例,運行在Executor上的Receiver在接收到來自Kafka的數據時會向Kafka發送ACK確認收到信息並讀取下一條信息,kafkaupdateOffset來記錄Receiver接收到的偏移,這種方式保證了在Executor數據零丟失。日誌


Driver端,按期進行checkpoint操做,出錯時從Checkpoint的文件系統中把數據讀取進來進行恢復,內部會從新構建StreamingContext(也就是構建SparkContext)並啓動,恢復出元數據metedata,再次產生RDD,恢復的是上次的Job,而後再次提交到集羣執行。orm

那麼數據可能丟失的地方有哪些呢和相應的解決方式?

    在Receiver收到數據且經過Driver的調度Executor開始計算數據的時候,若是Driver忽然奔潰,則此時Executor會被殺死,那麼Executor中的數據就會丟失(若是沒有進行WAL的操做)

解決方式:此時就必須經過例如WAL的方式,讓全部的數據都經過例如HDFS的方式首先進行安全性容錯處理。此時若是Executor中的數據丟失的話,就能夠經過WAL恢復回來。

這種方式的弊端是經過WAL的方式會極大額損傷SparkStreamingReceivers接收數據的性能。

數據重複讀取的狀況:

    在Receiver收到數據保存到HDFS等持久化引擎可是沒有來得及進行updateOffsets(Kafka爲例),此時Receiver崩潰後從新啓動就會經過管理KafkaZookeeper中元數據再次重複讀取數據,可是此時SparkStreaming認爲是成功的,可是kafka認爲是失敗的(由於沒有更新offsetZooKeeper),此時就會致使數據從新消費的狀況。

    解決方式:以Receiver基於ZooKeeper的方式,當讀取數據時去訪問Kafka的元數據信息,在處理代碼中例如foreachRDDtransform時,將信息寫入到內存數據庫中(memorySet),在計算時讀取內存數據庫信息,判斷是否已處理過,若是以處理過則跳過計算。這些元數據信息能夠保存到內存數據結構或者memsqlsqllite中。

 

若是經過Kafka做爲數據來源的話,Kafka中有數據,而後Receiver接收的時候又會有數據副本,這個時候實際上是存儲資源的浪費。

Spark1.3的時候爲了不WAL的性能損失和實現Exactly Once而提供了Kafka Direct API,把Kafka做爲文件存儲系統。此時兼具備流的優點和文件系統的優點,至此Spark Streaming+Kafka就構建了完美的流處理世界(1,數據不須要拷貝副本;2,不須要WAL對性能的損耗;3Kafka使用ZeroCopyHDFS更高效)。全部的Executors經過Kafka API直接消息數據,直接管理Offset,因此也不會重複消費數據。

 

2,輸出不重複

關於Spark Streaming數據輸出屢次重寫及其解決方案:

1,爲何會有這個問題,由於Spark Streaming在計算的時候基於Spark Core天生會作如下事情致使Spark Streaming的結果(部分)重複輸出。Task重試,慢任務推測,Stage重試,Job重試。

2,具體解決方案

設置spark.task.maxFailures次數爲1,這樣就不會有Task重試了。設置spark.speculation爲關閉狀態,就不會有慢任務推測了,由於慢任務推測很是消耗性能,因此關閉後能夠顯著提升Spark Streaming處理性能。

Spark Streaming On Kafka的話,Job失敗後能夠設置Kafka的參數auto.offset.resetlargest方式。

    最後再次強調能夠經過transformforeachRDD基於業務邏輯代碼進行邏輯控制來實現數據不重複消費和輸出不重複。這兩個方法相似於Spark Streaming的後門,能夠作任意想象的控制操做。

備註:

一、DT大數據夢工廠微信公衆號DT_Spark 二、IMF晚8點大數據實戰YY直播頻道號:68917580三、新浪微博: http://www.weibo.com/ilovepains

相關文章
相關標籤/搜索