Spark Streaming的事務處理

本期內容數據庫

  1. exactly once安全

  2. 輸入不重複併發

  3. 輸出不重複性能

 

exactly once :有且僅被執行一次。(很少,很多,一次恰好)atom

首先和你們聊下概念:spa

事務是恢復和併發控制的基本單位。日誌

事務應該具備4個屬性:原子性、一致性、隔離性、持久性。這四個屬性一般稱爲ACID特性orm

原子性(atomicity)。一個事務是一個不可分割的工做單位,事務中包括的諸操做要麼都作,要麼都不作。htm

一致性(consistency)。事務必須是使數據庫從一個一致性狀態變到另外一個一致性狀態。一致性與原子性是密切相關的。隊列

隔離性(isolation)。一個事務的執行不能被其餘事務干擾。即一個事務內部的操做及使用的數據對併發的其餘事務是隔離的,併發執行的各個事務之間不能互相干擾。

持久性(durability)。持久性也稱永久性(permanence),指一個事務一旦提交,它對數據庫中數據的改變就應該是永久性的。接下來的其餘操做或故障不該該對其有任何影響。

簡單記憶法則(一持原隔)

如:銀行轉帳,A向B轉帳500元,這個步驟能夠分爲A扣500元,B加500元 兩部分。

若是 A減500元成功後,在B加500元的時候失敗了,那麼A減掉的500就不生效。也就是說。要麼兩個操做都成功,要麼兩個操做都失敗。

 

先了解下SparkStreaming的數據流轉流程

 

數據一致性的要求:

  1. 數據源可靠。數據源產生出來後,萬一crash要能夠恢復。數據存在kafka是很好的選擇,既能夠存儲,又能夠作高吞吐消息隊列

  2. Receiver可靠。以數據來自Kafka爲例。運行在Executor上的Receiver在接收到來自Kafka的數據時會向Kafka發送ACK確認收到信息並讀取下一條信息,kafkaupdateOffset來記錄Receiver接收到的偏移,這種方式保證了在Executor數據零丟失。

  3. Driver可靠。checkpoint可解決。

 

下面是幾個數據非一致性的場景及解決方案

輸入不丟失

 

數據丟失的場景:

  在Receiver收到數據且經過Driver的調度,Executor開始計算數據的時候,若是Driver忽然崩潰,此時Executor也會被Kill掉,那麼Executor中的數據就會丟失,此時就必須經過WAL機制讓全部的數據經過相似HDFS的方式進行安全性容錯處理,從而解決Executor被Kill掉後致使數據丟失的問題。

  數據重複讀取的場景:

  在Receiver收到數據且保存到了HDFS時,若是Receiver崩潰,且此時沒有來得及更新ZooKeeper上的offsets,那麼Receiver從新啓動後就會從管理Kafka的ZooKeeper中再次讀取元數據從而致使重複讀取元數據;從Spark Streaming來看是成功的,可是Kafka認爲是失敗的(由於Receiver崩潰時沒有及時更新offsets到ZooKeeper中)從新恢復時會從新消費一次,此時會致使數據從新消費的狀況。

  Spark 1.3以前的版本在這個場景下其實有性能問題:

 

  1. 經過WAL方式保證數據不丟失,但弊端是經過WAL方式會極大的損傷Spark Streaming中的Receiver接收數據的性能(現網生產環境一般會Kafka Direct  Api直接處理)。

  2. 若是經過Kafka做爲數據來源的話,Kafka中有數據,而後Receiver接收數據的時候又會有數據副本,這個時候實際上是存儲資源的浪費。(重複讀取數據解決辦法,讀取數據時能夠將元數據信息放入內存數據庫中,再次計算時檢查元數據是否被計算過)。

  Spark從1.3版本開始,爲了不WAL的性能損失和實現Exactly Once而提供了Kafka Direct Api,把Kafka做爲文件存儲系統。此時Kafka兼具備流的優點和文件系統的優點,至此,Spark Streaming+Kafka就構建了完美的流處理世界!

  數據不須要拷貝副本,不須要WAL性能損耗,不須要Receiver,而直接經過Kafka Direct Api直接消費數據,全部的Executors經過Kafka Api直接消費數據,直接管理offset,因此也不會重複消費數據;事務實現啦!

 

WAL的弊端:

WAL也不能徹底的解決數據丟失的問題,就像Oracle同樣,日誌文件的寫,也是先寫到內存中,而後根據必定的觸發條件再將數據寫到磁盤。若是尚未來的及寫WAL日誌,此時數據也會有不一致的狀況(數據已經接收,可是尚未寫到WAL的這部分數據是恢復不出來的。)。

 

輸出不重複

  爲何會有這個問題,由於SparkStreaming在計算的時候基於SparkCore,SparkCore天生會在下列場景中致使輸出重複:

  1.Task、Stage甚至Job重試;

  2.慢任務推測;

      3.消息偏移量未及時更新;

  會致使數據的丟失。

 

  對應的解決方案:

  1.一個任務失敗就是job 失敗,設置spark.task.maxFailures次數爲1;再也不重試。

  2.設置spark.speculation爲關閉狀態(由於慢任務推測其實很是消耗性能,因此關閉後能夠顯著的提升Spark Streaming處理性能)

  3.Spark streaming on kafka的話,假如job失敗後能夠設置kafka的auto.offset.reset爲largest的方式會自動恢復job的執行。

 

 

  最後再次強調:

  能夠經過transform和foreachRDD基於業務邏輯代碼進行邏輯控制來實現數據不重複消費和輸出不重複!這二個方法相似於spark的後門,能夠作任意想象的控制操做!

相關文章
相關標籤/搜索