Spark Streaming源碼解讀之No Receivers

前文有提到過Spark Streaming事務是如何保證exactly once的語義的。app

從spark core程序來說,讀取固定數據來源好比hdfs中,spark只是作爲一個計算框架。負載均衡

而在流處理中,只是多了一個時間維度。框架

若在某一時刻,知道所需處理數據的來源,直接讀取,而不用被動的接收(Receiver),那就是和普通的Spark 程序沒什麼差異了。函數

本文將着重Kafka中direct方式的讀取,以案例切入,跟蹤源碼分析。源碼分析

入口是KafkaUtils,先建立了一個回調函數定義,再獲取到kafka集羣,並獲取到起始偏移量,最後建立一個DirectKafkaInputDStream,用於建立RDD。spa

// KafkaUtils.scala line 473
  def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag] (
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Set[String]
  ): InputDStream[(K, V)] = {
    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
    val kc = new KafkaCluster(kafkaParams)
    val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
    new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
      ssc, kafkaParams, fromOffsets, messageHandler)
  }

KafkaCluster 在實例化時沒有任何動做,只是單純的建立對象。.net

接下來獲取每一個partition的偏移量,scala

  1. 先獲取設置的偏移量配置
  2. 獲取分區,使用的是Either,即要麼left要麼right。此處left爲拋出一個異常,right爲返回TopicAndPartition Set。scala語法博大精深。,後面的大量用到了Either。業務邏輯沒什麼複雜。
  3. 偏移量默認是最大的。

DStream建立以後,整個DAG回溯的lineage以下:code

DirectKafkaInputDStream -> MappedDStream > FlatMappedDStream -> MappedDStream -> ShuffledDStream -> ForEachDStream對象

當DAG回溯到DirectKafkaInputDStream時,會調用compute。建立KafkaRDD,而且將最新的偏移量保存,以便下次計算新的偏移量。

從當RDD進入計算時,會調用compute。,此處的offsetRanges就是Kafka的TopicAndPartition和對應的偏移量。最後結果就是kafka有多少個partition,spark就會有多少個partition與之對應。

 

直接抓Kafka數據的方式與Receiver的方式的對比:

  1. 實現方式
    1. Direct方式只是定時生產RDD,經過RDD 回溯至最先的KafkaRDD來獲取數據,是很天然的作法
    2. Receiver是以RDD的形式封裝Receiver,在Worker中啓動後,未到時的數據存在內存隊列中,定時觸發來收割數據,放入Spark中
  2. 數據可靠性
    1. Direct數據存放在外部Kafka中,kafka自帶副本。無需spark作冗餘
    2. Receiver須要WAL來預防數據丟失,可是由於wal的批處理的特性,仍是有可能丟失數據。
  3. 負載均衡
    1. Direct以RDD的形式,每一個Duration產生的RDD都會在執行時動態最優。
    2. Receiver 與 Worker綁定,沒法動態調整。
  4. 一致性保證
    1. direct藉助kafka的ack,失敗的消息會自動重試。
    2. 藉助wal實現一致性。
  5. 對資源的合理使用
    1. direct方式數據都存在kafka,沒冗餘,沒wal,
    2. receiver未收割的數據存在內存queue中,必要時要開啓wal,至少多了1分副本。
相關文章
相關標籤/搜索