Spark Streaming: Receiving Kafka Messages, Part 4 -- Receivers Running on Workers

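Fetching data with distributed receivers
Use the WAL to get at-least-once processing:
conf.set("spark.streaming.receiver.writeAheadLog.enable", "true") // enable the WAL
// 1. At most once - each record is processed at most once (0 or 1 times); data can be lost under this semantic.
// 2. At least once - each record is processed at least once (1 or more times); no data is lost, but records may be duplicated.
// 3. Exactly once - each record is processed exactly once, with no loss and no duplicates; this is the semantic everyone wants, and the hardest to implement.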
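
The difference between the first two semantics comes down to whether the offset is committed before or after a record is processed. The following is a minimal sketch in plain Scala, not Spark or Kafka code; the in-memory log, the `run` helper and the simulated crash point are all assumptions made for illustration.

```scala
object DeliverySemantics {
  // Hypothetical in-memory "topic"; an offset is just an index into this Vector.
  val log: Vector[String] = Vector("a", "b", "c", "d")

  /** Consumes from `start`. If `crashAt` is set, the consumer dies between the
    * two steps (commit / process) for that offset.
    * Returns (records processed in this run, last committed offset). */
  def run(start: Int, crashAt: Option[Int], commitFirst: Boolean): (Vector[String], Int) = {
    var committed = start
    var processed = Vector.empty[String]
    var offset = start
    while (offset < log.length) {
      // first step: commit (at most once) or process (at least once)
      if (commitFirst) committed = offset + 1 else processed = processed :+ log(offset)
      if (crashAt.contains(offset)) return (processed, committed)
      // second step: the other half
      if (commitFirst) processed = processed :+ log(offset) else committed = offset + 1
      offset += 1
    }
    (processed, committed)
  }

  /** One crash at `crashAt`, then a restart from the committed offset. */
  def withRestart(crashAt: Int, commitFirst: Boolean): Vector[String] = {
    val (beforeCrash, committed) = run(0, Some(crashAt), commitFirst)
    beforeCrash ++ run(committed, None, commitFirst)._1
  }
}
```

With a crash at offset 1, `withRestart(1, commitFirst = true)` skips record "b" (data loss), while `withRestart(1, commitFirst = false)` processes "b" twice (duplication).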

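Without fault tolerance, data can be lost. The receiver receives data continuously; if the executor dies (or the driver dies and tells the executors to shut down) after a record has been acknowledged to ZooKeeper but before it has been processed, the data buffered in memory is gone. To address this, Spark 1.2 introduced the WAL (write-ahead log). With the WAL enabled, set the storage level of the data fetched by the receiver to StorageLevel.MEMORY_AND_DISK_SER_2.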

// Drawback: you cannot manage the consumed topic-partition offsets yourself
// Advantage: with the WAL enabled, at-least-once semantics are guaranteed
val stream: ReceiverInputDStream[(String, String)] =
  KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, map, StorageLevel.MEMORY_AND_DISK_SER_2)

Reading data from Kafka

The driver plans where the receivers run

org.apache.spark.streaming.StreamingContext#start starts the JobScheduler instance:

// private[streaming] val scheduler = new JobScheduler(this)

// Start the streaming scheduler in a new thread, so that thread local properties
// like call sites and job groups can be reset without affecting those of the
// current thread.
ThreadUtils.runInNewThread("streaming-start") { // run the body in a separate daemon thread
  sparkContext.setCallSite(startSite.get)
  sparkContext.clearJobGroup()
  sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
  // call the scheduler's start method
  scheduler.start()
}
state = StreamingContextState.ACTIVE

 

The source of org.apache.spark.streaming.scheduler.JobScheduler#start is as follows:

def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}

 

 

The class documentation of ReceiverTracker reads:

This class manages the execution of the receivers of ReceiverInputDStreams. Instance of this class must be created after all input streams have been added and StreamingContext.start() has been called because it needs the final set of input streams at the time of instantiation.

 

Its start method is as follows:

/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    // set up the RPC endpoint
    endpoint = ssc.env.rpcEnv.setupEndpoint( // note: this endpoint lives on the driver
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    // the driver issues the command that launches the receivers
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}

/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    // KafkaReceiver when the WAL is disabled, ReliableKafkaReceiver when it is enabled
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })
  // run a trivial job to make sure all worker nodes are up, so that the receiver tasks do not all land on the same node
  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers)) // ask the driver endpoint to start all receivers
}

On the driver side, StartAllReceivers is handled as follows:

override def receive: PartialFunction[Any, Unit] = {
  // Local messages
  case StartAllReceivers(receivers) =>
    // schedule the receivers
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiver, executors)
    }
  // ...
}

 

The source of getExecutors is as follows:

/**
 * Get the list of executors excluding driver
 */
// In local mode, return the local executor; in cluster mode (e.g. YARN), return the executors that are not the driver
private def getExecutors: Seq[ExecutorCacheTaskLocation] = {
  if (ssc.sc.isLocal) { // running in local mode
    val blockManagerId = ssc.sparkContext.env.blockManager.blockManagerId
    Seq(ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId))
  } else { // in cluster mode, filter out the driver's own block manager
    ssc.sparkContext.env.blockManager.master.getMemoryStatus.filter { case (blockManagerId, _) =>
      blockManagerId.executorId != SparkContext.DRIVER_IDENTIFIER // Ignore the driver location
    }.map { case (blockManagerId, _) =>
      ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId)
    }.toSeq
  }
}

org.apache.spark.streaming.scheduler.ReceiverSchedulingPolicy#scheduleReceivers is documented as follows:

Try our best to schedule receivers with evenly distributed. However, if the preferredLocations of receivers are not even, we may not be able to schedule them evenly because we have to respect them. Here is the approach to schedule executors:
First, schedule all the receivers with preferred locations (hosts), evenly among the executors running on those host.
Then, schedule all other receivers evenly among all the executors such that overall distribution over all the receivers is even.
This method is called when we start to launch receivers at the first time.

In short, this method makes sure the receivers are spread evenly over the worker nodes. It follows two rules:
1. Receivers with a preferred location are assigned to executors on those hosts.
2. The remaining, unassigned receivers are spread evenly over all worker nodes.
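
The two rules can be sketched in a few lines of plain Scala. This is only an illustration of the idea under simplifying assumptions, not Spark's actual algorithm: each receiver gets exactly one executor, an executor is a hypothetical (host, executorId) pair, and ties are broken by whichever executor comes first.

```scala
import scala.collection.mutable

object EvenScheduling {
  type Executor = (String, String) // (host, executorId), a simplified stand-in

  /** receivers: (receiverId, optional preferred host). Returns receiverId -> executor. */
  def schedule(receivers: Seq[(Int, Option[String])],
               executors: Seq[Executor]): Map[Int, Executor] = {
    if (executors.isEmpty) return Map.empty

    // number of receivers already placed on each executor
    val load = mutable.Map.empty[Executor, Int]
    executors.foreach(e => load(e) = 0)

    def placeOnLeastLoaded(candidates: Seq[Executor]): Executor = {
      val chosen = candidates.minBy(e => load(e))
      load(chosen) += 1
      chosen
    }

    // Rule 1 first (receivers with a preferred host), then rule 2 (everything else).
    val (withPreference, withoutPreference) = receivers.partition(_._2.isDefined)
    (withPreference ++ withoutPreference).map { case (id, preferredHost) =>
      val onPreferredHost = preferredHost.toSeq.flatMap(h => executors.filter(_._1 == h))
      // fall back to all executors when there is no preference or no executor on that host
      id -> placeOnLeastLoaded(if (onPreferredHost.nonEmpty) onPreferredHost else executors)
    }.toMap
  }
}
```

For example, with two executors on host h1 and one on h2, two receivers that prefer h1 end up on the two distinct h1 executors, and a third receiver without a preference goes to the idle executor on h2.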

org.apache.spark.streaming.scheduler.ReceiverTracker#updateReceiverScheduledExecutors updates the mapping from receiver id to receiver info. Its source is as follows:

private def updateReceiverScheduledExecutors(
    receiverId: Int, scheduledLocations: Seq[TaskLocation]): Unit = {
  val newReceiverTrackingInfo = receiverTrackingInfos.get(receiverId) match {
    case Some(oldInfo) =>
      oldInfo.copy(state = ReceiverState.SCHEDULED,
        scheduledLocations = Some(scheduledLocations))
    case None =>
      ReceiverTrackingInfo(
        receiverId,
        ReceiverState.SCHEDULED,
        Some(scheduledLocations),
        runningExecutor = None)
  }
  receiverTrackingInfos.put(receiverId, newReceiverTrackingInfo)
}
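
The method above is an "update or create" on a map of case-class values. A minimal standalone sketch of the same pattern follows; the `Info` and `State` types are simplified stand-ins invented here, not Spark's ReceiverTrackingInfo, and an immutable Map is used instead of Spark's mutable one.

```scala
object TrackingInfoUpdate {
  sealed trait State
  case object Scheduled extends State
  case object Active extends State

  // Simplified stand-in for ReceiverTrackingInfo
  final case class Info(id: Int,
                        state: State,
                        scheduledLocations: Option[Seq[String]],
                        runningExecutor: Option[String])

  /** Marks receiver `id` as scheduled, keeping any other fields of an existing entry. */
  def markScheduled(infos: Map[Int, Info], id: Int, locations: Seq[String]): Map[Int, Info] = {
    val updated = infos.get(id) match {
      case Some(old) => old.copy(state = Scheduled, scheduledLocations = Some(locations))
      case None      => Info(id, Scheduled, Some(locations), runningExecutor = None)
    }
    infos + (id -> updated)
  }
}
```

Using `copy` on the existing entry changes only the state and the scheduled locations; everything else recorded for that receiver is carried over unchanged.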

 

The driver submits a distributed job that starts the receivers

startReceiver is responsible for starting a receiver. Its source is as follows:

/**
 * Start a receiver along with its scheduled executors
 */
private def startReceiver(
    receiver: Receiver[_],
    scheduledLocations: Seq[TaskLocation]): Unit = {
  def shouldStartReceiver: Boolean = {
    // It's okay to start when trackerState is Initialized or Started
    !(isTrackerStopping || isTrackerStopped)
  }

  val receiverId = receiver.streamId
  if (!shouldStartReceiver) {
    onReceiverJobFinish(receiverId)
    return
  }

  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

  // function that starts the receiver on a worker node
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (!iterator.hasNext) {
        throw new SparkException(
          "Could not start receiver as object not found.")
      }
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      } else {
        // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
      }
    }

  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
  // submit the distributed job that starts the receiver
  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  // We will keep restarting the receiver job until ReceiverTracker is stopped
  future.onComplete {
    case Success(_) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logError("Receiver has been stopped. Try to restart it.", e)
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
  }(submitJobThreadPool)
  logInfo(s"Receiver ${receiver.streamId} started")
}

The worker node starts the receiver supervisor

The start method of org.apache.spark.streaming.receiver.ReceiverSupervisorImpl is as follows:

/** Start the supervisor */
def start() {
  onStart()
  startReceiver()
}

override protected def onStart() { // start the BlockGenerator services
  registeredBlockGenerators.foreach { _.start() }
}

// startReceiver is defined as follows:
/** Start receiver */
def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) { // the receiver registered successfully
      logInfo("Starting receiver")
      receiverState = Started
      receiver.onStart() // start the receiver
      logInfo("Called receiver onStart")
    } else {
      // The driver refused us
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
    }
  } catch {
    case NonFatal(t) =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}

 

Registering the receiver with the driver

override protected def onReceiverStart(): Boolean = {
  val msg = RegisterReceiver(
    streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
  trackerEndpoint.askWithRetry[Boolean](msg)
}

 

Briefly, what the driver does here is bring the receiver under the management of org.apache.spark.streaming.scheduler.ReceiverTracker. The mapping from stream id to ReceiverTrackingInfo is kept in receiverTrackingInfos.

The key code of org.apache.spark.streaming.scheduler.ReceiverTracker#registerReceiver is as follows:

val name = s"${typ}-${streamId}"
val receiverTrackingInfo = ReceiverTrackingInfo(
  streamId,
  ReceiverState.ACTIVE,
  scheduledLocations = None,
  runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
  name = Some(name),
  endpoint = Some(receiverEndpoint))
receiverTrackingInfos.put(streamId, receiverTrackingInfo)
listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))

 

Starting the receiver threads

Because the WAL is enabled, the receiver here is an instance of ReliableKafkaReceiver.
receiver.onStart is therefore org.apache.spark.streaming.kafka.ReliableKafkaReceiver#onStart, whose source is as follows:

override def onStart(): Unit = {
  logInfo(s"Starting Kafka Consumer Stream with group: $groupId")

  // Initialize the topic-partition / offset hash map.
  // 1. Tracks the mapping from consumed topic-partition to offset
  topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]

  // Initialize the stream block id / offset snapshot hash map.
  // 2. Tracks the mapping from block id to its partition-offset snapshot
  blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()

  // Initialize the block generator for storing Kafka message.
  // 3. The block generator that stores Kafka messages. Its argument is a GeneratedBlockHandler,
  //    a listener for block generator events. The BlockGenerator documentation says:
  //    Generates batches of objects received by a org.apache.spark.streaming.receiver.Receiver and puts them
  //    into appropriately named blocks at regular intervals. This class starts two threads, one to periodically
  //    start a new batch and prepare the previous batch of as a block, the other to push the blocks into the
  //    block manager.
  blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler)
  // 4. Turn off the consumer's automatic offset commit; AUTO_OFFSET_COMMIT must be false
  if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
    logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
      "otherwise we will manually set it to false to turn off auto offset commit in Kafka")
  }

  val props = new Properties()
  kafkaParams.foreach(param => props.put(param._1, param._2))
  // Manually set "auto.commit.enable" to "false" no matter user explicitly set it to true,
  // we have to make sure this property is set to false to turn off auto commit mechanism in Kafka.
  props.setProperty(AUTO_OFFSET_COMMIT, "false")

  val consumerConfig = new ConsumerConfig(props)

  assert(!consumerConfig.autoCommitEnable)

  logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}")
  // 5. Create the consumer; consumerConnector is a ZookeeperConsumerConnector instance
  consumerConnector = Consumer.create(consumerConfig)
  logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}")
  // 6. Create the ZooKeeper client
  zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
    consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
  // 7. Create a fixed-size thread pool to handle the message streams. Its size is the sum of the
  //    per-topic counts in the topics map, and the second argument is the thread name prefix.
  //    Internally it is a ThreadPoolExecutor whose thread factory comes from Guava.
  messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool(
    topics.values.sum, "KafkaMessageHandler")
  // 8. Start the two threads inside the BlockGenerator
  blockGenerator.start()

  // 9. Create the message streams
  val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
    .newInstance(consumerConfig.props)
    .asInstanceOf[Decoder[K]]

  val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
    .newInstance(consumerConfig.props)
    .asInstanceOf[Decoder[V]]

  val topicMessageStreams = consumerConnector.createMessageStreams(
    topics, keyDecoder, valueDecoder)
  // 10. Submit one MessageHandler per stream to the thread pool
  topicMessageStreams.values.foreach { streams =>
    streams.foreach { stream =>
      messageHandlerThreadPool.submit(new MessageHandler(stream))
    }
  }
}
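
The two maps created in steps 1 and 2, together with auto commit being switched off in step 4, suggest how the reliable receiver gets at-least-once behaviour: offsets are snapshotted per block and written back only after the block has been stored. The sketch below models that idea in plain Scala. The callback names mirror block generator events, but the class, the String partition keys and the `store` function are assumptions for illustration, not the Spark implementation.

```scala
object CommitAfterStore {
  final class OffsetTracker {
    // latest offset seen per partition since the last block was cut
    private var current = Map.empty[String, Long]
    // block id -> offset snapshot taken when the block was generated
    private var blockOffsets = Map.empty[Int, Map[String, Long]]
    // stands in for the offsets persisted in ZooKeeper
    var committed: Map[String, Long] = Map.empty

    def onAddData(partition: String, offset: Long): Unit =
      current += (partition -> offset)

    def onGenerateBlock(blockId: Int): Unit = {
      blockOffsets += (blockId -> current)
      current = Map.empty
    }

    /** Commits the block's offsets only if `store` reports success. */
    def onPushBlock(blockId: Int, store: Int => Boolean): Unit =
      if (store(blockId)) {
        committed ++= blockOffsets.getOrElse(blockId, Map.empty[String, Long])
        blockOffsets -= blockId
      }
  }
}
```

If storing a block fails, nothing is committed, so after a restart the same records are read again: duplicates are possible, loss is not.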

Step 9 creates the message streams.
The kafka.consumer.ZookeeperConsumerConnector#createMessageStreams method is as follows:

def createMessageStreams[K,V](topicCountMap: Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
    : Map[String, List[KafkaStream[K,V]]] = {
  if (messageStreamCreated.getAndSet(true))
    throw new MessageStreamsExistException(this.getClass.getSimpleName +
                                 " can create message streams at most once",null)
  consume(topicCountMap, keyDecoder, valueDecoder)
}
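
The guard at the top of createMessageStreams is a common "at most once" idiom built on AtomicBoolean.getAndSet. A minimal standalone sketch follows; the `OnceOnly` class and its exception type are made up for illustration.

```scala
import java.util.concurrent.atomic.AtomicBoolean

/** Allows `create` to be invoked at most once, even with concurrent callers. */
final class OnceOnly[A](create: () => A) {
  private val created = new AtomicBoolean(false)

  def get(): A = {
    // getAndSet returns the previous value: only the first caller sees false
    if (created.getAndSet(true))
      throw new IllegalStateException("can create at most once")
    create()
  }
}
```

Because getAndSet is atomic, two threads racing on `get()` cannot both pass the check; exactly one of them proceeds and the other gets the exception.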

 

It calls the consume method, whose source is as follows:

def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
    : Map[String,List[KafkaStream[K,V]]] = {
  debug("entering consume ")
  if (topicCountMap == null)
    throw new RuntimeException("topicCountMap is null")
  // 1. build the topicCount
  val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)
  // 2. get the mapping from each topic to its set of thread ids
  val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic

  // make a list of (queue,stream) pairs, one pair for each threadId
  // 3. build the list of (queue, stream) pairs, one per thread id
  val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
    threadIdSet.map(_ => {
      val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
      val stream = new KafkaStream[K,V](
        queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
      (queue, stream)
    })
  ).flatten.toList
  // 4. get the group's path in ZooKeeper
  val dirs = new ZKGroupDirs(config.groupId)
  // 5. register the consumer under the group (in ZooKeeper)
  registerConsumerInZK(dirs, consumerIdString, topicCount)
  // 6. reinitialize the consumer
  reinitializeConsumer(topicCount, queuesAndStreams)
  // 7. return the streams
  loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]]
}

 

The consumer consumes Kafka data

kafka.consumer.ZookeeperConsumerConnector#consume contains the following steps:

// build the list of (queue, stream) pairs, one per thread id
val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
  threadIdSet.map(_ => {
    val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
    val stream = new KafkaStream[K,V](
      queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
    (queue, stream)
  })
).flatten.toList
// get the group's path in ZooKeeper
val dirs = new ZKGroupDirs(config.groupId)
// register the consumer under the group (in ZooKeeper)
registerConsumerInZK(dirs, consumerIdString, topicCount)
// reinitialize the consumer
reinitializeConsumer(topicCount, queuesAndStreams)

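In the code above, each queue (a LinkedBlockingQueue instance) is used in two places. It is passed to the constructor of the stream (a KafkaStream), whose iterator takes data from it, and it is also paired with that stream into a list of Tuple2[LinkedBlockingQueue[FetchedDataChunk], KafkaStream[K,V]], which is then passed to reinitializeConsumer.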
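
The queue is thus the hand-off point between whoever fetches data and the stream's iterator. A minimal producer/consumer sketch over a bounded LinkedBlockingQueue follows; the `Chunk` types, the shutdown marker and the thread are simplified assumptions, not Kafka's FetchedDataChunk or fetcher threads.

```scala
import java.util.concurrent.LinkedBlockingQueue

object QueueHandoff {
  // Hypothetical stand-in for FetchedDataChunk: a batch of messages or a shutdown marker
  sealed trait Chunk
  final case class Data(messages: Seq[String]) extends Chunk
  case object Shutdown extends Chunk

  def demo(): List[String] = {
    // bounded queue: put() blocks when the consumer falls behind
    val queue = new LinkedBlockingQueue[Chunk](2)

    // producer side: plays the role of the fetcher
    val fetcher = new Thread(new Runnable {
      def run(): Unit = {
        Seq(Seq("m1", "m2"), Seq("m3")).foreach(batch => queue.put(Data(batch)))
        queue.put(Shutdown)
      }
    })
    fetcher.setDaemon(true)
    fetcher.start()

    // consumer side: plays the role of the stream's iterator
    val out = List.newBuilder[String]
    var running = true
    while (running) {
      queue.take() match {
        case Data(messages) => out ++= messages
        case Shutdown       => running = false
      }
    }
    out.result()
  }
}
```

The bound on the queue gives back-pressure for free: a slow consumer makes the producer block instead of letting memory grow without limit.
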
The source of kafka.consumer.ZookeeperConsumerConnector#reinitializeConsumer is as follows:

private def reinitializeConsumer[K,V](
    topicCount: TopicCount,
    queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[K,V])]) {
  // 1. get the group's path in ZooKeeper
  val dirs = new ZKGroupDirs(config.groupId)

  // listener to consumer and partition changes
  // 2. create the loadBalancerListener, which keeps watching for consumer and partition changes
  if (loadBalancerListener == null) {
    val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]]
    loadBalancerListener = new ZKRebalancerListener(
      config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]])
  }

  // create listener for session expired event if not exist yet
  // 3. listener for session expiration; when a new session is established it notifies the load balancer
  if (sessionExpirationListener == null)
    sessionExpirationListener = new ZKSessionExpireListener(
      dirs, consumerIdString, topicCount, loadBalancerListener)

  // create listener for topic partition change event if not exist yet
  // 4. create the ZKTopicPartitionChangeListener; when a topic's partitions change it notifies the load balancer
  if (topicPartitionChangeListener == null)
    topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener)
  // 5. transform queuesAndStreams and add the result to loadBalancerListener.kafkaMessageAndMetadataStreams
  val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams

  // map of {topic -> Set(thread-1, thread-2, ...)}
  val consumerThreadIdsPerTopic: Map[String, Set[ConsumerThreadId]] =
    topicCount.getConsumerThreadIdsPerTopic

  val allQueuesAndStreams = topicCount match {
    case wildTopicCount: WildcardTopicCount => // taken for wildcard (topic filter) subscriptions
      /*
       * Wild-card consumption streams share the same queues, so we need to
       * duplicate the list for the subsequent zip operation.
       */
      (1 to consumerThreadIdsPerTopic.keySet.size).flatMap(_ => queuesAndStreams).toList
    case statTopicCount: StaticTopicCount => // consume() above builds the topicCount from an explicit topic map, so this branch is taken here
      queuesAndStreams
  }

  val topicThreadIds = consumerThreadIdsPerTopic.map {
    case(topic, threadIds) =>
      threadIds.map((topic, _))
  }.flatten

  require(topicThreadIds.size == allQueuesAndStreams.size,
    "Mismatch between thread ID count (%d) and queue count (%d)"
    .format(topicThreadIds.size, allQueuesAndStreams.size))
  val threadQueueStreamPairs = topicThreadIds.zip(allQueuesAndStreams)

  threadQueueStreamPairs.foreach(e => {
    val topicThreadId = e._1
    val q = e._2._1
    topicThreadIdAndQueues.put(topicThreadId, q)
    debug("Adding topicThreadId %s and queue %s to topicThreadIdAndQueues data structure".format(topicThreadId, q.toString))
    newGauge(
      "FetchQueueSize",
      new Gauge[Int] {
        def value = q.size
      },
      Map("clientId" -> config.clientId,
        "topic" -> topicThreadId._1,
        "threadId" -> topicThreadId._2.threadId.toString)
    )
  })

  val groupedByTopic = threadQueueStreamPairs.groupBy(_._1._1)
  groupedByTopic.foreach(e => {
    val topic = e._1
    val streams = e._2.map(_._2._2).toList
    topicStreamsMap += (topic -> streams)
    debug("adding topic %s and %d streams to map.".format(topic, streams.size))
  })

  // listener to consumer and partition changes
  // 6. register the sessionExpirationListener with zkClient
  zkClient.subscribeStateChanges(sessionExpirationListener)
  // 7. register the loadBalancerListener with zkClient
  zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
  // for every topic, register the topicPartitionChangeListener with zkClient
  topicStreamsMap.foreach { topicAndStreams =>
    // register on broker partition path changes
    val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1
    zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener)
  }

  // explicitly trigger load balancing for this consumer
  // 8. rebalance synchronously through the loadBalancerListener
  loadBalancerListener.syncedRebalance()
}
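
Step 5 is the densest part of this method: it flattens the topic-to-thread-ids map, zips the result with the (queue, stream) pairs, and regroups the streams by topic. The sketch below replays that transformation on plain strings. The names are placeholders, and an ordered Seq replaces the Map input so that the pairing is easy to follow.

```scala
object TopicThreadZip {
  /** threadIdsPerTopic: topic -> thread ids; queuesAndStreams: (queue, stream) placeholders. */
  def streamsByTopic(threadIdsPerTopic: Seq[(String, Seq[String])],
                     queuesAndStreams: Seq[(String, String)]): Map[String, List[String]] = {
    // one (topic, threadId) entry per consumer thread
    val topicThreadIds = threadIdsPerTopic.flatMap { case (topic, ids) => ids.map(id => (topic, id)) }

    require(topicThreadIds.size == queuesAndStreams.size,
      s"Mismatch between thread ID count (${topicThreadIds.size}) and queue count (${queuesAndStreams.size})")

    // pair every thread with its own queue and stream, then collect the streams per topic
    val pairs = topicThreadIds.zip(queuesAndStreams)
    pairs.groupBy(_._1._1).map { case (topic, ps) => topic -> ps.map(_._2._2).toList }
  }
}
```

The `require` plays the same role as in the original: zip silently truncates to the shorter side, so a size mismatch has to be caught before zipping.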

Step 8 deserves a closer look: rebalancing synchronously through the loadBalancerListener.
The source of kafka.consumer.ZookeeperConsumerConnector.ZKRebalancerListener#syncedRebalance is as follows:

def syncedRebalance() {
  rebalanceLock synchronized {
    rebalanceTimer.time {
      if(isShuttingDown.get())  { // the ZookeeperConsumerConnector has already shut down, return immediately
        return
      } else {
        for (i <- 0 until config.rebalanceMaxRetries) { // 4 by default
          info("begin rebalancing consumer " + consumerIdString + " try #" + i)
          var done = false
          var cluster: Cluster = null
          try {
            // 1. build a Cluster object via zkClient; it holds the list of brokers (broker id and the broker's path in ZooKeeper)
            cluster = getCluster(zkClient)
            // 2. rebalance against that cluster
            done = rebalance(cluster)
          } catch {
            case e: Throwable =>
              /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
               * For example, a ZK node can disappear between the time we get all children and the time we try to get
               * the value of a child. Just let this go since another rebalance will be triggered.
               **/
              info("exception during rebalance ", e)
          }
          info("end rebalancing consumer " + consumerIdString + " try #" + i)
          if (done) {
            return
          } else {
            /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
             * clear the cache */
            info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
          }
          // stop all fetchers and clear all the queues to avoid data duplication
          closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
          Thread.sleep(config.rebalanceBackoffMs)
        }
      }
    }
  }

  throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.rebalanceMaxRetries +" retries")
}
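
Stripped of the Kafka specifics, syncedRebalance is a bounded retry loop: try, clean up on failure, back off, and give up with an exception after the last attempt. A minimal generic sketch follows; the `retry` helper, its parameters and the exception type are assumptions for illustration.

```scala
import scala.util.control.NonFatal

object RetryWithBackoff {
  /** Runs `attempt` up to `maxRetries` times. After each failed try it runs
    * `cleanup` and sleeps for `backoffMs`. Returns the index of the successful try. */
  def retry(maxRetries: Int, backoffMs: Long)(attempt: Int => Boolean)(cleanup: () => Unit): Int = {
    var i = 0
    while (i < maxRetries) {
      // an exception counts as a failed try, like the catch block in syncedRebalance
      val done = try attempt(i) catch { case NonFatal(_) => false }
      if (done) return i
      cleanup()
      Thread.sleep(backoffMs)
      i += 1
    }
    throw new IllegalStateException(s"can't rebalance after $maxRetries retries")
  }
}
```

In the original, the cleanup step is closeFetchersForQueues and the backoff is config.rebalanceBackoffMs; here both are plain parameters so the loop can be exercised on its own.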

 

Step 2 deserves a closer look: the rebalance against the cluster. The source of kafka.consumer.ZookeeperConsumerConnector.ZKRebalancerListener#rebalance is as follows:

private def rebalance(cluster: Cluster): Boolean = {
  // 1. get the map from topic to this consumer's thread ids
  val myTopicThreadIdsMap = TopicCount.constructTopicCount(
    group, consumerIdString, zkClient, config.excludeInternalTopics).getConsumerThreadIdsPerTopic
  // 2. get all available brokers in the Kafka cluster
  val brokers = getAllBrokersInCluster(zkClient)
  if (brokers.size == 0) { // no brokers available: subscribe the listener and return true
    // This can happen in a rare case when there are no brokers available in the cluster when the consumer is started.
    // We log an warning and register for child changes on brokers/id so that rebalance can be triggered when the brokers
    // are up.
    warn("no brokers found when trying to rebalance.")
    zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, loadBalancerListener)
    true
  }
  else {
    /**
     * fetchers must be stopped to avoid data duplication, since if the current
     * rebalancing attempt fails, the partitions that are released could be owned by another consumer.
     * But if we don't stop the fetchers first, this consumer would continue returning data for released
     * partitions in parallel. So, not stopping the fetchers leads to duplicate data.
     */
    // 3. preparation before the rebalance
    // 3.1 close the existing fetcher connections
    closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
    // 3.2 release partition ownership (mainly deletes the owner nodes in ZooKeeper and drops the in-memory topic/fetcher associations)
    releasePartitionOwnership(topicRegistry)
    // 3.3 reassign the partitions to consumer threads
    val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient)
    val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext)
    val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]](
      valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo]))

    // fetch current offsets for all topic-partitions
    // 3.4 fetch the offsets of the assigned partitions; the offset here is the next one the consumer will consume
    val topicPartitions = partitionOwnershipDecision.keySet.toSeq

    val offsetFetchResponseOpt = fetchOffsets(topicPartitions)

    if (isShuttingDown.get || !offsetFetchResponseOpt.isDefined)
      false
    else {
      // 3.5 update the mapping between partitions and fetchers
      val offsetFetchResponse = offsetFetchResponseOpt.get
      topicPartitions.foreach(topicAndPartition => {
        val (topic, partition) = topicAndPartition.asTuple
        // requestInfo is a member of OffsetFetchResponse, of type Map[TopicAndPartition, OffsetMetadataAndError]
        val offset = offsetFetchResponse.requestInfo(topicAndPartition).offset
        val threadId = partitionOwnershipDecision(topicAndPartition)
        addPartitionTopicInfo(currentTopicRegistry, partition, topic, offset, threadId)
      })

      /**
       * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt
       * A rebalancing attempt is completed successfully only after the fetchers have been started correctly
       */
      if(reflectPartitionOwnershipDecision(partitionOwnershipDecision)) {
        allTopicsOwnedPartitionsCount = partitionOwnershipDecision.size

        partitionOwnershipDecision.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic }
                                  .foreach { case (topic, partitionThreadPairs) =>
          newGauge("OwnedPartitionsCount",
            new Gauge[Int] {
              def value() = partitionThreadPairs.size
            },
            ownedPartitionsCountMetricTags(topic))
        }
        // 3.6 replace the old topic registry with the new one
        topicRegistry = currentTopicRegistry
        // 4. update the fetcher
        updateFetcher(cluster)
        true
      } else {
        false
      }
    }
  }
}
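
Step 3.3 delegates the actual decision to partitionAssignor.assign, which the listing does not show. As an illustration of what such an assignor can look like, here is a range-style assignment for a single topic in plain Scala. Treat the strategy choice, the function name and the String thread ids as assumptions, not as the code behind partitionAssignor.

```scala
object RangeAssignment {
  /** Splits partitions 0 until numPartitions into contiguous ranges, one per
    * thread id (in sorted order). The first `numPartitions % threads` threads get one extra. */
  def assign(numPartitions: Int, threadIds: Seq[String]): Map[Int, String] = {
    require(threadIds.nonEmpty, "at least one consumer thread is required")
    val sorted = threadIds.sorted
    val perThread = numPartitions / sorted.size
    val threadsWithExtra = numPartitions % sorted.size

    sorted.zipWithIndex.flatMap { case (thread, position) =>
      val start = perThread * position + math.min(position, threadsWithExtra)
      val count = perThread + (if (position < threadsWithExtra) 1 else 0)
      (start until start + count).map(partition => partition -> thread)
    }.toMap
  }
}
```

With 5 partitions and threads "c1-0" and "c2-0", the first thread owns partitions 0 to 2 and the second owns 3 and 4. Every consumer in the group can compute the same result independently, as long as they all see the same sorted list of thread ids.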

The source of addPartitionTopicInfo is as follows:

private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]],
                                  partition: Int, topic: String,
                                  offset: Long, consumerThreadId: ConsumerThreadId) {
  // if the pool has no entry for the key, valueFactory creates one; the value is returned either way
  val partTopicInfoMap = currentTopicRegistry.getAndMaybePut(topic)

  val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
  val consumedOffset = new AtomicLong(offset)
  val fetchedOffset = new AtomicLong(offset)
  val partTopicInfo = new PartitionTopicInfo(topic,
                                             partition,
                                             queue,
                                             consumedOffset,
                                             fetchedOffset,
                                             new AtomicInteger(config.fetchMessageMaxBytes),
                                             config.clientId)
  // 1. register it in the new topic registry, i.e. record the partition / fetcher relationship
  partTopicInfoMap.put(partition, partTopicInfo)
  debug(partTopicInfo + " selected new offset " + offset)
  // 2. update the consumer's record of consumed offsets
  checkpointedZkOffsets.put(TopicAndPartition(topic, partition), offset)
}
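
The getAndMaybePut call at the top is a "get or create" on a concurrent map. A minimal sketch of that behaviour follows, written against java.util.concurrent.ConcurrentHashMap; `SimplePool` is a hypothetical class for illustration, not Kafka's Pool.

```scala
import java.util.concurrent.ConcurrentHashMap

/** Returns the existing value for a key, or creates, stores and returns a new one. */
final class SimplePool[K, V <: AnyRef](valueFactory: K => V) {
  private val map = new ConcurrentHashMap[K, V]()

  def getAndMaybePut(key: K): V = {
    val existing = map.get(key)
    if (existing != null) existing
    else {
      val created = valueFactory(key)
      // putIfAbsent returns the value another thread stored first, or null if ours won
      val raced = map.putIfAbsent(key, created)
      if (raced != null) raced else created
    }
  }

  def size: Int = map.size()
}
```

Under contention the factory may run more than once, but every caller ends up with the same stored instance, which is what makes it safe to mutate the returned value as addPartitionTopicInfo does.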

 

Step 4, updating the fetcher, looks like this:

private def updateFetcher(cluster: Cluster) {
  // update partitions for fetcher
  var allPartitionInfos : List[PartitionTopicInfo] = Nil
  for (partitionInfos <- topicRegistry.values)
    for (partition <- partitionInfos.values)
      allPartitionInfos ::= partition
  info("Consumer " + consumerIdString + " selected partitions : " +
    allPartitionInfos.sortWith((s,t) => s.partitionId < t.partitionId).map(_.toString).mkString(","))

  fetcher match {
    case Some(f) =>
      f.startConnections(allPartitionInfos, cluster)
    case None =>
  }
}

 

Here f.startConnections performs the actual update. This brings in a new class, the fetcher manager kafka.consumer.ConsumerFetcherManager.

The source of kafka.consumer.ConsumerFetcherManager#startConnections is as follows:

def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) {
  // the LeaderFinderThread adds fetchers to the leader broker once a partition's leader is available
  leaderFinderThread = new LeaderFinderThread(consumerIdString + "-leader-finder-thread")
  leaderFinderThread.start()

  inLock(lock) {
    // update the ConsumerFetcherManager member variables
    partitionMap = topicInfos.map(tpi => (TopicAndPartition(tpi.topic, tpi.partitionId), tpi)).toMap
    this.cluster = cluster
    noLeaderPartitionSet ++= topicInfos.map(tpi => TopicAndPartition(tpi.topic, tpi.partitionId))
    cond.signalAll()
  }
}
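
The `inLock(lock) { ...; cond.signalAll() }` block is one half of a lock/condition hand-off: a thread that waits on the condition wakes up once partitions have been added. A minimal sketch of both halves with java.util.concurrent.locks follows. The class and method names are hypothetical, and unlike the real leader finder, which removes partitions one by one as leaders are found, this version drains the whole set.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

final class PendingPartitions {
  private val lock = new ReentrantLock()
  private val cond = lock.newCondition()
  private val pending = mutable.Set.empty[String]

  /** Producer side: add work and wake up any waiting thread. */
  def add(partitions: Iterable[String]): Unit = {
    lock.lock()
    try {
      pending ++= partitions
      cond.signalAll()
    } finally lock.unlock()
  }

  /** Consumer side: block until there is work, then take all of it. */
  def takeAll(): Set[String] = {
    lock.lock()
    try {
      // loop, not if: await() can wake up spuriously
      while (pending.isEmpty) cond.await()
      val snapshot = pending.toSet
      pending.clear()
      snapshot
    } finally lock.unlock()
  }
}

object PendingPartitionsDemo {
  def demo(): Set[String] = {
    val pending = new PendingPartitions
    val producer = new Thread(new Runnable {
      def run(): Unit = { Thread.sleep(20); pending.add(Seq("t-0", "t-1")) }
    })
    producer.start()
    pending.takeAll() // blocks until add() has signalled
  }
}
```

Starting the waiting thread before the set is filled, as startConnections does, is safe for exactly this reason: the waiter simply parks on the condition until the signal arrives.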

 

ConsumerFetcherManager holds a LeaderFinderThread instance. The parent class of LeaderFinderThread is kafka.utils.ShutdownableThread, whose run method is as follows:

override def run(): Unit = {
  info("Starting ")
  try{
    while(isRunning.get()){
      doWork()
    }
  } catch{
    case e: Throwable =>
      if(isRunning.get())
        error("Error due to ", e)
  }
  shutdownLatch.countDown()
  info("Stopped ")
}
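
The run method above is a template: loop on doWork while a flag is set, and count down a latch on exit so that whoever requested the shutdown can wait for it. A minimal standalone sketch of that pattern follows. `LoopThread` and its `shutdown` method are assumptions modelled on the listing, not Kafka's ShutdownableThread.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

abstract class LoopThread(name: String) extends Thread(name) {
  protected val isRunning = new AtomicBoolean(true)
  private val shutdownLatch = new CountDownLatch(1)

  /** One unit of work; called repeatedly until shutdown. */
  def doWork(): Unit

  override def run(): Unit = {
    try {
      while (isRunning.get()) doWork()
    } catch {
      // errors during shutdown (e.g. the interrupt below) are expected and ignored
      case e: Throwable => if (isRunning.get()) System.err.println(s"Error due to $e")
    }
    shutdownLatch.countDown()
  }

  /** Asks the loop to stop and waits until run() has finished. */
  def shutdown(): Unit = {
    isRunning.set(false)
    interrupt() // wake the thread if doWork is blocked
    shutdownLatch.await()
  }
}

object LoopThreadDemo {
  def demo(): Int = {
    val ticks = new AtomicInteger(0)
    val worker = new LoopThread("demo-loop") {
      def doWork(): Unit = { ticks.incrementAndGet(); Thread.sleep(1) }
    }
    worker.start()
    while (ticks.get() < 3) Thread.sleep(1)
    worker.shutdown()
    ticks.get()
  }
}
```

Subclasses only supply doWork, which is how LeaderFinderThread plugs its own logic into the loop.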

doWork is an abstract method. The subclass LeaderFinderThread implements it as follows:

// thread responsible for adding the fetcher to the right broker when leader is available
override def doWork() {
  // 1. find the mapping from partition to leader broker
  val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
  lock.lock()
  try {
    while (noLeaderPartitionSet.isEmpty) { // this field was filled in by startConnections
      trace("No partition for leader election.")
      cond.await()
    }

    trace("Partitions without leader %s".format(noLeaderPartitionSet))
    val brokers = getAllBrokersInCluster(zkClient) // all available brokers
    // fetch the kafka.api.TopicMetadata sequence; each entry holds the topic and, per partition, the partition id, ISR, leader and replicas
    val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
                                                        brokers,
                                                        config.clientId,
                                                        config.socketTimeoutMs,
                                                        correlationId.getAndIncrement).topicsMetadata
    if(logger.isDebugEnabled) topicsMetadata.foreach(topicMetadata => debug(topicMetadata.toString()))
    // 2. use the partition / leader information to update noLeaderPartitionSet and leaderForPartitionsMap:
    //    noLeaderPartitionSet holds the partitions whose leader is not yet known,
    //    leaderForPartitionsMap holds the partitions whose leader has been determined
    topicsMetadata.foreach { tmd =>
      val topic = tmd.topic
      tmd.partitionsMetadata.foreach { pmd =>
        val topicAndPartition = TopicAndPartition(topic, pmd.partitionId)
        if(pmd.leader.isDefined && noLeaderPartitionSet.contains(topicAndPartition)) {
          val leaderBroker = pmd.leader.get
          leaderForPartitionsMap.put(topicAndPartition, leaderBroker)
          noLeaderPartitionSet -= topicAndPartition
        }
      }
    }
  } catch {
    case t: Throwable => {
        if (!isRunning.get())
          throw t /* If this thread is stopped, propagate this exception to kill the thread. */
        else
          warn("Failed to find leader for %s".format(noLeaderPartitionSet), t)
      }
  } finally {
    lock.unlock()
  }

  try {
    // 3. assign a fetcher to each partition
    addFetcherForPartitions(leaderForPartitionsMap.map{
      case (topicAndPartition, broker) =>
        topicAndPartition -> BrokerAndInitialOffset(broker, partitionMap(topicAndPartition).getFetchOffset())}
    )
  } catch {
    case t: Throwable => {
      if (!isRunning.get())
        throw t /* If this thread is stopped, propagate this exception to kill the thread. */
      else {
        warn("Failed to add leader for partitions %s; will retry".format(leaderForPartitionsMap.keySet.mkString(",")), t)
        lock.lock()
        noLeaderPartitionSet ++= leaderForPartitionsMap.keySet
        lock.unlock()
      }
    }
  }
  // 4. shut down idle fetcher threads
63  shutdownIdleFetcherThreads() 64  Thread.sleep(config.refreshLeaderBackoffMs) 65 }
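The interplay between startConnections (which fills noLeaderPartitionSet and calls cond.signalAll()) and doWork (which waits on cond.await() while the set is empty) is a standard lock-and-condition handoff. The sketch below reproduces just that handoff with the JDK. PendingPartitions and HandoffDemo are hypothetical names, partitions are plain strings, and, unlike the real code, the consumer drains the whole set instead of removing only the partitions whose leader was found.

```scala
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical miniature of the noLeaderPartitionSet handoff.
class PendingPartitions {
  private val lock = new ReentrantLock()
  private val cond = lock.newCondition()
  private val pending = mutable.HashSet[String]()

  // Producer side (the role of startConnections): publish work and wake up waiters.
  def add(partitions: Iterable[String]): Unit = {
    lock.lock()
    try {
      pending ++= partitions
      cond.signalAll()
    } finally lock.unlock()
  }

  // Consumer side (the role of doWork): block until work exists, then take all of it.
  def takeAll(): Set[String] = {
    lock.lock()
    try {
      while (pending.isEmpty) cond.await() // the loop guards against spurious wakeups
      val snapshot = pending.toSet
      pending.clear()
      snapshot
    } finally lock.unlock()
  }
}

object HandoffDemo {
  // Starts a consumer thread that waits for work, then publishes two partitions.
  def demo(): Set[String] = {
    val pending = new PendingPartitions
    val seen = new AtomicReference[Set[String]](Set.empty)
    val consumer = new Thread(new Runnable {
      def run(): Unit = seen.set(pending.takeAll())
    })
    consumer.start()
    pending.add(Seq("topicA-0", "topicA-1"))
    consumer.join()
    seen.get()
  }
}
```

Checking the condition in a while loop rather than an if is what makes the wait safe even if the consumer is woken without any partition having been added.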

 

Focus on step 3, which assigns a fetcher to each partition. The source of addFetcherForPartitions is as follows:

def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
  mapLock synchronized {
    // Build the mapping from fetcher to partitions
    val partitionsPerFetcher = partitionAndOffsets.groupBy{ case(topicAndPartition, brokerAndInitialOffset) =>
      BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition))}
    for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {

      var fetcherThread: AbstractFetcherThread = null
      fetcherThreadMap.get(brokerAndFetcherId) match {
        case Some(f) => fetcherThread = f
        case None =>
          // Create the fetcher for this brokerAndFetcherId and start it
          fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
          fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
          fetcherThread.start
      }

      fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (topicAndPartition, brokerAndInitOffset) =>
        topicAndPartition -> brokerAndInitOffset.initOffset
      })
    }
  }

  info("Added fetcher for partitions %s".format(partitionAndOffsets.map{ case (topicAndPartition, brokerAndInitialOffset) =>
    "[" + topicAndPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] "}))
}
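The groupBy above decides how many fetcher threads exist: one per distinct (broker, fetcherId) pair. The following sketch shows that grouping on simplified types. PartitionKey, FetcherKey and FetcherAssignment are hypothetical names, brokers are reduced to integer ids, and the hashing in fetcherId is an assumption about how getFetcherId spreads partitions over a fixed number of fetchers; the text above does not show its body.

```scala
// Hypothetical simplified types; the real ones live in the Kafka code base.
final case class PartitionKey(topic: String, partition: Int)
final case class FetcherKey(brokerId: Int, fetcherId: Int)

object FetcherAssignment {
  // Assumed hashing scheme: map a partition to one of numFetchers slots.
  // The bit mask keeps the value non-negative even when the hash overflows.
  def fetcherId(tp: PartitionKey, numFetchers: Int): Int =
    ((31 * tp.topic.hashCode + tp.partition) & 0x7fffffff) % numFetchers

  // Group partitions so that each (broker, fetcherId) pair is served by one thread.
  def group(
      leaders: Map[PartitionKey, Int], // partition -> leader broker id
      numFetchers: Int
  ): Map[FetcherKey, Set[PartitionKey]] =
    leaders
      .groupBy { case (tp, broker) => FetcherKey(broker, fetcherId(tp, numFetchers)) }
      .map { case (key, parts) => key -> parts.keySet }
}
```

With a single fetcher per broker, every partition led by the same broker ends up in the same group, so the number of threads equals the number of leader brokers involved.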

 

The source of kafka.consumer.ConsumerFetcherManager#createFetcherThread is as follows:

override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
  new ConsumerFetcherThread(
    "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id),
    config, sourceBroker, partitionMap, this)
}

 

First, look at the constructor declaration of ConsumerFetcherThread:

class ConsumerFetcherThread(name: String,
                            val config: ConsumerConfig,
                            sourceBroker: Broker,
                            partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
                            val consumerFetcherManager: ConsumerFetcherManager)
        extends AbstractFetcherThread(name = name,
                                      clientId = config.clientId,
                                      sourceBroker = sourceBroker,
                                      socketTimeout = config.socketTimeoutMs,
                                      socketBufferSize = config.socketReceiveBufferBytes,
                                      fetchSize = config.fetchMessageMaxBytes,
                                      fetcherBrokerId = Request.OrdinaryConsumerId,
                                      maxWait = config.fetchWaitMaxMs,
                                      minBytes = config.fetchMinBytes,
                                      isInterruptible = true)

Note that the values in partitionMap are PartitionTopicInfo objects, each of which wraps the BlockingQueue[FetchedDataChunk] instance that stores the fetch results.
Now for the run method. It is the kafka.utils.ShutdownableThread#run method we have already seen, so the interesting part is how doWork is overridden. The implementation below is the one ConsumerFetcherThread inherits from kafka.server.AbstractFetcherThread:

override def doWork() {
  inLock(partitionMapLock) { // acquire the lock, run the body, release the lock
    if (partitionMap.isEmpty) // nothing to fetch: wait up to 200 ms before continuing
      partitionMapCond.await(200L, TimeUnit.MILLISECONDS)
    partitionMap.foreach { // add every pending fetch to fetchRequestBuilder
      case((topicAndPartition, offset)) =>
        fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
                                     offset, fetchSize)
    }
  }
  // Build the batched fetchRequest object
  val fetchRequest = fetchRequestBuilder.build()
  // Process the FetchRequest
  if (!fetchRequest.requestInfo.isEmpty)
    processFetchRequest(fetchRequest)
}

 

The source of kafka.server.AbstractFetcherThread#processFetchRequest is as follows:

private def processFetchRequest(fetchRequest: FetchRequest) {
  val partitionsWithError = new mutable.HashSet[TopicAndPartition]
  var response: FetchResponse = null
  try {
    trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
    // Send the request and obtain the response.
    // simpleConsumer is a SimpleConsumer instance; it was described earlier and is not repeated here.
    response = simpleConsumer.fetch(fetchRequest)
  } catch {
    case t: Throwable =>
      if (isRunning.get) {
        warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.toString))
        partitionMapLock synchronized {
          partitionsWithError ++= partitionMap.keys
        }
      }
  }
  fetcherStats.requestRate.mark()

  if (response != null) {
    // process fetched data
    inLock(partitionMapLock) { // acquire the lock, process the response, release the lock
      response.data.foreach {
        case(topicAndPartition, partitionData) =>
          val (topic, partitionId) = topicAndPartition.asTuple
          val currentOffset = partitionMap.get(topicAndPartition)
          // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
          if (currentOffset.isDefined && fetchRequest.requestInfo(topicAndPartition).offset == currentOffset.get) {
            partitionData.error match { // the error code decides which branch handles the data
              case ErrorMapping.NoError => // success, no error
                try {
                  val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
                  val validBytes = messages.validBytes
                  val newOffset = messages.shallowIterator.toSeq.lastOption match {
                    case Some(m: MessageAndOffset) => m.nextOffset
                    case None => currentOffset.get
                  }
                  partitionMap.put(topicAndPartition, newOffset)
                  fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
                  fetcherStats.byteRate.mark(validBytes)
                  // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                  processPartitionData(topicAndPartition, currentOffset.get, partitionData)
                } catch {
                  case ime: InvalidMessageException => // the fetched message is incomplete or corrupt
                    // we log the error and continue. This ensures two things
                    // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
                    // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
                    // should get fixed in the subsequent fetches
                    logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage)
                  case e: Throwable =>
                    throw new KafkaException("error processing data for partition [%s,%d] offset %d"
                                             .format(topic, partitionId, currentOffset.get), e)
                }
              case ErrorMapping.OffsetOutOfRangeCode => // offset out of range error
                try {
                  val newOffset = handleOffsetOutOfRange(topicAndPartition)
                  partitionMap.put(topicAndPartition, newOffset)
                  error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
                    .format(currentOffset.get, topic, partitionId, newOffset))
                } catch {
                  case e: Throwable =>
                    error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
                    partitionsWithError += topicAndPartition
                }
              case _ =>
                if (isRunning.get) {
                  error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
                    ErrorMapping.exceptionFor(partitionData.error).getClass))
                  partitionsWithError += topicAndPartition
                }
            }
          }
      }
    }
  }

  if(partitionsWithError.size > 0) {
    debug("handling partitions with error for %s".format(partitionsWithError))
    handlePartitionsWithErrors(partitionsWithError)
  }
}
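The success branch above advances the fetch offset with a simple rule: a response is used only if it was requested at the partition's current offset, and the new offset is the nextOffset of the last message, or the unchanged current offset when the batch is empty. A minimal sketch of that rule follows. FetchedMessage and OffsetAdvance are hypothetical names, and nextOffset is assumed to be offset + 1.

```scala
// Hypothetical stand-in for a fetched message; nextOffset is assumed to be offset + 1.
final case class FetchedMessage(offset: Long) {
  def nextOffset: Long = offset + 1
}

object OffsetAdvance {
  // Returns the offset to fetch from next, or None when the response is stale
  // (it was requested at an offset that is no longer the current one).
  def nextFetchOffset(current: Long, requested: Long, batch: Seq[FetchedMessage]): Option[Long] =
    if (requested != current) None
    else Some(batch.lastOption.map(_.nextOffset).getOrElse(current))
}
```

For example, `OffsetAdvance.nextFetchOffset(10L, 10L, Seq(FetchedMessage(10), FetchedMessage(11)))` yields `Some(12)`, while an empty batch leaves the offset at 10.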

 

The source of processPartitionData, which handles the returned messages themselves, is as follows:

// process fetched data
def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
  // partitionMap is a member variable that was passed in through the constructor
  val pti = partitionMap(topicAndPartition)
  if (pti.getFetchOffset != fetchOffset)
    throw new RuntimeException("Offset doesn't match for partition [%s,%d] pti offset: %d fetch offset: %d"
                               .format(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset, fetchOffset))
  // Enqueue the data
  pti.enqueue(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
}

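This is finally where the messages fetched from the leader are put into the BlockingQueue[FetchedDataChunk] buffer queue.

KafkaStream fetches data from the queue in a blocking fashion

KafkaStream is backed by a LinkedBlockingQueue. It returns an iterator, kafka.consumer.ConsumerIterator, which is used to iterate over the data in the KafkaStream.
The main source of kafka.consumer.ConsumerIterator is as follows: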

// Check whether there is a next element
def hasNext(): Boolean = {
  if(state == FAILED)
    throw new IllegalStateException("Iterator is in failed state")
  state match {
    case DONE => false
    case READY => true
    case _ => maybeComputeNext()
  }
}
// Get the next element (parent-class implementation)
def next(): T = {
  if(!hasNext())
    throw new NoSuchElementException()
  state = NOT_READY
  if(nextItem == null)
    throw new IllegalStateException("Expected item but none found.")
  nextItem
}
// Get the next element (overridden in the subclass ConsumerIterator)
override def next(): MessageAndMetadata[K, V] = {
  val item = super.next() // call the parent implementation
  if(consumedOffset < 0)
    throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
  currentTopicInfo.resetConsumeOffset(consumedOffset)
  val topic = currentTopicInfo.topic
  trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
  consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
  consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
  item
}
// There may be a next element: try to compute it
def maybeComputeNext(): Boolean = {
  state = FAILED
  nextItem = makeNext()
  if(state == DONE) {
    false
  } else {
    state = READY
    true
  }
}
// Create the next element; implemented in the subclass ConsumerIterator
protected def makeNext(): MessageAndMetadata[K, V] = {
  // channel is a LinkedBlockingQueue instance: the queue member variable of KafkaStream
  var currentDataChunk: FetchedDataChunk = null
  // if we don't have an iterator, get one
  var localCurrent = current.get()
  // If there is no iterator yet, or it has no more elements, take a chunk from channel
  if(localCurrent == null || !localCurrent.hasNext) {
    // Retrieve and remove the head of the queue.
    if (consumerTimeoutMs < 0)
      currentDataChunk = channel.take // blocking: waits until an element is available
    else {
      currentDataChunk = channel.poll(consumerTimeoutMs,  TimeUnit.MILLISECONDS) // blocking: waits up to the timeout, then returns null
      if (currentDataChunk == null) { // no data: reset the state to NOT_READY
        // reset state to make the iterator re-iterable
        resetState()
        throw new ConsumerTimeoutException
      }
    }
    // Shutdown command
    if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
      debug("Received the shutdown command")
      return allDone // sets the state to DONE and returns null
    } else {
      currentTopicInfo = currentDataChunk.topicInfo
      val cdcFetchOffset = currentDataChunk.fetchOffset
      val ctiConsumeOffset = currentTopicInfo.getConsumeOffset
      if (ctiConsumeOffset < cdcFetchOffset) {
        error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
          .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo))
        currentTopicInfo.resetConsumeOffset(cdcFetchOffset)
      }
      localCurrent = currentDataChunk.messages.iterator

      current.set(localCurrent)
    }
    // if we just updated the current chunk and it is empty that means the fetch size is too small!
    if(currentDataChunk.messages.validBytes == 0)
      throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " +
                                             "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow."
                                             .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))
  }
  var item = localCurrent.next()
  // reject the messages that have already been consumed
  while (item.offset < currentTopicInfo.getConsumeOffset && localCurrent.hasNext) {
    item = localCurrent.next()
  }
  consumedOffset = item.nextOffset

  item.message.ensureValid() // validate checksum of message to ensure it is valid
  // Return the wrapped Kafka record
  new MessageAndMetadata(currentTopicInfo.topic, currentTopicInfo.partitionId, item.message, item.offset, keyDecoder, valueDecoder)
}
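The core of makeNext is an iterator that pulls chunks from a blocking queue, flattens them into individual messages, and stops when it sees a shutdown sentinel compared by reference. The sketch below isolates that behaviour with JDK classes only. Chunk, ChunkQueue and ChunkIterator are hypothetical names, messages are plain strings, and a java.util.concurrent.TimeoutException stands in for ConsumerTimeoutException. Unlike the real iterator, an empty chunk is simply skipped.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, TimeoutException}

// Hypothetical miniature of a fetched data chunk.
final case class Chunk(messages: List[String])

object ChunkQueue {
  // Sentinel recognised by reference (eq), playing the role of shutdownCommand.
  val Shutdown: Chunk = Chunk(Nil)
}

class ChunkIterator(channel: LinkedBlockingQueue[Chunk], timeoutMs: Long) extends Iterator[String] {
  private var current: Iterator[String] = Iterator.empty
  private var done = false

  def hasNext: Boolean = {
    while (!done && !current.hasNext) {
      // Negative timeout: block until a chunk arrives; otherwise wait at most timeoutMs.
      val chunk =
        if (timeoutMs < 0) channel.take()
        else channel.poll(timeoutMs, TimeUnit.MILLISECONDS)
      if (chunk == null) throw new TimeoutException("no chunk arrived in time")
      else if (chunk eq ChunkQueue.Shutdown) done = true
      else current = chunk.messages.iterator
    }
    !done
  }

  def next(): String = {
    if (!hasNext) throw new NoSuchElementException()
    current.next()
  }
}
```

Comparing with eq rather than == matters here: an ordinary empty chunk is equal to the sentinel by value, but only the sentinel object itself ends the iteration.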

 

Caching consumed data in the WAL

Let us return to the code for step 10 of org.apache.spark.streaming.kafka.ReliableKafkaReceiver#onStart:

// 10. Submit the MessageHandler tasks to the thread pool, where they wait to be executed
topicMessageStreams.values.foreach { streams =>
  streams.foreach { stream =>
    messageHandlerThreadPool.submit(new MessageHandler(stream))
  }
}

MessageHandler is a Runnable whose run method is as follows:

override def run(): Unit = {
  while (!isStopped) {
    try {
      // 1. Obtain the ConsumerIterator
      val streamIterator = stream.iterator()
      // 2. Iterate over every record and store the message together with its metadata
      while (streamIterator.hasNext) {
        storeMessageAndMetadata(streamIterator.next)
      }
    } catch {
      case e: Exception =>
        reportError("Error handling message", e)
    }
  }
}

 

The key method in step 2, org.apache.spark.streaming.kafka.ReliableKafkaReceiver#storeMessageAndMetadata, is as follows:

/** Store a Kafka message and the associated metadata as a tuple. */
private def storeMessageAndMetadata(
    msgAndMetadata: MessageAndMetadata[K, V]): Unit = {
  val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
  val data = (msgAndMetadata.key, msgAndMetadata.message)
  val metadata = (topicAndPartition, msgAndMetadata.offset)
  // Add the data to the current block
  blockGenerator.addDataWithCallback(data, metadata)
}

The source of addDataWithCallback is as follows:

/**
 * Push a single data item into the buffer. After buffering the data, the
 * `BlockGeneratorListener.onAddData` callback will be called.
 */
def addDataWithCallback(data: Any, metadata: Any): Unit = {
  if (state == Active) {
    waitToPush()
    synchronized {
      if (state == Active) {
        // 1. Put the data into the buffer so that the processing thread can pick it up
        currentBuffer += data
        // 2. As seen where the receiver is started, listener is a GeneratedBlockHandler instance
        listener.onAddData(data, metadata)
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }
  } else {
    throw new SparkException(
      "Cannot add data as BlockGenerator has not been started or has been stopped")
  }
}

 

Step 2 is the simpler one, so look at it first.
The source of org.apache.spark.streaming.kafka.ReliableKafkaReceiver.GeneratedBlockHandler#onAddData is as follows:

def onAddData(data: Any, metadata: Any): Unit = {
  // Update the offset of the data that was added to the generator
  if (metadata != null) {
    val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
    updateOffset(topicAndPartition, offset)
  }
}
// updateOffset here is org.apache.spark.streaming.kafka.ReliableKafkaReceiver#updateOffset:
/** Update stored offset */
private def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = {
  topicPartitionOffsetMap.put(topicAndPartition, offset)
}

 

Step 1 works as follows.
BlockGenerator has a timer that fires at a fixed interval (blockIntervalMs, 200 ms) and checks whether currentBuffer is empty. If it is not empty, the timer runs the operations below and puts the resulting block into a queue of blocks waiting to be pushed. A separate thread watches that queue continuously and calls pushBlock whenever a block is available.
The timer thread is as follows:

private val blockIntervalTimer =
  new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")

// The updateCurrentBuffer method is as follows
/** Change the buffer to which single records are added to. */
private def updateCurrentBuffer(time: Long): Unit = {
  try {
    var newBlock: Block = null
    synchronized {
      if (currentBuffer.nonEmpty) {
        val newBlockBuffer = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
        listener.onGenerateBlock(blockId)
        newBlock = new Block(blockId, newBlockBuffer)
      }
    }

    if (newBlock != null) {
      blocksForPushing.put(newBlock)  // put is blocking when queue is full
    }
  } catch {
    case ie: InterruptedException =>
      logInfo("Block updating timer thread was interrupted")
    case e: Exception =>
      reportError("Error in block updating thread", e)
  }
}

// The code of listener.onGenerateBlock(blockId) is as follows:
def onGenerateBlock(blockId: StreamBlockId): Unit = {
  // Remember the offsets of topics/partitions when a block has been generated
  rememberBlockOffsets(blockId)
}
// The code of rememberBlockOffsets is as follows:
/**
 * Remember the current offsets for each topic and partition. This is called when a block is
 * generated.
 */
private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
  // Get a snapshot of current offset map and store with related block id.
  val offsetSnapshot = topicPartitionOffsetMap.toMap
  blockOffsetMap.put(blockId, offsetSnapshot)
  topicPartitionOffsetMap.clear()
}
// As shown, this clears the topic-partition -> offset mapping
// and records the mapping from the block to its topic-partition -> offset snapshot

blocksForPushing is a bounded blocking queue that another thread polls continuously.

private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }

/** Keep pushing blocks to the BlockManager. */
// This method keeps polling the blocksForPushing queue and handles each block that needs to be pushed.
private def keepPushingBlocks() {
  logInfo("Started block pushing thread")

  def areBlocksBeingGenerated: Boolean = synchronized {
    state != StoppedGeneratingBlocks
  }

  try {
    // While blocks are being generated, keep polling for to-be-pushed blocks and push them.
    while (areBlocksBeingGenerated) { // keep looping until block generation has been stopped
      // Poll with a timeout: retrieve and remove the head, or return after 10 ms if nothing arrives
      Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
        case Some(block) => pushBlock(block) // process the block if there is one
        case None =>
      }
    }

    // At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks.
    logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
    while (!blocksForPushing.isEmpty) { // keep processing while the queue still holds blocks
      val block = blocksForPushing.take() // take() is blocking, but returns immediately here because the queue is non-empty
      logDebug(s"Pushing block $block")
      pushBlock(block) // process the block
      logInfo("Blocks left to push " + blocksForPushing.size())
    }
    logInfo("Stopped block pushing thread")
  } catch {
    case ie: InterruptedException =>
      logInfo("Block pushing thread was interrupted")
    case e: Exception =>
      reportError("Error in block pushing thread", e)
  }
}
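Taken together, the code above forms a two-stage pipeline: records accumulate in currentBuffer, the timer swaps that buffer for a fresh one and queues the old one as a block, and the pushing thread drains the bounded queue. The sketch below models the two stages without any threads so that the ordering is easy to follow. MiniBlockGenerator is a hypothetical name, and the timer and the pushing thread are replaced by explicit method calls.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical miniature of BlockGenerator's two-stage buffering.
class MiniBlockGenerator(queueSize: Int) {
  private var currentBuffer = new ArrayBuffer[Any]
  private val blocksForPushing = new ArrayBlockingQueue[Vector[Any]](queueSize)

  // Called by receiver threads for every record.
  def addData(data: Any): Unit = synchronized { currentBuffer += data }

  // Called on every block interval: swap in a fresh buffer and queue the old one.
  def updateCurrentBuffer(): Unit = {
    val finishedBlock = synchronized {
      if (currentBuffer.nonEmpty) {
        val full = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        Some(full.toVector)
      } else None
    }
    // put() blocks while the queue is full, which slows the producer side down.
    finishedBlock.foreach(block => blocksForPushing.put(block))
  }

  // Called by the pushing side: the next queued block, or None if none is ready.
  def pollBlock(): Option[Vector[Any]] = Option(blocksForPushing.poll())
}
```

Swapping the buffer inside the synchronized section while calling put outside it mirrors the original: adding records is blocked only for the short swap, not while the queue is full.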

 

The source of pushBlock is as follows:

private def pushBlock(block: Block) {
  listener.onPushBlock(block.id, block.buffer)
  logInfo("Pushed block " + block.id)
}

It calls onPushBlock on the listener (org.apache.spark.streaming.kafka.ReliableKafkaReceiver.GeneratedBlockHandler), whose source is as follows:

def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
  // Store block and commit the blocks offset
  storeBlockAndCommitOffset(blockId, arrayBuffer)
}

The code of storeBlockAndCommitOffset is as follows:

/**
 * Store the ready-to-be-stored block and commit the related offsets to zookeeper. This method
 * will try a fixed number of times to push the block. If the push fails, the receiver is stopped.
 */
private def storeBlockAndCommitOffset(
    blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
  var count = 0
  var pushed = false
  var exception: Exception = null
  while (!pushed && count <= 3) { // up to 3 retries, that is, at most 4 attempts in total
    try {
      store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
      pushed = true
    } catch {
      case ex: Exception =>
        count += 1
        exception = ex
    }
  }
  if (pushed) { // the block has been pushed
    // Commit the offsets
    Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
    // Once the block is in the BlockManager, its block -> (topic-partition -> offset) mapping is no longer kept
    blockOffsetMap.remove(blockId)
  } else {
    stop("Error while storing block into Spark", exception)
  }
}
// The source of commitOffset is as follows:
/**
 * Commit the offset of Kafka's topic/partition, the commit mechanism follow Kafka 0.8.x's
 * metadata schema in Zookeeper.
 */
private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = {
  if (zkClient == null) {
    val thrown = new IllegalStateException("Zookeeper client is unexpectedly null")
    stop("Zookeeper client is not initialized before commit offsets to ZK", thrown)
    return
  }

  for ((topicAndPart, offset) <- offsetMap) {
    try {
      // Build the ZooKeeper path of this consumer group's offset for the partition
      val topicDirs = new ZKGroupTopicDirs(groupId, topicAndPart.topic)
      val zkPath = s"${topicDirs.consumerOffsetDir}/${topicAndPart.partition}"
      // Update the consumed offset of this topic-partition for the consumer
      ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString)
    } catch {
      case e: Exception =>
        logWarning(s"Exception during commit offset $offset for topic" +
          s"${topicAndPart.topic}, partition ${topicAndPart.partition}", e)
    }

    logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " +
      s"partition ${topicAndPart.partition}")
  }
}
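The ordering in storeBlockAndCommitOffset is what matters for delivery guarantees: offsets are committed only after the block has been stored, so a crash between the two steps leads to a re-read rather than a loss. The sketch below captures that ordering together with the retry loop. OffsetCommitter is a hypothetical name, the store function and the in-memory committed map are stand-ins for Spark's store call and for ZooKeeper, and block ids and partitions are plain strings.

```scala
import scala.collection.mutable

// Hypothetical miniature of storeBlockAndCommitOffset.
class OffsetCommitter(store: Seq[String] => Unit, maxRetries: Int = 3) {
  // blockId -> snapshot of (partition -> offset) taken when the block was generated
  val blockOffsets = mutable.Map[String, Map[String, Long]]()
  // stands in for the offsets persisted in ZooKeeper
  val committed = mutable.Map[String, Long]()

  // Returns true if the block was stored and its offsets were committed.
  def storeAndCommit(blockId: String, data: Seq[String]): Boolean = {
    var failures = 0
    var pushed = false
    while (!pushed && failures <= maxRetries) {
      try {
        store(data)
        pushed = true
      } catch {
        case _: Exception => failures += 1
      }
    }
    if (pushed) {
      // Commit only after the data is stored; the snapshot is dropped afterwards.
      blockOffsets.remove(blockId).foreach(offsets => committed ++= offsets)
    }
    pushed
  }
}
```

If every attempt fails, nothing is committed, so the offsets in the stand-in for ZooKeeper still point at data that has not been stored yet.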

The key method, store, is as follows:

/** Store an ArrayBuffer of received data as a data block into Spark's memory. */
def store(dataBuffer: ArrayBuffer[T]) {
  supervisor.pushArrayBuffer(dataBuffer, None, None)
}

It calls pushArrayBuffer on the supervisor (an org.apache.spark.streaming.receiver.ReceiverSupervisorImpl instance), which does the following:

/** Store an ArrayBuffer of received data as a data block into Spark's memory. */
def pushArrayBuffer(
    arrayBuffer: ArrayBuffer[_],
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
}

The source of org.apache.spark.streaming.receiver.ReceiverSupervisorImpl#pushAndReportBlock is as follows:

/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  // 1. Prepare the blockId, the time, and so on
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  // 2. Store the block
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  // 3. Get the number of stored records
  val numRecords = blockStoreResult.numRecords
  // 4. Notify trackerEndpoint that a block has been added, which triggers the update of the driver's WAL
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}

receivedBlockHandler is assigned as follows:

private val receivedBlockHandler: ReceivedBlockHandler = {
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    if (checkpointDirOption.isEmpty) {
      throw new SparkException(
        "Cannot enable receiver write-ahead log without checkpoint directory set. " +
          "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
          "See documentation for more details.")
    }
    // The WAL is enabled and the checkpoint dir is set, so a WriteAheadLogBasedBlockHandler is returned here.
    // It holds the block manager, stream id, storage level, conf, checkpoint dir, and so on.
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}
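The selection logic above has three outcomes: WAL disabled, WAL enabled with a checkpoint directory, and WAL enabled without one, which is rejected. A small sketch of that decision on plain data follows. BlockHandlerChoice, BlockManagerOnly, WalBacked and HandlerSelection are hypothetical names; the configuration is modelled as a plain Map, and the default of "false" for the flag is an assumption.

```scala
// Hypothetical model of the two handler kinds.
sealed trait BlockHandlerChoice
case object BlockManagerOnly extends BlockHandlerChoice
final case class WalBacked(checkpointDir: String) extends BlockHandlerChoice

object HandlerSelection {
  val WalKey = "spark.streaming.receiver.writeAheadLog.enable"

  // Chooses the handler kind; enabling the WAL without a checkpoint directory is an error.
  def choose(conf: Map[String, String], checkpointDir: Option[String]): BlockHandlerChoice = {
    val walEnabled = conf.getOrElse(WalKey, "false").toBoolean // assumed default: disabled
    if (walEnabled) {
      val dir = checkpointDir.getOrElse(
        throw new IllegalStateException(
          "Cannot enable receiver write-ahead log without checkpoint directory set."))
      WalBacked(dir)
    } else BlockManagerOnly
  }
}
```

In practice this means that turning the flag on is not enough by itself: the checkpoint directory has to be set as well, otherwise the receiver supervisor fails at construction time.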

The storeBlock method of the ReceivedBlockHandler (here the WriteAheadLogBasedBlockHandler) is as follows:

/**
 * This implementation stores the block into the block manager as well as a write ahead log.
 * It does this in parallel, using Scala Futures, and returns only after the block has
 * been stored in both places.
 */
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {

  var numRecords = None: Option[Long]
  // Serialize the block so that it can be inserted into both
  // 1. Serialize the ReceivedBlock (without compression) into bytes
  val serializedBlock = block match { // serializedBlock is the serialized result
    case ArrayBufferBlock(arrayBuffer) => // this branch is taken here
      numRecords = Some(arrayBuffer.size.toLong)
      blockManager.dataSerialize(blockId, arrayBuffer.iterator)
    case IteratorBlock(iterator) =>
      val countIterator = new CountingIterator(iterator)
      val serializedBlock = blockManager.dataSerialize(blockId, countIterator)
      numRecords = countIterator.count
      serializedBlock
    case ByteBufferBlock(byteBuffer) =>
      byteBuffer
    case _ =>
      throw new Exception(s"Could not push $blockId to block manager, unexpected block type")
  }

  // 2. Store the block in block manager
  val storeInBlockManagerFuture = Future {
    val putResult =
      blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true)
    if (!putResult.map { _._1 }.contains(blockId)) {
      throw new SparkException(
        s"Could not store $blockId to block manager with storage level $storageLevel")
    }
  }

  // 3. Store the block in write ahead log
  val storeInWriteAheadLogFuture = Future {
    writeAheadLog.write(serializedBlock, clock.getTimeMillis())
  }

  // 4. Combine the futures, wait for both to complete, and return the write ahead log record handle
  val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
  // Wait for the futures to complete. The default timeout is 30 s; change it with
  // the spark.streaming.receiver.blockStoreTimeout parameter
  val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout)
  // Return the information about the stored block
  WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle)
}
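The pattern in steps 2 to 4 is to start two independent writes as Futures, zip them so that the combined future succeeds only if both do, keep the second result, and block with a timeout. The sketch below shows the same shape with the Scala standard library. MemoryStore, AppendOnlyLog and DualStore are hypothetical stand-ins for the block manager, the write ahead log and the handler; the returned Long is an illustrative record handle (the record's position in the log).

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for the block manager.
class MemoryStore {
  private val blocks = new ConcurrentHashMap[String, Array[Byte]]()
  def put(id: String, bytes: Array[Byte]): Unit = { blocks.put(id, bytes); () }
  def contains(id: String): Boolean = blocks.containsKey(id)
}

// Hypothetical stand-in for the write ahead log; write returns the record's position.
class AppendOnlyLog {
  private val records = ArrayBuffer[Array[Byte]]()
  def write(bytes: Array[Byte]): Long = synchronized {
    records += bytes
    (records.size - 1).toLong
  }
  def size: Int = synchronized { records.size }
}

class DualStore(memory: MemoryStore, log: AppendOnlyLog, timeout: FiniteDuration = 30.seconds) {
  // Stores the bytes in both places in parallel and returns the log handle.
  def storeBlock(id: String, bytes: Array[Byte]): Long = {
    val memoryFuture = Future { memory.put(id, bytes) }
    val logFuture = Future { log.write(bytes) }
    // zip succeeds only when both futures succeed; keep just the log handle.
    val combined = memoryFuture.zip(logFuture).map(_._2)
    Await.result(combined, timeout)
  }
}
```

Because the call returns only after both writes have finished, the caller can treat a returned handle as proof that the block exists in memory and in the log.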

Sending the WAL block information to the driver

Note the WriteAheadLogBasedStoreResult instance: the RDD uses it later when processing the data.
The code in org.apache.spark.streaming.receiver.ReceiverSupervisorImpl#pushAndReportBlock that notifies the driver with AddBlock is as follows:

// 4. Notify trackerEndpoint that a block has been added, which triggers the update of the driver's WAL
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
logDebug(s"Reported block $blockId")

The driver writes the WAL block information to the driver's WAL

Skipping the intermediate RPC steps, go straight to org.apache.spark.streaming.scheduler.ReceiverTracker.ReceiverTrackerEndpoint#receiveAndReply on the driver side:

case AddBlock(receivedBlockInfo) =>
  if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
    walBatchingThreadPool.execute(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        if (active) {
          context.reply(addBlock(receivedBlockInfo))
        } else {
          throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
        }
      }
    })
  } else {
    context.reply(addBlock(receivedBlockInfo))
  }

The source of the addBlock method is as follows:

/** Add new blocks for the given stream */
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  receivedBlockTracker.addBlock(receivedBlockInfo)
}

 

The source of org.apache.spark.streaming.scheduler.ReceivedBlockTracker#addBlock is as follows:

/** Add received block. This event will get written to the write ahead log (if enabled). */
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  try {
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult) {
      synchronized {
        getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      }
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    } else {
      logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
    }
    writeResult
  } catch {
    case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
      false
  }
}
/** Write an update to the tracker to the write ahead log */
private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
  if (isWriteAheadLogEnabled) {
    logTrace(s"Writing record: $record")
    try {
      writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
        clock.getTimeMillis())
      true
    } catch {
      case NonFatal(e) =>
        logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
        false
    }
  } else {
    true
  }
}
/** Get the queue of received blocks belonging to a particular stream */
private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
  streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
}

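The code above mainly does two things: it writes a BlockAdditionEvent to the WAL and, only if that write succeeds, appends the block information to a queue. The "queue" is really a mutable.HashMap[Int, ReceivedBlockQueue] that maps each streamId to the blocks of that stream that have not yet been allocated to a batch.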
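
The "log first, then enqueue" ordering described above can be illustrated with a small stand-alone sketch. It does not use Spark: BlockInfo, ToyLog and Tracker are hypothetical stand-ins for ReceivedBlockInfo, the write ahead log and ReceivedBlockTracker, and the `failing` flag exists only to simulate a failed WAL write.

```scala
import scala.collection.mutable

// Illustrative sketch only: these types are simplified, hypothetical
// stand-ins, not Spark's internal classes.
object BlockTrackerSketch {
  final case class BlockInfo(streamId: Int, blockId: String)

  // A toy write ahead log; `failing` simulates a write error.
  final class ToyLog(var failing: Boolean = false) {
    val records = mutable.ArrayBuffer.empty[String]
    def write(record: String): Boolean =
      if (failing) false else { records += record; true }
  }

  final class Tracker(log: ToyLog) {
    // streamId -> queue of blocks not yet allocated to any batch
    private val unallocated = mutable.HashMap.empty[Int, mutable.Queue[BlockInfo]]

    private def queueOf(streamId: Int): mutable.Queue[BlockInfo] =
      unallocated.getOrElseUpdate(streamId, mutable.Queue.empty[BlockInfo])

    // Log first; only a successfully logged block becomes visible in the queue.
    def addBlock(info: BlockInfo): Boolean = {
      val written = log.write(s"BlockAdditionEvent($info)")
      if (written) synchronized { queueOf(info.streamId) += info }
      written
    }

    // Snapshot of the blocks still waiting for a batch.
    def pending(streamId: Int): List[BlockInfo] = synchronized { queueOf(streamId).toList }
  }
}
```

The point of the ordering is that a block whose addition was never logged is never handed to a batch, so anything visible in the queue can also be replayed from the log.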

Reading data from the WAL RDD

The source of createStream is as follows:

/**
 * Create an input stream that pulls messages from Kafka Brokers.
 * @param ssc StreamingContext object
 * @param kafkaParams Map of kafka configuration parameters,
 *                    see http://kafka.apache.org/08/configuration.html
 * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
 *               in its own thread.
 * @param storageLevel Storage level to use for storing the received objects
 * @tparam K type of Kafka message key
 * @tparam V type of Kafka message value
 * @tparam U type of Kafka message key decoder
 * @tparam T type of Kafka message value decoder
 * @return DStream of (Kafka message key, Kafka message value)
 */
def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[(K, V)] = {
  // The WAL can be enabled by setting spark.streaming.receiver.writeAheadLog.enable to true
  val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
  new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
}

It creates a KafkaInputDStream object:

/**
 * Input stream that pulls messages from a Kafka Broker.
 *
 * @param kafkaParams Map of kafka configuration parameters.
 *                    See: http://kafka.apache.org/configuration.html
 * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
 *               in its own thread.
 * @param storageLevel RDD storage level.
 */
private[streaming]
class KafkaInputDStream[
  K: ClassTag,
  V: ClassTag,
  U <: Decoder[_]: ClassTag,
  T <: Decoder[_]: ClassTag](
    ssc_ : StreamingContext,
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    useReliableReceiver: Boolean,
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {

  def getReceiver(): Receiver[(K, V)] = {
    if (!useReliableReceiver) { // WAL not enabled: use a KafkaReceiver
      new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
    } else { // WAL enabled: use a ReliableKafkaReceiver
      new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
    }
  }
}

org.apache.spark.streaming.kafka.KafkaInputDStream inherits the compute method from its parent class, ReceiverInputDStream:

/**
 * Generates RDDs with blocks received by the receiver of this stream. */
override def compute(validTime: Time): Option[RDD[T]] = {
  val blockRDD = {

    if (validTime < graph.startTime) {
      // If this is called for any time before the start time of the context,
      // then this returns an empty RDD. This may happen when recovering from a
      // driver failure without any write ahead log to recover pre-failure data.
      new BlockRDD[T](ssc.sc, Array.empty)
    } else {
      // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
      // for this batch
      val receiverTracker = ssc.scheduler.receiverTracker
      val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

      // Register the input blocks information into InputInfoTracker
      val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

      // Create the BlockRDD
      createBlockRDD(validTime, blockInfos)
    }
  }
  Some(blockRDD)
}

getBlocksOfBatch is as follows:

/** Get the blocks for the given batch and all input streams. */
def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = {
  receivedBlockTracker.getBlocksOfBatch(batchTime)
}

// which in turn calls:

/** Get the blocks allocated to the given batch. */
def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = synchronized {
  timeToAllocatedBlocks.get(batchTime).map { _.streamIdToAllocatedBlocks }.getOrElse(Map.empty)
}

JobGenerator allocates WAL blocks to a batch and generates jobs

Fetching the WAL block information

org.apache.spark.streaming.scheduler.JobGenerator declares a timer:

// At every batch interval the timer creates a GenerateJobs event and posts it to the eventLoop's blocking queue
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

The EventLoop is instantiated as follows:

eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
  override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

  override protected def onError(e: Throwable): Unit = {
    jobScheduler.reportError("Error in job generator", e)
  }
}
eventLoop.start()

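EventLoop defines a LinkedBlockingDeque (a double-ended blocking queue) and a daemon thread. The daemon thread keeps taking events from the deque, blocking while it is empty; as soon as it gets an event it calls onReceive, which here delegates to processEvent: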

/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
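
The EventLoop mechanism (a blocking deque drained by a single daemon thread that dispatches to onReceive) can be sketched without Spark roughly as follows. MiniEventLoop and runOnce are hypothetical names chosen for this illustration; the sketch only mirrors the shape of the mechanism and is not Spark's EventLoop implementation.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingDeque, TimeUnit}
import scala.collection.mutable
import scala.util.control.NonFatal

// Illustrative sketch only: MiniEventLoop and runOnce are hypothetical names,
// not Spark's EventLoop API.
object EventLoopSketch {
  abstract class MiniEventLoop[E](name: String) {
    private val queue = new LinkedBlockingDeque[E]()
    @volatile private var halted = false

    private val worker = new Thread(name) {
      setDaemon(true)
      override def run(): Unit = {
        try {
          while (!halted) {
            val event = queue.take() // blocks until an event is available
            try onReceive(event) catch { case NonFatal(e) => onError(e) }
          }
        } catch {
          case _: InterruptedException => () // stop() interrupts a blocked take()
        }
      }
    }

    def start(): Unit = worker.start()
    def stop(): Unit = { halted = true; worker.interrupt() }
    def post(event: E): Unit = queue.put(event)

    protected def onReceive(event: E): Unit
    protected def onError(e: Throwable): Unit
  }

  // Posts `events` in order and returns what the loop thread processed.
  def runOnce(events: Seq[String]): List[String] = {
    val seen = mutable.ArrayBuffer.empty[String]
    val done = new CountDownLatch(events.size)
    val loop = new MiniEventLoop[String]("sketch-loop") {
      override protected def onReceive(event: String): Unit = {
        seen.synchronized { seen += event }
        done.countDown()
      }
      override protected def onError(e: Throwable): Unit = ()
    }
    loop.start()
    events.foreach(e => loop.post(e))
    done.await(5, TimeUnit.SECONDS)
    loop.stop()
    seen.synchronized { seen.toList }
  }
}
```

Because a single thread drains the queue, events are handled strictly in the order they were posted, which is why the timer can simply post one GenerateJobs event per batch interval.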

Since the event posted by the timer is a GenerateJobs event, processEvent goes on to call generateJobs:

/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    // 1. Allocate the WAL block information to the batch (the worker nodes send this
    //    block information to the driver after writing the blocks to the WAL)
    jobScheduler.receiverTracker.allocateBlocksToBatch(time)
    // 2. Generate jobs from the allocated blocks
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  // Post a DoCheckpoint event to save a checkpoint: mainly writing the new checkpoint
  // data to HDFS and deleting old checkpoint data
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

The org.apache.spark.streaming.scheduler.ReceiverTracker#allocateBlocksToBatch method called in step 1 is as follows:

/** Allocate all unallocated blocks to the given batch. */
def allocateBlocksToBatch(batchTime: Time): Unit = {
  if (receiverInputStreams.nonEmpty) {
    receivedBlockTracker.allocateBlocksToBatch(batchTime)
  }
}

It delegates to org.apache.spark.streaming.scheduler.ReceivedBlockTracker#allocateBlocksToBatch:

def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    // Iterate over the input streams, take each stream's queue of unallocated blocks by
    // streamId, and build [streamId, Seq[ReceivedBlockInfo]]. This shows that by this point
    // the data has already been read by the receivers.
    // Build the mapping from streamId to WAL blocks
    val streamIdToBlocks = streamIds.map { streamId =>
        (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    } else {
      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
    }
  } else {
    // This situation occurs when:
    // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
    // possibly processed batch job or half-processed batch job need to be processed again,
    // so the batchTime will be equal to lastAllocatedBatchTime.
    // 2. Slow checkpointing makes recovered batch time older than WAL recovered
    // lastAllocatedBatchTime.
    // This situation will only occurs in recovery time.
    logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
  }
}

The source of getReceivedBlockQueue is as follows:

/** Get the queue of received blocks belonging to a particular stream */
private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
  streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
}

As we can see, this is where the block information sent by the worker nodes is taken out of the queues and assigned to the batch.
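
The allocation step can be sketched in plain Scala as below. This is a simplified model under stated assumptions: Allocator is a hypothetical class, block information is reduced to a block id string, batch times are plain Long values, and the WAL write of the BatchAllocationEvent is left out entirely.

```scala
import scala.collection.mutable

// Illustrative sketch only: Allocator is a hypothetical, simplified model of
// the allocation logic; it omits the write ahead log.
object BatchAllocationSketch {
  final class Allocator(streamIds: Seq[Int]) {
    private val unallocated = mutable.HashMap.empty[Int, mutable.Queue[String]]
    private val allocated = mutable.HashMap.empty[Long, Map[Int, Seq[String]]]
    private var lastAllocatedBatchTime: Option[Long] = None

    private def queueOf(streamId: Int): mutable.Queue[String] =
      unallocated.getOrElseUpdate(streamId, mutable.Queue.empty[String])

    def addBlock(streamId: Int, blockId: String): Unit = synchronized {
      queueOf(streamId) += blockId
    }

    // Drain every stream's queue into the batch, but only for a batch time
    // newer than the last one that was allocated.
    def allocateBlocksToBatch(batchTime: Long): Unit = synchronized {
      if (lastAllocatedBatchTime.forall(batchTime > _)) {
        val streamIdToBlocks = streamIds.map { id =>
          (id, queueOf(id).dequeueAll(_ => true).toList)
        }.toMap
        allocated.put(batchTime, streamIdToBlocks)
        lastAllocatedBatchTime = Some(batchTime)
      }
    }

    def getBlocksOfBatch(batchTime: Long): Map[Int, Seq[String]] = synchronized {
      allocated.getOrElse(batchTime, Map.empty[Int, Seq[String]])
    }
  }
}
```

A block that arrives after a batch has been allocated stays in the queue and is picked up by the next batch, and a repeated call for the same batch time leaves the earlier allocation untouched.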

 

Creating an RDD from the WAL blocks

The source of org.apache.spark.streaming.dstream.ReceiverInputDStream#createBlockRDD is as follows:

private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {

  if (blockInfos.nonEmpty) {
    val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
    // If every block has a WriteAheadLogRecordHandle, create a WriteAheadLogBackedBlockRDD;
    // otherwise create a BlockRDD.
    // A WriteAheadLogRecordHandle is the entry information associated with a WAL record; the
    // implementation FileBasedWriteAheadLogSegment holds the path, offset and length of the
    // WAL segment. When the RDD actually needs the data, it reads it from the WAL using
    // these handles.
    // Are WAL record handles present with all the blocks
    val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }

    if (areWALRecordHandlesPresent) {
      // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
      val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
      val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
    } else {
      // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
      // others then that is unexpected and log a warning accordingly.
      if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
        if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
          logError("Some blocks do not have Write Ahead Log information; " +
            "this is unexpected and data may not be recoverable after driver failures")
        } else {
          logWarning("Some blocks have Write Ahead Log information; this is unexpected")
        }
      }
      val validBlockIds = blockIds.filter { id =>
        ssc.sparkContext.env.blockManager.master.contains(id)
      }
      if (validBlockIds.size != blockIds.size) {
        logWarning("Some blocks could not be recovered as they were not found in memory. " +
          "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
          "for more details.")
      }
      new BlockRDD[T](ssc.sc, validBlockIds)
    }
  } else {
    // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
    // according to the configuration
    if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
      new WriteAheadLogBackedBlockRDD[T](
        ssc.sparkContext, Array.empty, Array.empty, Array.empty)
    } else {
      new BlockRDD[T](ssc.sc, Array.empty)
    }
  }
}
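
Stripped of logging and RDD construction, the branching in createBlockRDD reduces to a small decision function. The sketch below models only that decision; BlockInfo, RddKind and choose are hypothetical names, and a WAL record handle is reduced to an optional string.

```scala
// Illustrative sketch only: models the branching of createBlockRDD with
// hypothetical, simplified types; it creates no RDDs.
object BlockRddChoiceSketch {
  final case class BlockInfo(blockId: String, walHandle: Option[String])

  sealed trait RddKind
  case object WalBacked extends RddKind
  case object PlainBlock extends RddKind

  // A WAL-backed RDD requires a handle on every block; with no blocks at all,
  // the configuration flag decides which kind of empty RDD is created.
  def choose(blocks: Seq[BlockInfo], walEnabled: Boolean): RddKind =
    if (blocks.nonEmpty) {
      if (blocks.forall(_.walHandle.nonEmpty)) WalBacked else PlainBlock
    } else {
      if (walEnabled) WalBacked else PlainBlock
    }
}
```

Note that a single block without a handle is enough to fall back to the plain block RDD, which is the case the original code reports with logError when the WAL is enabled.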

The source of org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD#compute is as follows:

/**
 * Gets the partition data by getting the corresponding block from the block manager.
 * If the block does not exist, then the data is read from the corresponding record
 * in write ahead log files.
 */
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
  assertValid()
  val hadoopConf = broadcastedHadoopConf.value
  val blockManager = SparkEnv.get.blockManager
  val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
  val blockId = partition.blockId

  def getBlockFromBlockManager(): Option[Iterator[T]] = {
    blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[T]])
  }

  def getBlockFromWriteAheadLog(): Iterator[T] = {
    var dataRead: ByteBuffer = null
    var writeAheadLog: WriteAheadLog = null
    try {
      // The WriteAheadLogUtils.createLog*** method needs a directory to create a
      // WriteAheadLog object as the default FileBasedWriteAheadLog needs a directory for
      // writing log data. However, the directory is not needed if data needs to be read, hence
      // a dummy path is provided to satisfy the method parameter requirements.
      // FileBasedWriteAheadLog will not create any file or directory at that path.
      // FileBasedWriteAheadLog will not create any file or directory at that path. Also,
      // this dummy directory should not already exist otherwise the WAL will try to recover
      // past events from the directory and throw errors.
      val nonExistentDirectory = new File(
        System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
      writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
        SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
      dataRead = writeAheadLog.read(partition.walRecordHandle)
    } catch {
      case NonFatal(e) =>
        throw new SparkException(
          s"Could not read data from write ahead log record ${partition.walRecordHandle}", e)
    } finally {
      if (writeAheadLog != null) {
        writeAheadLog.close()
        writeAheadLog = null
      }
    }
    if (dataRead == null) {
      throw new SparkException(
        s"Could not read data from write ahead log record ${partition.walRecordHandle}, " +
          s"read returned null")
    }
    logInfo(s"Read partition data of $this from write ahead log, record handle " +
      partition.walRecordHandle)
    if (storeInBlockManager) {
      blockManager.putBytes(blockId, dataRead, storageLevel)
      logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
      dataRead.rewind()
    }
    blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
  }

  // If partition.isBlockIdValid is true, the block data is expected to be present on the executors
  if (partition.isBlockIdValid) {
    // First try to read the data from the executors through the BlockManager; if it is not
    // there, read it from the WAL.
    // Does the BlockManager get the data from memory or from disk?
    // The BlockManager fetches a block locally or remotely. A local fetch can read from
    // memory or from disk; a remote fetch is synchronous, that is, it blocks for the
    // duration of the fetch.
    getBlockFromBlockManager().getOrElse { getBlockFromWriteAheadLog() }
  } else {
    getBlockFromWriteAheadLog()
  }
}

 

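This completes the walkthrough: starting the receiver, the receiver receiving data and saving it as WAL blocks, the driver receiving the WAL block information, and finally Spark Streaming reading the data through the WAL-backed RDD.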

Original source: https://www.cnblogs.com/johnny666888/p/11100334.html
