Spark Streaming場景應用-Kafka數據讀取方式

概述

Spark Streaming 支持多種實時輸入源數據的讀取,其中包括Kafka、flume、socket流等等。除了Kafka之外的實時輸入源,因爲咱們的業務場景沒有涉及,在此將不會討論。本篇文章主要着眼於咱們目前的業務場景,只關注Spark Streaming讀取Kafka數據的方式。 Spark Streaming 官方提供了兩種方式讀取Kafka數據:api

  • 一是Receiver-based Approach。該種讀取模式官方最早支持,並在Spark 1.2提供了數據零丟失(zero-data loss)的支持;
  • 一是Direct Approach (No Receivers)。該種讀取方式在Spark 1.3引入。

此兩種讀取方式存在很大的不一樣,固然也各有優劣。接下來就讓咱們具體剖解這兩種數據讀取方式。網絡

1、Receiver-based Approach

如前文所述,Spark官方最早提供了基於Receiver的Kafka數據消費模式。但會存在程序失敗丟失數據的可能,後在Spark 1.2時引入一個配置參數spark.streaming.receiver.writeAheadLog.enable以規避此風險。如下是官方的原話:架構

under default configuration, this approach can lose data under failures (see receiver reliability. To ensure zero-data loss, you have to additionally enable Write Ahead Logs in Spark Streaming (introduced in Spark 1.2). This synchronously saves all the received Kafka data into write ahead logs on a distributed file system (e.g HDFS), so that all the data can be recovered on failure.併發

Receiver-based 讀取方式

Receiver-based的Kafka讀取方式是基於Kafka高階(high-level) api來實現對Kafka數據的消費。在提交Spark Streaming任務後,Spark集羣會劃出指定的Receivers來專門、持續不斷、異步讀取Kafka的數據,讀取時間間隔以及每次讀取offsets範圍能夠由參數來配置。讀取的數據保存在Receiver中,具體StorageLevel方式由用戶指定,諸如MEMORY_ONLY等。當driver 觸發batch任務的時候,Receivers中的數據會轉移到剩餘的Executors中去執行。在執行完以後,Receivers會相應更新ZooKeeper的offsets。如要確保at least once的讀取方式,能夠設置spark.streaming.receiver.writeAheadLog.enable爲true。具體Receiver執行流程見下圖:app

輸入圖片說明

Receiver-based 讀取實現

Kafka的high-level數據讀取方式讓用戶能夠專一於所讀數據,而不用關注或維護consumer的offsets,這減小用戶的工做量以及代碼量並且相對比較簡單。所以,在剛開始引入Spark Streaming計算引擎時,咱們優先考慮採用此種方式來讀取數據,具體的代碼以下:異步

/*讀取kafka數據函數*/
  def getKafkaInputStream(zookeeper: String,
                            topic: String,
                            groupId: String,
                            numRecivers: Int,
                            partition: Int,
                            ssc: StreamingContext): DStream[String] = {
    val kafkaParams = Map(
      ("zookeeper.connect", zookeeper),
      ("auto.offset.reset", "largest"),
      ("zookeeper.connection.timeout.ms", "30000"),
      ("fetch.message.max.bytes", (1024 * 1024 * 50).toString),
      ("group.id", groupId)
    )
    val topics = Map(topic -> partition / numRecivers)

    val kafkaDstreams = (1 to numRecivers).map { _ =>
      KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc,
        kafkaParams,
        topics,
        StorageLevel.MEMORY_AND_DISK_SER).map(_._2)
    }

    ssc.union(kafkaDstreams)
  }

如上述代碼,函數getKafkaInputStream提供了zookeepertopicgroupIdnumReceiverspartition以及ssc,其傳入函數分別對應:socket

  • zookeeper: ZooKeeper鏈接信息
  • topic: Kafka中輸入的topic信息
  • groupId: consumer信息
  • numReceivers: 打算開啓的receiver個數, 並用來調整併發
  • partition: Kafka中對應topic的分區數

以上幾個參數主要用來鏈接Kafka並讀取Kafka數據。具體執行的步驟以下:ide

  • Kafka相關讀取參數配置,其中 zookeeper.connect即傳入進來的zookeeper參數;auto.offset.reset設置從topic的最新處開始讀取數據;zookeeper.connection.timeout.ms指zookeepr鏈接超時時間,以防止網絡不穩定的狀況;fetch.message.max.bytes則是指單次讀取數據的大小;group.id則是指定consumer。
  • 指定topic的併發數,當指定receivers個數以後,可是因爲receivers個數小於topic的partition個數,因此在每一個receiver上面會起相應的線程來讀取不一樣的partition。
  • 讀取Kafka數據,numReceivers的參數在此用於指定咱們須要多少Executor來做爲Receivers,開多個Receivers是爲了提升應用吞吐量。
  • union用於將多個Receiver讀取的數據關聯起來

Receiver-based 讀取問題

採用Reveiver-based方式知足咱們的一些場景需求,並基於此抽象出了一些micro-batch、內存計算模型等。在具體的應用場景中,咱們也對此種的方式作了一些優化:函數

  • 防數據丟失。作checkpoint操做以及配置spark.streaming.receiver.writeAheadLog.enable參數;
  • 提升receiver數據吞吐量。採用MEMORY_AND_DISK_SER方式讀取數據、提升單Receiver的內存或是調大並行度,將數據分散到多個Receiver中去。

以上處理方式在必定程度上知足了咱們的應用場景,諸如micro-batch以及內存計算模型等。可是同時由於這兩方面以及其餘方面的一些因素,致使也會出現各類狀況的問題:fetch

  • 配置spark.streaming.receiver.writeAheadLog.enable參數,每次處理以前須要將該batch內的日誌備份到checkpoint目錄中,這下降了數據處理效率,反過來又加劇了Receiver端的壓力;另外因爲數據備份機制,會受到負載影響,負載一高就會出現延遲的風險,致使應用崩潰。
  • 採用MEMORY_AND_DISK_SER下降對內存的要求。可是在必定程度上影響計算的速度
  • 單Receiver內存。因爲receiver也是屬於Executor的一部分,那麼爲了提升吞吐量,提升Receiver的內存。可是在每次batch計算中,參與計算的batch並不會使用到這麼多的內存,致使資源嚴重浪費。
  • 提升並行度,採用多個Receiver來保存Kafka的數據。Receiver讀取數據是異步的,並不參與計算。若是開較高的並行度來平衡吞吐量很不划算。
  • Receiver和計算的Executor的異步的,那麼遇到網絡等因素緣由,致使計算出現延遲,計算隊列一直在增長,而Receiver則在一直接收數據,這很是容易致使程序崩潰。
  • 在程序失敗恢復時,有可能出現數據部分落地,可是程序失敗,未更新offsets的狀況,這致使數據重複消費。

爲了回闢以上問題,下降資源使用,咱們後來採用Direct Approach來讀取Kafka的數據,具體接下來細說。

2、Direct Approach (No Receivers)

區別於Receiver-based的數據消費方法,Spark 官方在Spark 1.3時引入了Direct方式的Kafka數據消費方式。相對於Receiver-based的方法,Direct方式具備如下方面的優點:

  • 簡化並行(Simplified Parallelism)。不現須要建立以及union多輸入源,Kafka topic的partition與RDD的partition一一對應,官方描述以下:

No need to create multiple input Kafka streams and union them. With directStream, Spark Streaming will create as many RDD partitions as there are Kafka partitions to consume, which will all read data from Kafka in parallel. So there is a one-to-one mapping between Kafka and RDD partitions, which is easier to understand and tune.

  • 高效(Efficiency)。Receiver-based保證數據零丟失(zero-data loss)須要配置spark.streaming.receiver.writeAheadLog.enable,此種方式須要保存兩份數據,浪費存儲空間也影響效率。而Direct方式則不存在這個問題。

Achieving zero-data loss in the first approach required the data to be stored in a Write Ahead Log, which further replicated the data. This is actually inefficient as the data effectively gets replicated twice - once by Kafka, and a second time by the Write Ahead Log. This second approach eliminates the problem as there is no receiver, and hence no need for Write Ahead Logs. As long as you have sufficient Kafka retention, messages can be recovered from Kafka.

  • 強一致語義(Exactly-once semantics)。High-level數據由Spark Streaming消費,可是Offsets則是由Zookeeper保存。經過參數配置,能夠實現at-least once消費,此種狀況有重複消費數據的可能。

The first approach uses Kafka’s high level API to store consumed offsets in Zookeeper. This is traditionally the way to consume data from Kafka. While this approach (in combination with write ahead logs) can ensure zero data loss (i.e. at-least once semantics), there is a small chance some records may get consumed twice under some failures. This occurs because of inconsistencies between data reliably received by Spark Streaming and offsets tracked by Zookeeper. Hence, in this second approach, we use simple Kafka API that does not use Zookeeper. Offsets are tracked by Spark Streaming within its checkpoints. This eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each record is received by Spark Streaming effectively exactly once despite failures. In order to achieve exactly-once semantics for output of your results, your output operation that saves the data to an external data store must be either idempotent, or an atomic transaction that saves results and offsets (see Semantics of output operations in the main programming guide for further information).

Direct 讀取方式

Direct方式採用Kafka簡單的consumer api方式來讀取數據,無需經由ZooKeeper,此種方式再也不須要專門Receiver來持續不斷讀取數據。當batch任務觸發時,由Executor讀取數據,並參與到其餘Executor的數據計算過程當中去。driver來決定讀取多少offsets,並將offsets交由checkpoints來維護。將觸發下次batch任務,再由Executor讀取Kafka數據並計算。今後過程咱們能夠發現Direct方式無需Receiver讀取數據,而是須要計算時再讀取數據,因此Direct方式的數據消費對內存的要求不高,只須要考慮批量計算所須要的內存便可;另外batch任務堆積時,也不會影響數據堆積。其具體讀取方式以下圖:

輸入圖片說明

Direct 讀取實現

Spark Streaming提供了一些重載讀取Kafka數據的方法,本文中關注兩個基於Scala的方法,這在咱們的應用場景中會用到,具體的方法代碼以下:

  • 方法createDirectStream中,ssc是StreamingContext;kafkaParams的具體配置見Receiver-based之中的配置,與之同樣;這裏面須要指出的是fromOffsets ,其用來指定從什麼offset處開始讀取數據。
def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag,
    R: ClassTag] (
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      fromOffsets: Map[TopicAndPartition, Long],
      messageHandler: MessageAndMetadata[K, V] => R
  ): InputDStream[R] = {
    val cleanedHandler = ssc.sc.clean(messageHandler)
    new DirectKafkaInputDStream[K, V, KD, VD, R](
      ssc, kafkaParams, fromOffsets, cleanedHandler)
  }
  • 方法createDirectStream中,該方法只須要3個參數,其中kafkaParams仍是同樣,並未有什麼變化,不過其中有個配置auto.offset.reset能夠用來指定是從largest或者是smallest處開始讀取數據;topic是指Kafka中的topic,能夠指定多個。具體提供的方法代碼以下:
def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag] (
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Set[String]
  ): InputDStream[(K, V)] = {
    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
    val kc = new KafkaCluster(kafkaParams)
    val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
    new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
      ssc, kafkaParams, fromOffsets, messageHandler)
  }

在實際的應用場景中,咱們會將兩種方法結合起來使用,大致的方向分爲兩個方面:

  • 應用啓動。當程序開發並上線,還未消費Kafka數據,此時從largest處讀取數據,採用第二種方法;
  • 應用重啓。因資源、網絡等其餘緣由致使程序失敗重啓時,須要保證從上次的offsets處開始讀取數據,此時就須要採用第一種方法來保證咱們的場景。

整體方向上,咱們採用以上方法知足咱們的須要,固然具體的策略咱們不在本篇中討論,後續會有專門的文章來介紹。從largest或者是smallest處讀Kafka數據代碼實現以下:

/**
    * 讀取kafka數據,從最新的offset開始讀
    *
    * @param ssc         : StreamingContext
    * @param kafkaParams : kafka參數
    * @param topics      : kafka topic
    * @return : 返回流數據
    */
private def getDirectStream(ssc: StreamingContext,
                            kafkaParams: Map[String, String],
                            topics: Set[String]): DStream[String] = {
  val kafkaDStreams = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc,
    kafkaParams,
    topics
  )
  kafkaDStreams.map(_._2)
}

程序失敗重啓的邏輯代碼以下:

/**
    * 若是已有offset,則從offset開始讀數據
    *
    * @param ssc         : StreamingContext
    * @param kafkaParams : kafkaParams配置參數
    * @param fromOffsets : 已有的offsets
    * @return : 返回流數據
    */
private def getDirectStreamWithOffsets(ssc: StreamingContext,
                                       kafkaParams: Map[String, String],
                                       fromOffsets: Map[TopicAndPartition, Long]): DStream[String] = {
  val kfkData = try {
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
      ssc,
      kafkaParams,
      fromOffsets,
      (mmd: MessageAndMetadata[String, String]) => mmd.message()
    )
  } catch { //offsets失效, 從最新的offsets讀。
    case _: Exception =>
    val topics = fromOffsets.map { case (tap, _) =>
      tap.topic
    }.toSet
    getDirectStream(ssc, kafkaParams, topics)
  }
  kfkData
}

代碼中的fromOffsets參數從外部存儲獲取並須要處理轉換,其代碼以下:

val fromOffsets = offsets.map { consumerInfo =>
  TopicAndPartition(consumerInfo.topic, consumerInfo.part) -> consumerInfo.until_offset
}.toMap

該方法提供了從指定offsets處讀取Kafka數據。若是發現讀取數據異常,咱們認爲是offsets失敗,此種狀況去捕獲這個異常,而後從largest處讀取Kafka數據。

Direct 讀取問題

在實際的應用中,Direct Approach方式很好地知足了咱們的須要,與Receiver-based方式相比,有如下幾方面的優點:

  • 下降資源。Direct不須要Receivers,其申請的Executors所有參與到計算任務中;而Receiver-based則須要專門的Receivers來讀取Kafka數據且不參與計算。所以相同的資源申請,Direct 可以支持更大的業務。

  • 下降內存。Receiver-based的Receiver與其餘Exectuor是異步的,並持續不斷接收數據,對於小業務量的場景還好,若是遇到大業務量時,須要提升Receiver的內存,可是參與計算的Executor並沒有需那麼多的內存。而Direct 由於沒有Receiver,而是在計算時讀取數據,而後直接計算,因此對內存的要求很低。實際應用中咱們能夠把原先的10G降至如今的2-4G左右。

  • 魯棒性更好。Receiver-based方法須要Receivers來異步持續不斷的讀取數據,所以遇到網絡、存儲負載等因素,致使實時任務出現堆積,但Receivers卻還在持續讀取數據,此種狀況很容易致使計算崩潰。Direct 則沒有這種顧慮,其Driver在觸發batch 計算任務時,纔會讀取數據並計算。隊列出現堆積並不會引發程序的失敗。

至於其餘方面的優點,好比 簡化並行(Simplified Parallelism)、高效(Efficiency)以及強一致語義(Exactly-once semantics)在以前已列出,在此再也不介紹。雖然Direct 有以上這些優點,可是也存在一些不足,具體以下:

  • 提升成本。Direct須要用戶採用checkpoint或者第三方存儲來維護offsets,而不像Receiver-based那樣,經過ZooKeeper來維護Offsets,此提升了用戶的開發成本。
  • 監控可視化。Receiver-based方式指定topic指定consumer的消費狀況均能經過ZooKeeper來監控,而Direct則沒有這種便利,若是作到監控並可視化,則須要投入人力開發。

總結

本文介紹了基於Spark Streaming的Kafka數據讀取方式,包括Receiver-based以及Direct兩種方式。兩種方式各有優劣,但相對來講Direct 適用於更多的業務場景以及有更好的可護展性。至於如何選擇以上兩種方式,除了業務場景外也跟團隊相關,若是是應用初期,爲了快速迭代應用,能夠考慮採用第一種方式;若是要深刻使用的話則建議採用第二種方式。本文只介紹了兩種讀取方式,並未涉及到讀取策略、優化等問題。這些會在後續的文章中詳細介紹。

關於做者

徐勝國,大連理工大學碩士畢業,360大數據中心數據研發工程師,主要負責基於Spark Streaming的項目架構及研發工做。郵箱 : xshguo_better@yeah.net。若有問題,可郵件聯繫,歡迎交流。

相關文章
相關標籤/搜索