spark版本定製四:Spark Streaming的Exactly-One的事務處理

本期內容:數據庫

一、Exactly once緩存

二、輸出不重複安全

 事務概念:好比銀行轉帳,數據必定會被處理,且只被處理一次,可以輸出,且只能輸出一次,A轉帳給B只輸出一次,B接收且只接收一次,雙方要麼同時成功或者同時失敗!微信

1、Exactly once

Spark Core是怎麼處理一個Job的?具體過程以下圖所示:架構

 

流程說明:InputStream不斷的輸入數據,Executor中的Reciver不停的接收數據,爲了保證接收到的數據不丟失,Receiver把接收到數據經過BlockManager寫入內存和磁盤,或者經過WAL機制記錄日誌,而後把元數據metedata信息彙報給Driver,Driver收到數據後,爲了數據安全,在運行前會把元數據寫入磁盤或者HDFS上(checkpointer),最後真正執行Job是在Executor,固然能夠有多個Executor;性能

可能出現的問題:spa

      問題:Receiver中接收到的數據達到必定的閾值後,纔會觸發WAL,若是數據收到一半,Receiver掛掉了,也會存在數據丟失的可能?日誌

      解決:經過Kafka發送數據給Receiver,若是Receiver掛掉,數據會緩存在Kafka中,待Receiver恢復後,會從新接收到Kafka中的數據;orm

 

數據丟失及其具體的解決方式:blog

在Receiver收到數據且經過Driver的調度Executor開始計算數據的時候若是Driver忽然崩潰,則此時Executor會被Kill掉,那麼Executor中的數據就會丟失,此時就必須經過例如WAL的方式讓全部的數據都經過例如HDFS的方式首先進行安全性容錯處理,此時若是Executor中的數據丟失的話就能夠經過WAL恢復回來;

 

Exactly Once的事務處理:

1,數據零丟失:必須有可靠的數據來源和可靠的Receiver,且整個應用程序的metadata必須進行checkpoint,且經過WAL來保證數據安全;

2,Spark Streaming 1.3的時候爲了不WAL的性能損失和實現Exactly Once而提供了Kafka Direct API,把Kafka做爲文件存儲系統!!!此時兼具備流的優點和文件系統的優點,至此,Spark Streaming+Kafka就構建了完美的流處理世界!!!全部的Executors經過Kafka API直接消費數據,直接管理Offset,因此也不會重複消費數據;實務實現啦!!!此時數據在Kafka中因此必定會被處理,且只被處理一次;

 

爲何說Spark Streaming+Kafka的方式是完美的呢?

1.數據不須要拷貝副本;

2.不須要進行WAL,避免了沒必要要的性能損耗;

3.Kafka比Hdfs高效不少,由於Kafka內部會進行memory-copy;

 

2、輸出不重複

爲了解決第一點中出現的各種問題,演變出了以下的架構圖:

 

數據重複讀取的狀況:

在Receiver收到數據且保存到了HDFS等持久化引擎可是沒有來得及進行updateOffsets,此時Receiver崩潰後從新啓動就會經過管理Kafka的ZooKeeper中元數據再次重複讀取數據,可是此時SparkStreaming認爲是成功的,可是Kafka認爲是失敗的(由於沒有更新offset到ZooKeeper中),此時就會致使數據從新消費的狀況。

如今Zookeeper和Kafka都很成熟了,因此數據重複讀取的狀況出現的機率較小;

 

問題:數據重複消費的問題怎麼解決?

答:能夠在程序中處理,程序讀取到元數據後,放入內存數據庫中,每次處理的時候讀取內存數據庫,處理事後打個標記,下次再處理的時候經過標記判斷,已處理的就再也不處理

      第一點中的「Exactly Once的事務處理」第二小點,Spark Streaming 1.3的時候爲了不WAL的性能損失和實現Exactly Once而提供了Kafka Direct API,自然的解決了數據重複消費的問題;

 

性能損失:

1,經過WAL方式會極大的損傷Spark Streaming中Receivers接受數據的性能;(實際生產環境中,用Receiver的狀況並很少,更多的是直接基於Kafka  API進行處理)

2,若是經過Kafka的做爲數據來源的話,Kafka中有數據,而後Receiver接受的時候又會有數據副本,這個時候實際上是存儲資源的浪費;

 

關於Spark Streaming數據輸出屢次重寫及其解決方案:

1,爲何會有這個問題,由於Spark Streaming在計算的時候基於Spark Core,Spark Core天生會作如下事情致使Spark Streaming的結果(部分)重複輸出:

  Task重試;

  慢任務推測

  Stage重複;

  Job重試;

2,具體解決方案:

  設置spark.task.maxFailures次數爲1;最大容許失敗的次數,設爲1就沒有task、stage、job等的重試;

  設置spark.speculation爲關閉狀態(由於慢任務推測其實很是消耗性能,因此關閉後能夠顯著提升Spark Streaming處理性能)

  Spark Streaming on Kafka的話,Job失敗會致使任務失敗,Job失敗後能夠設置auto.offset.reset爲「largest」的方式;

 

最後再次強調能夠經過transform和foreachRDD基於業務邏輯代碼進行邏輯控制來實現數據不重複消費和輸出不重複!這兩個方式相似於Spark Streaming的後門,能夠作任意想象的控制操做!

 

特別感謝王家林老師的獨具一格的講解:

王家林老師名片:

中國Spark第一人

新浪微博:http://weibo.com/ilovepains

微信公衆號:DT_Spark

博客:http://blog.sina.com.cn/ilovepains

QQ:1740415547

YY課堂:天天20:00現場授課頻道68917580

相關文章
相關標籤/搜索