kafka direct方式獲取數據解析

Receiver方式

處理流程

實際上作kafka receiver的時候,經過receiver來獲取數據,這個時候,kafka receiver是使用的kafka高層次的comsumer api來實現的。receiver會從kafka中獲取數據,而後把它存儲到咱們具體的Executor內存中。而後Spark streaming也就是driver中,會根據這獲取到的數據,啓動job去處理。api

kafkareceiver

receiver缺點

  1. 已經拉取的數據消費失敗後,會致使數據丟失。此問題雖然能夠經過WAL方式或者Memory_and_Disc2解決,可是存在耗時等問題
  2. 使用了kafka consumer的高階API,KafkaInputDStream的實現和咱們經常使用的consumer實現相似,須要zk額外的記錄偏移量

Direct方式

實現

在使用kafka接收消息時,都是調用了KafkaUtils裏面createStream的不一樣實現。數組

receiver方式的實現方式以下。app

/**
   * 建立一個inputStream,從kafkaBrokers上拉去消息,須要傳入zk集羣信息,默認會複製到另外一個excutor
   */
  def createStream(
      ssc: StreamingContext,// spark上下文
      zkQuorum: String,// zk集羣信息(hostname:port,hostname:port...)
      groupId: String,// 當前consumer所屬分組
      topics: Map[String, Int],// Map[topic_name,numPartitions],topic消費對應的分區
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[(String, String)] = {
    val kafkaParams = Map[String, String](
      "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
      "zookeeper.connection.timeout.ms" -> "10000")
      // 寫日誌
	  val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
      // 組裝成KafkaInputDStream
      new KafkaInputDStream[K, V, U, T](
            ssc, kafkaParams, topics, walEnabled, 	storageLevel)
  }

direct方式實現消費ide

/**
   * 摒棄了高階的kafkaConsumerAPI直接從kafkaBrokers獲取信息,能夠保證每條消息只被消費一次
   * 特色:
   * - No receivers:沒有receivers,直接從kafka拉取數據
   * - Offsets:不用zookeeper存儲offsets,偏移量是經過stream本身跟蹤記錄的,能夠經過HasOffsetRanges獲取offset
   * - Failure Recovery故障恢復:須要開啓sparkContext的checkpoint功能
   * - End-to-end semantics最終一致性:保證消息被消費且只消費一次
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag] (
      ssc: StreamingContext,
      // brokers列表,Map("metadata.broker.list" -> brokers)
      kafkaParams: Map[String, String],
      topics: Set[String]
  ): InputDStream[(K, V)] = {
    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
    val kc = new KafkaCluster(kafkaParams)
    val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
    new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
      ssc, kafkaParams, fromOffsets, messageHandler)
  }

特色

  1. Direct的方式是會直接操做kafka底層的元數據信息,這樣若是計算失敗了,能夠把數據從新讀一下,從新處理。即數據必定會被處理。拉數據,是RDD在執行的時候直接去拉數據。
  2. 因爲直接操做的是kafka,kafka就至關於你底層的文件系統。這個時候能保證嚴格的事務一致性,即必定會被處理,並且只會被處理一次。而Receiver的方式則不能保證,由於Receiver和ZK中的數據可能不一樣步,Spark Streaming可能會重複消費數據。而Direct api直接是操做kafka的,spark streaming本身負責追蹤消費這個數據的偏移量或者offset,而且本身保存到checkpoint,因此它的數據必定是同步的,必定不會被重複。
  3. 底層是直接讀數據,沒有所謂的Receiver,直接是週期性(Batch Intervel)的查詢kafka,處理數據的時候,咱們會使用基於kafka原生的Consumer api來獲取kafka中特定範圍(offset範圍)中的數據,這個時候,Direct Api訪問kafka帶來的一個顯而易見的性能上的好處就是,若是你要讀取多個partition,Spark也會建立RDD的partition,這個時候RDD的partition和kafka的partition是一致的。因此增長kafka中的topic的partition數量能夠提升並行度。
  4. 偏移量:默認從最新偏移量(largest)開始消費。若是設置了auto.offset.reset參數值爲smallest將從最小偏移處開始消費。

checkpoint恢復後,若是數據累積太多處理不過來,怎麼辦?函數

1)限速,經過spark.streaming.kafka.maxRatePerPartition參數配置性能

2)加強機器的處理能力fetch

3)放到數據緩衝池中。ui

模式比對

  1. .簡化並行性:無需建立多個輸入Kafka流而且結合它們。 使用directStream,Spark Streaming將建立與要消費的Kafkatopic中partition分區同樣多的RDD分區,這將從Kafka並行讀取數據。 所以,在Kafka和RDD分區之間存在一對一映射,這更容易理解和調整。
  2. 效率:在第一種方法中實現零數據丟失須要將數據存儲在預寫日誌中,該日誌進一步複製數據。 這其實是低效的,由於數據有效地被複制兩次 - 一次是Kafka,另外一次是寫入提早日誌。 第二種方法消除了問題,由於沒有接收器(zookeeper),所以不須要預寫日誌。 將元數據信息直接保存在kafka中,能夠從Kafka恢復消息。
  3. Exactly-once語義:第一種方法使用Kafka的高級API在Zookeeper中存儲消耗的偏移量。這是傳統上消費Kafka數據的方式。雖然這種方法(與預寫日誌結合)能夠確保零數據丟失(即至少一次語義),可是一些記錄在一些故障下可能被消耗兩次。這是由於Spark Streaming可靠接收的數據與Zookeeper跟蹤的偏移之間存在不一致。所以,在第二種方法中,咱們使用簡單的Kafka API,不使用Zookeeper的。偏移由Spark Streaming在其檢查點內跟蹤。這消除了Spark Streaming和Zookeeper / Kafka之間的不一致,因此每一個記錄被Spark Streaming有效地精確接收一次,儘管失敗了。爲了實現輸出結果的一次性語義,將數據保存到外部數據存儲的輸出操做必須是冪等的,或者是保存結果和偏移量的原子事務。

direct源碼

獲取offset集合,而後建立DirectKafkaInputDStream對象this

//  class KafkaUtils  
private[kafka] def getFromOffsets(
      kc: KafkaCluster,
      kafkaParams: Map[String, String],
      topics: Set[String]
    ): Map[TopicAndPartition, Long] = {
    // createDirectStream方法kafkaParams入參:消費起始位置
    val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase(Locale.ROOT))
    val result = for {
      topicPartitions <- kc.getPartitions(topics).right
      leaderOffsets <- (if (reset == Some("smallest")) {
        // smallest表示最小offset,即從topic的開始位置消費全部消息.
        kc.getEarliestLeaderOffsets(topicPartitions)
      } else {
        // largest表示接受接收最大的offset(即最新消息),
        kc.getLatestLeaderOffsets(topicPartitions)
      }).right
      // for循環中的 yield 會把當前的元素記下來,保存在集合中,循環結束後將返回該集合。Scala中for循環是有返回值的。若是被循環的是Map,返回的就是Map,被循環的是List,返回的就是List,以此類推。
    } yield {
      // 存放for循環的計算結果:map[TopicAndPartition, LeaderOffset]
      leaderOffsets.map { case (tp, lo) =>
          (tp, lo.offset)
      }
    }
    KafkaCluster.checkErrors(result)
  }

def createDirectStream{
    new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
      ssc, kafkaParams, fromOffsets, messageHandler)
}

DirectKafkaInputDStream.compute中建立KafkaRDD,並將offsets信息發送給inputStreamTracker.spa

override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
    // Map[TopicAndPartition, LeaderOffset] topic的partiton對應偏移量集合
    val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
    // 消息處理函數val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
    // 建立KafkaRDD
    val rdd = KafkaRDD[K, V, U, T, R](
      context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

    // 將topic和partition信息包裝成OffsetRange對象中
    val offsetRanges = currentOffsets.map { case (tp, fo) =>
      val uo = untilOffsets(tp)
      OffsetRange(tp.topic, tp.partition, fo, uo.offset)
    }
   
 	// 將OffsetRange報告給InputInfoTracker記錄
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
    Some(rdd)
  }

KafkaRDD計算時直接從kafka上拉取數據

override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
    val part = thePart.asInstanceOf[KafkaRDDPartition]
    new KafkaRDDIterator(part, context)
}

private class KafkaRDDIterator(
      part: KafkaRDDPartition,
      context: TaskContext) extends NextIterator[R] {
	// 根據metadata.broker.list初始化KafkaCluster,用來鏈接到kafka
    val kc = new KafkaCluster(kafkaParams)
    
    var requestOffset = part.fromOffset
    var iter: Iterator[MessageAndOffset] = null

    // 提供一個最優的host優先訪問,最大化的減小重試次數
    val consumer:SimpleConsumer = {
        // 重試次數大於0
      if (context.attemptNumber > 0) {
        kc.connectLeader(part.topic, part.partition).fold(
          errs => throw new SparkException(
            s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " +
              errs.mkString("\n")),
          consumer => consumer
        )
      } else {
          // 不用重試,直接鏈接
        kc.connect(part.host, part.port)
      }
    }

    // 建立請求拉取數據
    private def fetchBatch: Iterator[MessageAndOffset] = {
      val req = new FetchRequestBuilder()
        .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)
        .build()
      val resp = consumer.fetch(req)
      // 失敗重試 
      handleFetchErr(resp)
      // kafka may return a batch that starts before the requested offset
      resp.messageSet(part.topic, part.partition)
        .iterator
        .dropWhile(_.offset < requestOffset)
    }
    
    // 拉取失敗,通知另外一個rdd從新嘗試
    private def handleFetchErr(resp: FetchResponse) {
      if (resp.hasError) {
   		// Let normal rdd retry sort out reconnect attempts
        throw ErrorMapping.exceptionFor(err)
      }
    }

    override def getNext(): R = {
      if (iter == null || !iter.hasNext) {
          // 拉取數據
        iter = fetchBatch
      }
      if (!iter.hasNext) {
        assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
        finished = true
        null.asInstanceOf[R]
      } else {
          // 遍歷拉取到的數據
        val item = iter.next()
        if (item.offset >= part.untilOffset) {
		  // 若是當前item的偏移量大於須要拉取的最大偏移量則結束
          finished = true
          null.asInstanceOf[R]
        } else {
          requestOffset = item.nextOffset
            // 將拉取到的數據交由messageHandler處理
          messageHandler(new MessageAndMetadata(
            part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
        }
      }
    }
  }
}

經過chekpoint的方式保存offset

// DStream中定義checkpoint的實現類
class DirectKafkaInputDStream extends InputDStream{
   override val checkpointData =new DirectKafkaInputDStreamCheckpointData
}
class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
    def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
        // 定義一個不可變數組保存offset信息
      data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
    }
}
相關文章
相關標籤/搜索