Let's dig into the source code to understand how DirectKafkaInputDStream ensures exactly-once semantics when Kafka is used as the input stream.
val stream: InputDStream[(String, String, Long)] =
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String, Long)](
    ssc, kafkaParams, fromOffsets,
    (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message(), mmd.offset))
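For reference, here is a minimal, self-contained sketch of how such a call is usually set up. The topic name "events", the broker addresses, and the starting offsets below are made up for illustration; in practice the fromOffsets map is loaded from wherever the application stores its offsets.

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical setup: topic "events" with two partitions whose starting
// offsets were loaded from an external store (e.g. a database).
val conf = new SparkConf().setAppName("direct-kafka-sketch")
val ssc = new StreamingContext(conf, Seconds(5))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val fromOffsets = Map(
  TopicAndPartition("events", 0) -> 1000L,
  TopicAndPartition("events", 1) -> 1500L)

val stream = KafkaUtils.createDirectStream[
  String, String, StringDecoder, StringDecoder, (String, String, Long)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message(), mmd.offset))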
The source of createDirectStream is as follows:
def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag,
    R: ClassTag] (
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[K, V] => R
  ): InputDStream[R] = {
  val cleanedHandler = ssc.sc.clean(messageHandler)
  new DirectKafkaInputDStream[K, V, KD, VD, R](
    ssc, kafkaParams, fromOffsets, cleanedHandler)
}
The class-level documentation of DirectKafkaInputDStream reads:
A stream of org.apache.spark.streaming.kafka.KafkaRDD where each given Kafka topic/partition corresponds to an RDD partition.
The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number of messages per second that
each partition will accept. Starting offsets are specified in advance, and this DStream is not responsible for committing offsets,
so that you can control exactly-once semantics. For an easy interface to Kafka-managed offsets,
see org.apache.spark.streaming.kafka.KafkaCluster
In short, it is a stream of KafkaRDDs in which every partition of every specified Kafka topic corresponds to one RDD partition.
In the parent class InputDStream, the compute method is documented as follows:
Method that generates a RDD for the given time, i.e. it produces a new RDD for each batch time.
This is the entry point for generating the RDD:
override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
  // 1. Determine the until offsets for this batch
  val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
  // 2. Create the KafkaRDD instance
  val rdd = KafkaRDD[K, V, U, T, R](
    context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

  // Report the record number and metadata of this batch interval to InputInfoTracker.
  // Compute the offset range of this batch
  val offsetRanges = currentOffsets.map { case (tp, fo) =>
    val uo = untilOffsets(tp) // the until offset
    OffsetRange(tp.topic, tp.partition, fo, uo.offset)
  }
  // 3. Report the metadata and offsets of this batch to InputInfoTracker
  val description = offsetRanges.filter { offsetRange =>
    // Don't display empty ranges.
    offsetRange.fromOffset != offsetRange.untilOffset
  }.map { offsetRange =>
    s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
      s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
  }.mkString("\n")
  // Copy offsetRanges to immutable.List to prevent from being modified by the user
  val metadata = Map(
    "offsets" -> offsetRanges.toList,
    StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
  val inputInfo = StreamInputInfo(id, rdd.count, metadata)
  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

  // 4. Advance currentOffsets to this batch's until offsets
  currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
  Some(rdd)
}
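Because the DStream itself never commits offsets, the application usually reads the per-batch offset ranges back out of each RDD and stores them itself. A minimal sketch of that pattern is below; it uses the documented HasOffsetRanges interface, while saveOffsets is a hypothetical helper standing in for whatever store the application uses.

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Hypothetical helper: persist the ranges so a restart can resume from these exact offsets.
def saveOffsets(ranges: Array[OffsetRange]): Unit = ???

stream.foreachRDD { rdd =>
  // The KafkaRDD behind each batch implements HasOffsetRanges, exposing exactly
  // the ranges computed in compute() above.
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  rdd.foreachPartition { iter =>
    // process the records of this partition here ...
  }

  saveOffsets(offsetRanges)
}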
Let's look in detail at how the leader offsets are obtained, i.e. the latestLeaderOffsets method:
@tailrec
protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
  val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
  // Either.fold would confuse @tailrec, do it manually
  if (o.isLeft) { // Left carries the errors
    val err = o.left.get.toString
    if (retries <= 0) {
      throw new SparkException(err)
    } else {
      log.error(err)
      Thread.sleep(kc.config.refreshLeaderBackoffMs)
      latestLeaderOffsets(retries - 1)
    }
  } else { // Right carries the result
    o.right.get
  }
}
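The manual isLeft / right.get handling exists only because folding over the Either would put the recursive call in a position the compiler cannot optimize as a tail call. The same retry pattern in isolation looks like this; the names and error type are illustrative, not Spark's.

import scala.annotation.tailrec

// attempt returns Left(error) on failure and Right(value) on success.
@tailrec
def retryEither[A](attempt: () => Either[String, A], retries: Int, backoffMs: Long): A = {
  val result = attempt()
  if (result.isLeft) {
    if (retries <= 0) throw new RuntimeException(result.left.get)
    Thread.sleep(backoffMs)
    retryEither(attempt, retries - 1, backoffMs) // tail call, so @tailrec is satisfied
  } else {
    result.right.get
  }
}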
Now look at kc.getLatestLeaderOffsets(currentOffsets.keySet).
The kc field is initialized as: protected val kc = new KafkaCluster(kafkaParams)
So the call goes to KafkaCluster.getLatestLeaderOffsets.
The call chain is as follows:
def getLatestLeaderOffsets(
    topicAndPartitions: Set[TopicAndPartition]
  ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
  getLatestLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime) match {
    case other => other
  }

// which actually delegates to:
def getLeaderOffsets(
    topicAndPartitions: Set[TopicAndPartition],
    before: Long
  ): Either[Err, Map[TopicAndPartition, LeaderOffset]] = {
  getLeaderOffsets(topicAndPartitions, before, 1).right.map { r =>
    r.map { kv =>
      // mapValues isnt serializable, see SI-7005
      kv._1 -> kv._2.head
    }
  }
}

// getLeaderOffsets in turn calls the method below to obtain the leaders' offsets,
// in this case the latest (largest) offsets:
def getLeaderOffsets(
    topicAndPartitions: Set[TopicAndPartition],
    before: Long,
    maxNumOffsets: Int
  ): Either[Err, Map[TopicAndPartition, Seq[LeaderOffset]]] = {
  // Find the leader (host, port) of every partition
  findLeaders(topicAndPartitions).right.flatMap { tpToLeader =>
    // tp -> (l.host -> l.port)  ==>  (l.host -> l.port) -> Seq[tp]
    val leaderToTp: Map[(String, Int), Seq[TopicAndPartition]] = flip(tpToLeader)
    // Connection info of all the leaders
    val leaders = leaderToTp.keys
    var result = Map[TopicAndPartition, Seq[LeaderOffset]]()
    val errs = new Err
    // Ask each leader for the offsets of its partitions, here the latest offsets
    withBrokers(leaders, errs) { consumer =>
      val partitionsToGetOffsets: Seq[TopicAndPartition] =
        leaderToTp((consumer.host, consumer.port))
      val reqMap = partitionsToGetOffsets.map { tp: TopicAndPartition =>
        tp -> PartitionOffsetRequestInfo(before, maxNumOffsets)
      }.toMap
      val req = OffsetRequest(reqMap)
      val resp = consumer.getOffsetsBefore(req)
      val respMap = resp.partitionErrorAndOffsets
      partitionsToGetOffsets.foreach { tp: TopicAndPartition =>
        respMap.get(tp).foreach { por: PartitionOffsetsResponse =>
          if (por.error == ErrorMapping.NoError) {
            if (por.offsets.nonEmpty) {
              result += tp -> por.offsets.map { off =>
                LeaderOffset(consumer.host, consumer.port, off)
              }
            } else {
              errs.append(new SparkException(
                s"Empty offsets for ${tp}, is ${before} before log beginning?"))
            }
          } else {
            errs.append(ErrorMapping.exceptionFor(por.error))
          }
        }
      }
      if (result.keys.size == topicAndPartitions.size) {
        return Right(result)
      }
    }
    val missing = topicAndPartitions.diff(result.keySet)
    errs.append(new SparkException(s"Couldn't find leader offsets for ${missing}"))
    Left(errs)
  }
}

// Find the leader's host and port for each TopicAndPartition
def findLeaders(
    topicAndPartitions: Set[TopicAndPartition]
  ): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
  val topics = topicAndPartitions.map(_.topic)
  // Fetch the metadata of all partitions of the given topics
  val response = getPartitionMetadata(topics).right
  // Extract the leader host and port of every partition
  val answer = response.flatMap { tms: Set[TopicMetadata] =>
    val leaderMap = tms.flatMap { tm: TopicMetadata =>
      tm.partitionsMetadata.flatMap { pm: PartitionMetadata =>
        val tp = TopicAndPartition(tm.topic, pm.partitionId)
        if (topicAndPartitions(tp)) {
          pm.leader.map { l =>
            tp -> (l.host -> l.port)
          }
        } else {
          None
        }
      }
    }.toMap
    if (leaderMap.keys.size == topicAndPartitions.size) {
      Right(leaderMap)
    } else {
      val missing = topicAndPartitions.diff(leaderMap.keySet)
      val err = new Err
      err.append(new SparkException(s"Couldn't find leaders for ${missing}"))
      Left(err)
    }
  }
  answer
}

// Fetch the metadata of all partitions of the given topic set
def getPartitionMetadata(topics: Set[String]): Either[Err, Set[TopicMetadata]] = {
  // Build the TopicMetadataRequest object
  val req = TopicMetadataRequest(
    TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics.toSeq)
  val errs = new Err
  // Randomly shuffle the seed broker list
  withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
    val resp: TopicMetadataResponse = consumer.send(req)
    val respErrs = resp.topicsMetadata.filter(m => m.errorCode != ErrorMapping.NoError)
    if (respErrs.isEmpty) {
      return Right(resp.topicsMetadata.toSet)
    } else {
      respErrs.foreach { m =>
        val cause = ErrorMapping.exceptionFor(m.errorCode)
        val msg = s"Error getting partition metadata for '${m.topic}'. Does the topic exist?"
        errs.append(new SparkException(msg, cause))
      }
    }
  }
  Left(errs)
}

// Try a call against potentially multiple brokers, accumulating errors
// Although this is a foreach loop, it returns as soon as the metadata is obtained. Looping
// over the brokers gives retries across nodes, so a single downed broker does not fail the
// request. On top of that, the caller retries in rounds: after all brokers have been tried
// it sleeps refreshLeaderBackoffMs (200ms by default) and starts the next round, which also
// covers brokers that are only momentarily unavailable.
private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)
    (fn: SimpleConsumer => Any): Unit = {
  brokers.foreach { hp =>
    var consumer: SimpleConsumer = null
    try {
      // Open a SimpleConsumer connection
      consumer = connect(hp._1, hp._2)
      // fn sends the request and handles the response; for getPartitionMetadata it is the
      // closure above that sends the TopicMetadataRequest and returns Right(...) on success.
      fn(consumer)
    } catch {
      case NonFatal(e) =>
        errs.append(e)
    } finally {
      if (consumer != null) {
        consumer.close()
      }
    }
  }
}

private def flip[K, V](m: Map[K, V]): Map[V, Seq[K]] =
  m.groupBy(_._2).map { kv =>
    kv._1 -> kv._2.keys.toSeq
  }
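flip simply inverts the tp -> (host, port) map into (host, port) -> Seq(tp), so that a single OffsetRequest can be sent per leader instead of one per partition. A tiny standalone sketch, with made-up partition names:

// Hypothetical leader assignment: three partitions, two leaders.
def flip[K, V](m: Map[K, V]): Map[V, Seq[K]] =
  m.groupBy(_._2).map { kv => kv._1 -> kv._2.keys.toSeq }

val tpToLeader = Map(
  "events-0" -> ("broker1", 9092),
  "events-1" -> ("broker1", 9092),
  "events-2" -> ("broker2", 9092))

// Groups events-0 and events-1 under (broker1,9092) and events-2 under (broker2,9092)
println(flip(tpToLeader))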
Then, based on the latest offset of each partition's leader, the until offset of each partition is determined; this is the job of the clamp function:
// limits the maximum number of messages per partition
protected def clamp(
    leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
  maxMessagesPerPartition.map { mmp =>
    leaderOffsets.map { case (tp, lo) =>
      // Estimated until offset = current offset + estimated message count for this batch.
      // Each partition's until offset is the smaller of the leader's latest offset and
      // this estimate.
      tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
    }
  }.getOrElse(leaderOffsets)
  // If there is no rate estimate yet (e.g. the first batch) and
  // spark.streaming.kafka.maxRatePerPartition is not set, the leaders' latest offsets
  // are returned unchanged.
}

protected def maxMessagesPerPartition: Option[Long] = {
  // rateController is responsible for estimating the stream rate
  val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)
  // Total number of topic partitions
  val numPartitions = currentOffsets.keys.size

  // Effective processing rate per partition
  val effectiveRateLimitPerPartition = estimatedRateLimit
    .filter(_ > 0) // drop non-positive estimates
    .map { limit =>
      // maxRateLimitPerPartition comes from spark.streaming.kafka.maxRatePerPartition (default 0)
      if (maxRateLimitPerPartition > 0) {
        // Take the smaller of the configured rate and the estimated per-partition rate
        Math.min(maxRateLimitPerPartition, (limit / numPartitions))
      } else {
        // If not configured, use estimated rate / number of partitions
        limit / numPartitions
      }
    }.getOrElse(maxRateLimitPerPartition) // no estimate: fall back to the configured rate (0 if unset)

  if (effectiveRateLimitPerPartition > 0) { // if the effective per-partition rate is positive
    val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
    // messages per partition for one batch = batch duration (s) * per-partition rate (msg/s)
    Some((secsPerBatch * effectiveRateLimitPerPartition).toLong)
  } else {
    None
  }
}
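To make the arithmetic concrete, here is a worked example with assumed numbers (none of them come from the source): a 5 second batch interval, an estimated rate of 10000 msg/s, maxRatePerPartition set to 3000, and 4 partitions.

// Assumed inputs, for illustration only.
val secsPerBatch        = 5.0
val estimatedRate       = 10000
val maxRatePerPartition = 3000
val numPartitions       = 4

// Per-partition rate: the smaller of the configured cap and the estimated share.
val effectiveRatePerPartition = math.min(maxRatePerPartition, estimatedRate / numPartitions) // 2500

// Messages each partition may contribute to this batch.
val maxMessagesPerPartition = (secsPerBatch * effectiveRatePerPartition).toLong // 12500

// clamp: untilOffset = min(currentOffset + maxMessagesPerPartition, leader's latest offset)
val currentOffset = 100000L
val leaderOffset  = 200000L
val untilOffset   = math.min(currentOffset + maxMessagesPerPartition, leaderOffset) // 112500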
The apply method of the KafkaRDD companion object:

def apply[
    K: ClassTag,
    V: ClassTag,
    U <: Decoder[_]: ClassTag,
    T <: Decoder[_]: ClassTag,
    R: ClassTag](
    sc: SparkContext,
    kafkaParams: Map[String, String],
    fromOffsets: Map[TopicAndPartition, Long],
    untilOffsets: Map[TopicAndPartition, LeaderOffset],
    messageHandler: MessageAndMetadata[K, V] => R
  ): KafkaRDD[K, V, U, T, R] = {
  // Build the TopicAndPartition -> leader (host, port) mapping from untilOffsets
  val leaders = untilOffsets.map { case (tp, lo) =>
    tp -> (lo.host, lo.port)
  }.toMap

  // Combine fromOffsets and untilOffsets into OffsetRange objects
  val offsetRanges = fromOffsets.map { case (tp, fo) =>
    val uo = untilOffsets(tp)
    OffsetRange(tp.topic, tp.partition, fo, uo.offset)
  }.toArray

  // Return a KafkaRDD instance
  new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaders, messageHandler)
}
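The same OffsetRange boundaries can also be supplied directly through the public batch API, KafkaUtils.createRDD, which builds a KafkaRDD the same way DirectKafkaInputDStream does once per batch. A minimal sketch, where sc and kafkaParams are assumed to exist and the topic name and offsets are made up:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

def readFixedRanges(sc: SparkContext, kafkaParams: Map[String, String]) = {
  val offsetRanges = Array(
    OffsetRange("events", 0, 1000L, 2000L), // partition 0: offsets [1000, 2000)
    OffsetRange("events", 1, 1500L, 2500L)) // partition 1: offsets [1500, 2500)

  // Returns an RDD[(K, V)] bounded by exactly these ranges.
  KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
    sc, kafkaParams, offsetRanges)
}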
First, look at KafkaRDD's documentation:
A batch-oriented interface for consuming from Kafka. Starting and ending offsets are specified in advance, so that you can control exactly-once semantics.
The key method is KafkaRDD's compute, which takes a partition as its argument:
override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
  if (part.fromOffset == part.untilOffset) {
    // If fromOffset == untilOffset, return an empty iterator
    log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
      s"skipping ${part.topic} ${part.partition}")
    Iterator.empty
  } else {
    new KafkaRDDIterator(part, context)
  }
}
The source of KafkaRDDIterator is shown below. The class is easy to follow because it overrides only two non-private methods, close and getNext. close shuts down the SimpleConsumer instance (mainly closing the socket connection and the BlockingChannel used to read responses and write requests), while getNext fetches the data.
The class source is as follows:
private class KafkaRDDIterator(
    part: KafkaRDDPartition,
    context: TaskContext) extends NextIterator[R] {

  context.addTaskCompletionListener{ context => closeIfNeeded() }

  log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
    s"offsets ${part.fromOffset} -> ${part.untilOffset}")

  // KafkaCluster is the client API for talking to the Kafka cluster
  val kc = new KafkaCluster(kafkaParams)

  // Decoder for the message key.
  // classTag is a method defined in scala.reflect's package object; it returns a ClassTag
  // whose runtimeClass field holds the Class object of the erased type parameter. Every
  // Decoder implementation has a constructor taking a VerifiableProperties argument, so the
  // code looks that constructor up, instantiates the concrete Decoder reflectively, and
  // upcasts it to Decoder.
  val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
    .newInstance(kc.config.props)
    .asInstanceOf[Decoder[K]]

  // Decoder for the message value
  val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
    .newInstance(kc.config.props)
    .asInstanceOf[Decoder[V]]

  val consumer = connectLeader
  var requestOffset = part.fromOffset
  var iter: Iterator[MessageAndOffset] = null

  // The idea is to use the provided preferred host, except on task retry attempts,
  // to minimize number of kafka metadata requests
  private def connectLeader: SimpleConsumer = {
    if (context.attemptNumber > 0) {
      // On a retry, look the leader up again through the brokers in the broker list and
      // return as soon as the leader of this topic partition is found
      kc.connectLeader(part.topic, part.partition).fold(
        errs => throw new SparkException(
          s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " +
            errs.mkString("\n")),
        consumer => consumer
      )
    } else {
      kc.connect(part.host, part.port)
    }
  }

  // Hook invoked when a fetch fails
  private def handleFetchErr(resp: FetchResponse) {
    if (resp.hasError) {
      val err = resp.errorCode(part.topic, part.partition)
      if (err == ErrorMapping.LeaderNotAvailableCode ||
        err == ErrorMapping.NotLeaderForPartitionCode) {
        log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, " +
          s" sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
        Thread.sleep(kc.config.refreshLeaderBackoffMs)
      }
      // Let normal rdd retry sort out reconnect attempts
      throw ErrorMapping.exceptionFor(err)
    }
  }

  // Note that the result is an iterator of MessageAndOffset (a Message ByteBuffer plus its offset)
  private def fetchBatch: Iterator[MessageAndOffset] = {
    // As the name suggests, FetchRequestBuilder builds a FetchRequest object
    val req = new FetchRequestBuilder()
      .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)
      .build()
    // Send the FetchRequest via SimpleConsumer.fetch and receive the topic messages
    val resp = consumer.fetch(req)
    // Check for errors; throw if any, otherwise keep processing the returned messages
    handleFetchErr(resp)
    // kafka may return a batch that starts before the requested offset,
    // so messages whose offset is smaller than requestOffset must be dropped
    resp.messageSet(part.topic, part.partition)
      .iterator
      .dropWhile(_.offset < requestOffset)
  }

  override def close(): Unit = {
    if (consumer != null) {
      consumer.close()
    }
  }

  // The key method is getNext. Its return type is R: whatever the messageHandler produces,
  // in this post's example a (key, message, offset) tuple.
  override def getNext(): R = {
    if (iter == null || !iter.hasNext) {
      // First call, or the previously fetched batch has been fully consumed
      iter = fetchBatch // fetch the next MessageAndOffset iterator
    }
    if (!iter.hasNext) {
      // Nothing left to process in this offset range: set the finished flag and return null
      assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
      finished = true
      null.asInstanceOf[R]
    } else {
      val item = iter.next() // next MessageAndOffset
      if (item.offset >= part.untilOffset) {
        // Messages at or beyond the until offset are outside this batch: stop here
        assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
        finished = true
        null.asInstanceOf[R]
      } else {
        // The message offset lies within [fromOffset, untilOffset)
        requestOffset = item.nextOffset // the next fetch starts at the offset after this message
        // MessageAndMetadata wraps a single message: topic, partition, the message ByteBuffer,
        // its offset, and the key/value decoders.
        // messageHandler is the user callback, in this example
        // (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message(), mmd.offset)
        messageHandler(new MessageAndMetadata(
          part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
      }
    }
  }
}
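The reflective decoder construction at the top of the iterator can be reproduced in isolation. A minimal sketch, assuming kafka's StringDecoder and an empty Properties object; the helper name makeDecoder is ours, not Spark's.

import java.util.Properties
import kafka.serializer.{Decoder, StringDecoder}
import kafka.utils.VerifiableProperties
import scala.reflect.{classTag, ClassTag}

// Every kafka Decoder implementation has a (VerifiableProperties) constructor,
// which is why the reflective lookup works.
def makeDecoder[T, D <: Decoder[T]: ClassTag](props: VerifiableProperties): Decoder[T] =
  classTag[D].runtimeClass
    .getConstructor(classOf[VerifiableProperties])
    .newInstance(props)
    .asInstanceOf[Decoder[T]]

val valueDecoder = makeDecoder[String, StringDecoder](new VerifiableProperties(new Properties))
println(valueDecoder.fromBytes("hello".getBytes("UTF-8"))) // prints: hello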
This raises the following questions:

1. How does this class receive Kafka messages?
Each batch's data is fetched through a KafkaRDD. The KafkaRDD's compute method returns an iterator that encapsulates the batched fetching of one Kafka partition's data, invokes the user-supplied message handler on each record, and returns the per-record result. Spark Streaming's exactly-once consumption is guaranteed by the KafkaRDD: before the KafkaRDD is created, the until offset of every partition has already been determined from currentOffsets, the estimated rate, the configured per-partition maximum rate, and the latest offset obtained from the partition leader. The fromOffset and untilOffset form an OffsetRange, the iterator produced inside the KafkaRDD discards any record whose offset falls outside that range, and each remaining record is passed to the user's message handler, which converts it into the desired format.

2. How is a single Kafka partition turned into a single RDD partition?
KafkaRDD's compute method takes a partition as its argument. That partition is a KafkaRDDPartition instance carrying the offset range, topic, and partition of this slice of data. The method returns a KafkaRDDIterator, which provides access to the Kafka data of that partition: internally it uses a SimpleConsumer to fetch batches from the leader and keeps only the records we want (bounded by the offset range).

3. How is the Kafka consumption rate estimated?
Through the PIDRateEstimator class, which estimates the rate from the batch completion time, the number of records processed, the actual processing time, and the batch scheduling delay.

4. How does this class do WAL? It does not; the direct approach has no write-ahead log.
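One practical note following from the answers above: the DStream fixes exact offset ranges per batch, but end-to-end exactly-once still depends on how the application writes its output. The usual pattern is to store results and offsets atomically (or write idempotently). A minimal sketch under that assumption, where saveResultsAndOffsetsInOneTransaction is a hypothetical transactional sink and stream is the DStream created at the top of this post:

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Hypothetical sink: writes the batch output and its offset ranges in one transaction,
// so a replay after a failure either sees both committed (skip) or neither (safe to redo).
def saveResultsAndOffsetsInOneTransaction(results: Array[String],
                                          ranges: Array[OffsetRange]): Unit = ???

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val results = rdd.map { case (_, value, _) => value }.collect()
  saveResultsAndOffsetsInOneTransaction(results, offsetRanges)
}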