論Spark Streaming的數據可靠性和一致性

摘要:Spark Streaming自發布起就獲得了普遍的關注,然而做爲一個年輕的項目,須要提高的地方一樣不少,好比1.2以前版本driver掛掉可能會丟失數據。這裏將分析它的可靠性機制。緩存


眼下大數據領域最熱門的詞彙之一即是流計算了,其中最耀眼的項目無疑是來自Spark社區的Spark Streaming項目,其從一誕生就受到普遍關注並迅速發展,目前已有追趕並超越Storm的架勢。安全

對於流計算而言,毫無疑問最核心的特色是它的低時延能力,這主要是來自對數據不落磁盤就進行計算的內部機制,但這也帶來了數據可靠性的問題,即有節點失效或者網絡異常時,如何在節點間進行合適的協商來進行重傳。更進一步的,若發生計劃外的數據重傳,怎麼能保證沒有產生重複的數據,全部數據都是精確一次的(Exact Once)?若是不解決這些問題,大數據的流計算將沒法知足大多數企業級可靠性要求而流於徒有虛名。微信

本文將重點分析Spark Streaming是如何設計可靠性機制並實現數據一致性的。網絡

Driver HA

因爲流計算系統是長期運行、數據不斷流入的,所以其Spark守護進程(Driver)的可靠性是相當重要的,它決定了Streaming程序可否一直正確地運行下去。運維

圖一 Driver數據持久化機器學習

Driver實現HA的解決方案就是將元數據持久化,以便重啓後的狀態恢復。如圖一所示,Driver持久化的元數據包括:socket

  • Block元數據(圖一中的綠色箭頭):Receiver從網絡上接收到的數據,組裝成Block後產生的Block元數據;編輯器

  • Checkpoint數據(圖一中的橙色箭頭):包括配置項、DStream操做、未完成的Batch狀態、和生成的RDD數據等;ide



圖二 Driver故障恢復oop

Driver失敗重啓後:

  • 恢復計算(圖二中的橙色箭頭):使用Checkpoint數據重啓driver,從新構造上下文並重啓接收器。

  • 恢復元數據塊(圖二中的綠色箭頭):恢復Block元數據。

  • 恢復未完成的做業(圖二中的紅色箭頭):使用恢復出來的元數據,再次產生RDD和對應的job,而後提交到Spark集羣執行。


經過如上的數據備份和恢復機制,Driver實現了故障後重啓、依然能恢復Streaming任務而不丟失數據,所以提供了系統級的數據高可靠。

可靠的上下游IO系統

流計算主要經過網絡socket通訊來實現與外部IO系統的數據交互。因爲網絡通訊的不可靠特色,發送端與接收端須要經過必定的協議來保證數據包的接收確認、和失敗重發機制。

不是全部的IO系統都支持重發,這至少須要實現數據流的持久化,同時還要實現高吞吐和低時延。在Spark Streaming官方支持的data source裏面,能同時知足這些要求的只有Kafka,所以在最近的Spark Streaming release裏面,也是把Kafka當成推薦的外部數據系統。

除了把Kafka當成輸入數據源(inbound data source)以外,一般也將其做爲輸出數據源(outbound data source)。全部的實時系統都經過Kafka這個MQ來作數據的訂閱和分發,從而實現流數據生產者和消費者的解耦。

一個典型的企業大數據中心數據流向視圖以下所示:


圖三 企業大數據中心數據流向視圖

除了從源頭保證數據可重發以外,Kafka更是流數據Exact Once語義的重要保障。Kafka提供了一套低級API,使得client能夠訪問topic數據流的同時也能訪問其元數據。Spark Streaming的每一個接收任務能夠從指定的Kafka topic、partition和offset去獲取數據流,各個任務的數據邊界很清晰,任務失敗後能夠從新去接收這部分數據而不會產生「重疊的」數據,於是保證了流數據「有且僅處理一次」。

可靠的接收器

在Spark 1.3版本以前,Spark Streaming是經過啓動專用的Receiver任務來完成從Kafka集羣的數據流拉取。

Receiver任務啓動後,會使用Kafka的高級API來建立topicMessageStreams對象,並逐條讀取數據流緩存,每一個batchInerval時刻到來時由JobGenerator提交生成一個spark計算任務。

因爲Receiver任務存在宕機風險,所以Spark提供了一個高級的可靠接收器-ReliableKafkaReceiver類型來實現可靠的數據收取,它利用了Spark 1.2提供的WAL(Write Ahead Log)功能,把接收到的每一批數據持久化到磁盤後,更新topic-partition的offset信息,再去接收下一批Kafka數據。萬一Receiver失敗,重啓後還能從WAL裏面恢復出已接收的數據,從而避免了Receiver節點宕機形成的數據丟失(如下代碼刪除了細枝末節的邏輯):

class ReliableKafkaReceiver{  private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null  private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null  override def onStart(): Unit = {    // Initialize the topic-partition / offset hash map.    topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]    // Initialize the block generator for storing Kafka message.    blockGenerator = new BlockGenerator(new GeneratedBlockHandler, streamId, conf)    messageHandlerThreadPool = Utils.newDaemonFixedThreadPool(      topics.values.sum, "KafkaMessageHandler")    blockGenerator.start()    val topicMessageStreams = consumerConnector.createMessageStreams(      topics, keyDecoder, valueDecoder)    topicMessageStreams.values.foreach { streams =>      streams.foreach { stream =>        messageHandlerThreadPool.submit(new MessageHandler(stream))      }    }  }

啓用WAL後雖然Receiver的數據可靠性風險下降了,但卻因爲磁盤持久化帶來的開銷,系統總體吞吐率會有明顯的降低。所以,在最新發布的Spark 1.3版本里,Spark Streaming增長了使用Direct API的方式來實現Kafka數據源的訪問。

引入了Direct API後,Spark Streaming再也不啓動常駐的Receiver接收任務,而是直接分配給每一個Batch及RDD最新的topic partition offset。job啓動運行後Executor使用Kafka的simple consumer API去獲取那一段offset的數據。

這樣作的好處不只避免了Receiver宕機帶來的數據可靠性風險,同時也因爲避免使用ZooKeeper作offset跟蹤,而實現了數據的精確一次性(如下代碼刪除了細枝末節的邏輯):

class DirectKafkaInputDStream{  protected val kc = new KafkaCluster(kafkaParams)  protected var currentOffsets = fromOffsets  override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {    val untilOffsets = clamp(latestLeaderOffsets(maxRetries))    val rdd = KafkaRDD[K, V, U, T, R](      context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)    Some(rdd)  }

預寫日誌 Write Ahead Log

Spark 1.2開始提供了預寫日誌能力,用於Receiver數據及Driver元數據的持久化和故障恢復。WAL之因此能提供持久化能力,是由於它利用了可靠的HDFS作數據存儲。

Spark Streaming預寫日誌機制的核心API包括:

  • 管理WAL文件的WriteAheadLogManager

  • 讀/寫WAL的WriteAheadLogWriter和WriteAheadLogReader

  • 基於WAL的RDD:WriteAheadLogBackedBlockRDD

  • 基於WAL的Partition:WriteAheadLogBackedBlockRDDPartition


以上核心API在數據接收和恢復階段的交互示意圖如圖四所示。


圖四 基於WAL的數據接收和恢復示意圖

從WriteAheadLogWriter的源碼裏能夠清楚地看到,每次寫入一塊數據buffer到HDFS後都會調用flush方法去強制刷入磁盤,而後纔去取下一塊數據。所以receiver接收的數據是能夠保證持久化到磁盤了,於是作到了較好的數據可靠性。

private[streaming] class WriteAheadLogWriter{  private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)  def write(data: ByteBuffer): WriteAheadLogFileSegment = synchronized {    data.rewind() // Rewind to ensure all data in the buffer is retrieved    val lengthToWrite = data.remaining()    val segment = new WriteAheadLogFileSegment(path, nextOffset, lengthToWrite)    stream.writeInt(lengthToWrite)    if (data.hasArray) {      stream.write(data.array())    } else {      while (data.hasRemaining) {        val array = new Array[Byte](data.remaining)        data.get(array)        stream.write(array)      }    }    flush()    nextOffset = stream.getPos()    segment  }

結束語

得益於Kafka這類可靠的data source、以及自身的checkpoint/WAL等機制,Spark Streaming的數據可靠性獲得了很好的保證,數據能保證「至少一次」(at least once)被處理。但因爲其outbound端的一致性實現還未完善,所以Exact once語義仍然不能端到端保證。Spark Streaming社區已經在跟進這個特性的實現(SPARK-4122),預計很快將合入trunk發佈。

推薦閱讀:

1,金融反欺詐場景下的Spark實踐

2,spark源碼系列以內部通信的三種機制

3,Spark源碼系列之spark2.2的StructuredStreaming使用及源碼介紹

4,Spark調優系列之序列化方式調優




關於Spark高級玩法

kafkahbasespark,Flink等入門到深刻源碼,spark機器學習,大數據安全,大數據運維,請關注浪尖公衆號,看高質量文章。

更多文章,敬請期待


本文分享自微信公衆號 - 浪尖聊大數據(bigdatatip)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索