使用分佈式receiver來獲取數據
使用 WAL 來實現 At least once 操做:
conf.set("spark.streaming.receiver.writeAheadLog.enable","true") // 開啓 WAL
// 一、At most once - 每條數據最多被處理一次(0次或1次),這種語義下會出現數據丟失的問題;
// 二、At least once - 每條數據最少被處理一次 (1次或更多),這個不會出現數據丟失,可是會出現數據重複;
// 三、Exactly once - 每條數據只會被處理一次,沒有數據會丟失,而且沒有數據會被屢次處理,這種語義是你們最想要的,可是也是最難實現的。html
若是不作容錯,將會帶來數據丟失,由於Receiver一直在接收數據,在其沒有處理的時候(已通知zk數據接收到),Executor忽然掛掉(或是driver掛掉通知executor關閉),緩存在內存中的數據就會丟失。由於這個問題,Spark1.2開始加入了WAL(Write ahead log)開啓 WAL,將receiver獲取數據的存儲級別修改成StorageLevel. MEMORY_AND_DISK_SER_2java
1 // 缺點,不能本身維護消費 topic partition 的 offset 2 // 優勢,開啓 WAL,來確保 exactly-once 語義 3 val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder]( 4 ssc,kafkaParams,map,StorageLevel.MEMORY_AND_DISK_SER_2)
org.apache.spark.streaming.StreamingContext#start中啓動了 JobScheduler實例node
1 // private[streaming] val scheduler = new JobScheduler(this) 2 3 // Start the streaming scheduler in a new thread, so that thread local properties 4 // like call sites and job groups can be reset without affecting those of the 5 // current thread. 6 ThreadUtils.runInNewThread("streaming-start") { // 單獨的一個daemon線程運行函數題 7 sparkContext.setCallSite(startSite.get) 8 sparkContext.clearJobGroup() 9 sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false") 10 // 執行start 方法 11 scheduler.start() 12 } 13 state = StreamingContextState.ACTIVE
org.apache.spark.streaming.scheduler.JobScheduler#start 源碼以下:apache
1 def start(): Unit = synchronized { 2 if (eventLoop != null) return // scheduler has already been started 3 4 logDebug("Starting JobScheduler") 5 eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") { 6 override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event) 7 8 override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e) 9 } 10 eventLoop.start() 11 12 // attach rate controllers of input streams to receive batch completion updates 13 for { 14 inputDStream <- ssc.graph.getInputStreams 15 rateController <- inputDStream.rateController 16 } ssc.addStreamingListener(rateController) 17 18 listenerBus.start(ssc.sparkContext) 19 receiverTracker = new ReceiverTracker(ssc) 20 inputInfoTracker = new InputInfoTracker(ssc) 21 receiverTracker.start() 22 jobGenerator.start() 23 logInfo("Started JobScheduler") 24 }
ReceiverTracker 的類聲明以下:api
1 This class manages the execution of the receivers of ReceiverInputDStreams. Instance of this class must be created after all input streams have been added and StreamingContext.start() has been called because it needs the final set of input streams at the time of instantiation. 2 此類負責執行ReceiverInputDStreams的receiver。必須在添加全部輸入流並調用StreamingContext.start()以後建立此類的實例,由於它在實例化時須要最終的輸入流集。
其 start 方法以下:數組
1 /** Start the endpoint and receiver execution thread. */ 2 def start(): Unit = synchronized { 3 if (isTrackerStarted) { 4 throw new SparkException("ReceiverTracker already started") 5 } 6 7 if (!receiverInputStreams.isEmpty) { 8 // 創建rpc endpoint 9 endpoint = ssc.env.rpcEnv.setupEndpoint( // 注意,這是一個driver的 endpoint 10 "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv)) 11 // driver節點上發送啓動 receiver 命令 12 if (!skipReceiverLaunch) launchReceivers() 13 logInfo("ReceiverTracker started") 14 trackerState = Started 15 } 16 } 17 18 /** 19 * Get the receivers from the ReceiverInputDStreams, distributes them to the 20 * worker nodes as a parallel collection, and runs them. 21 */ 22 // 從ReceiverInputDStreams 獲取到 receivers,而後將它們分配到不一樣的 worker 節點並運行它們。 23 private def launchReceivers(): Unit = { 24 val receivers = receiverInputStreams.map(nis => { 25 // 未啓用WAL 是KafkaReceiver,啓動WAL後是ReliableKafkaReceiver 26 val rcvr = nis.getReceiver() 27 rcvr.setReceiverId(nis.id) 28 rcvr 29 }) 30 // 運行一個簡單的應用來確保全部的salve node都已經啓動起來,避免全部的 receiver 任務都在同一個local node上 31 runDummySparkJob() 32 33 logInfo("Starting " + receivers.length + " receivers") 34 endpoint.send(StartAllReceivers(receivers)) // 發送請求driver 轉發 啓動 receiver 的命令 35 }
Driver 端StartAllReceivers 的處理代碼以下:緩存
1 override def receive: PartialFunction[Any, Unit] = { 2 // Local messages 3 case StartAllReceivers(receivers) => 4 // schduleReceiver 5 val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors) 6 for (receiver <- receivers) { 7 val executors = scheduledLocations(receiver.streamId) 8 updateReceiverScheduledExecutors(receiver.streamId, executors) 9 receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation 10 startReceiver(receiver, executors) 11 } 12 …… 13 }
getExecutors源碼以下:session
1 /** 2 * Get the list of executors excluding driver 3 */ 4 // 若是是 local 模式,返回 本地線程; 若是是 yarn 模式,返回 非driver 節點上的 excutors 5 private def getExecutors: Seq[ExecutorCacheTaskLocation] = { 6 if (ssc.sc.isLocal) { // 若是在 local 模式下運行 7 val blockManagerId = ssc.sparkContext.env.blockManager.blockManagerId 8 Seq(ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId)) 9 } else { // 在 yarn 模式下,過濾掉 driver 的 executor 10 ssc.sparkContext.env.blockManager.master.getMemoryStatus.filter { case (blockManagerId, _) => 11 blockManagerId.executorId != SparkContext.DRIVER_IDENTIFIER // Ignore the driver location 12 }.map { case (blockManagerId, _) => 13 ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId) 14 }.toSeq 15 } 16 }
org.apache.spark.streaming.scheduler.ReceiverSchedulingPolicy#scheduleReceivers的解釋以下:app
1 Try our best to schedule receivers with evenly distributed. However, if the preferredLocations of receivers are not even, we may not be able to schedule them evenly because we have to respect them. Here is the approach to schedule executors: 2 First, schedule all the receivers with preferred locations (hosts), evenly among the executors running on those host. 3 Then, schedule all other receivers evenly among all the executors such that overall distribution over all the receivers is even. 4 This method is called when we start to launch receivers at the first time. 5 該方法就是確保receiver 可以在worker node 上均勻分佈的。遵循如下兩個原則: 6 1.使用 preferred location 分配 receiver 到這些node 上 7 2.將其餘的未分配的receiver均勻分佈均勻分佈到 每個 worker node 上
org.apache.spark.streaming.scheduler.ReceiverTracker#updateReceiverScheduledExecutors 負責更新receiverid 和 receiver info 的映射關係,源碼以下:負載均衡
1 private def updateReceiverScheduledExecutors( 2 receiverId: Int, scheduledLocations: Seq[TaskLocation]): Unit = { 3 val newReceiverTrackingInfo = receiverTrackingInfos.get(receiverId) match { 4 case Some(oldInfo) => 5 oldInfo.copy(state = ReceiverState.SCHEDULED, 6 scheduledLocations = Some(scheduledLocations)) 7 case None => 8 ReceiverTrackingInfo( 9 receiverId, 10 ReceiverState.SCHEDULED, 11 Some(scheduledLocations), 12 runningExecutor = None) 13 } 14 receiverTrackingInfos.put(receiverId, newReceiverTrackingInfo) 15 }
startReceiver 負責啓動 receiver,源碼以下:
1 /** 2 * Start a receiver along with its scheduled executors 3 */ 4 private def startReceiver( 5 receiver: Receiver[_], 6 scheduledLocations: Seq[TaskLocation]): Unit = { 7 def shouldStartReceiver: Boolean = { 8 // It's okay to start when trackerState is Initialized or Started 9 !(isTrackerStopping || isTrackerStopped) 10 } 11 12 val receiverId = receiver.streamId 13 if (!shouldStartReceiver) { 14 onReceiverJobFinish(receiverId) 15 return 16 } 17 18 val checkpointDirOption = Option(ssc.checkpointDir) 19 val serializableHadoopConf = 20 new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration) 21 22 // 在 worker node 上啓動 receiver 的方法 23 val startReceiverFunc: Iterator[Receiver[_]] => Unit = 24 (iterator: Iterator[Receiver[_]]) => { 25 if (!iterator.hasNext) { 26 throw new SparkException( 27 "Could not start receiver as object not found.") 28 } 29 if (TaskContext.get().attemptNumber() == 0) { 30 val receiver = iterator.next() 31 assert(iterator.hasNext == false) 32 val supervisor = new ReceiverSupervisorImpl( 33 receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption) 34 supervisor.start() 35 supervisor.awaitTermination() 36 } else { 37 // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it. 38 } 39 } 40 41 // Create the RDD using the scheduledLocations to run the receiver in a Spark job 42 val receiverRDD: RDD[Receiver[_]] = 43 if (scheduledLocations.isEmpty) { 44 ssc.sc.makeRDD(Seq(receiver), 1) 45 } else { 46 val preferredLocations = scheduledLocations.map(_.toString).distinct 47 ssc.sc.makeRDD(Seq(receiver -> preferredLocations)) 48 } 49 receiverRDD.setName(s"Receiver $receiverId") 50 ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId") 51 ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite())) 52 // 提交分佈式receiver 啓動任務 53 val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit]( 54 receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ()) 55 // We will keep restarting the receiver job until ReceiverTracker is stopped 56 future.onComplete { 57 case Success(_) => 58 if (!shouldStartReceiver) { 59 onReceiverJobFinish(receiverId) 60 } else { 61 logInfo(s"Restarting Receiver $receiverId") 62 self.send(RestartReceiver(receiver)) 63 } 64 case Failure(e) => 65 if (!shouldStartReceiver) { 66 onReceiverJobFinish(receiverId) 67 } else { 68 logError("Receiver has been stopped. Try to restart it.", e) 69 logInfo(s"Restarting Receiver $receiverId") 70 self.send(RestartReceiver(receiver)) 71 } 72 }(submitJobThreadPool) 73 logInfo(s"Receiver ${receiver.streamId} started") 74 }
org.apache.spark.streaming.receiver.ReceiverSupervisorImpl#ReceiverSupervisorImpl 的 start 方法以下:
1 /** Start the supervisor */ 2 def start() { 3 onStart() 4 startReceiver() 5 } 6 override protected def onStart() { // 啓動 BlockGenerator 服務 7 registeredBlockGenerators.foreach { _.start() } 8 } 9 // startReceiver 方法以下: 10 /** Start receiver */ 11 def startReceiver(): Unit = synchronized { 12 try { 13 if (onReceiverStart()) { // 註冊receiver 成功 14 logInfo("Starting receiver") 15 receiverState = Started 16 receiver.onStart() // 啓動 receiver 17 logInfo("Called receiver onStart") 18 } else { 19 // The driver refused us 20 stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None) 21 } 22 } catch { 23 case NonFatal(t) => 24 stop("Error starting receiver " + streamId, Some(t)) 25 } 26 }
1 override protected def onReceiverStart(): Boolean = { 2 val msg = RegisterReceiver( 3 streamId, receiver.getClass.getSimpleName, host, executorId, endpoint) 4 trackerEndpoint.askWithRetry[Boolean](msg) 5 }
簡單描述一下driver 端作的事情,主要負責將其歸入到org.apache.spark.streaming.scheduler.ReceiverTracker 的管理中來,具體streamid 和 ReceiverTrackingInfo 的映射關係保存在receiverTrackingInfos中。
org.apache.spark.streaming.scheduler.ReceiverTracker#registerReceiver關鍵代碼以下:
1 val name = s"${typ}-${streamId}" 2 val receiverTrackingInfo = ReceiverTrackingInfo( 3 streamId, 4 ReceiverState.ACTIVE, 5 scheduledLocations = None, 6 runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)), 7 name = Some(name), 8 endpoint = Some(receiverEndpoint)) 9 receiverTrackingInfos.put(streamId, receiverTrackingInfo) 10 listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
因爲咱們啓用了 WAL, 因此 這裏的receiver 是ReliableKafkaReceiver 的實例
receiver.onStart 即 org.apache.spark.streaming.kafka.ReliableKafkaReceiver#onStart, 源碼以下:
1 override def onStart(): Unit = { 2 logInfo(s"Starting Kafka Consumer Stream with group: $groupId") 3 4 // Initialize the topic-partition / offset hash map. 5 // 1. 負責維護消費的 topic-partition 和 offset 的映射關係 6 topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long] 7 8 // Initialize the stream block id / offset snapshot hash map. 9 // 2. 負責維護 block-id 和 partition-offset 之間的映射關係 10 blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]() 11 12 // Initialize the block generator for storing Kafka message. 13 // 3. 負責保存 kafka message 的 block generator,入參是GeneratedBlockHandler 實例,這是一個負責監聽 block generator事件的一個監聽器 14 // Generates batches of objects received by a org.apache.spark.streaming.receiver.Receiver and puts them into appropriately named blocks at regular intervals. This class starts two threads, one to periodically start a new batch and prepare the previous batch of as a block, the other to push the blocks into the block manager. 15 blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler) 16 // 4. 關閉consumer 自動提交 offset 選項 17 // auto_offset_commit 應該是 false 18 if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") { 19 logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " + 20 "otherwise we will manually set it to false to turn off auto offset commit in Kafka") 21 } 22 23 val props = new Properties() 24 kafkaParams.foreach(param => props.put(param._1, param._2)) 25 // Manually set "auto.commit.enable" to "false" no matter user explicitly set it to true, 26 // we have to make sure this property is set to false to turn off auto commit mechanism in Kafka. 27 props.setProperty(AUTO_OFFSET_COMMIT, "false") 28 29 val consumerConfig = new ConsumerConfig(props) 30 31 assert(!consumerConfig.autoCommitEnable) 32 33 logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}") 34 // 5. 初始化 consumer 對象 35 // consumerConnector 是ZookeeperConsumerConnector的實例 36 consumerConnector = Consumer.create(consumerConfig) 37 logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}") 38 // 6. 初始化zookeeper 的客戶端 39 zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs, 40 consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer) 41 // 7. 建立線程池來處理消息流,池的大小是固定的,爲partition 的總數,並指定線程池中每個線程的name 的前綴,內部使用ThreadPoolExecutor,而且 建立線程的 factory類是guava 工具包提供的。 42 messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool( 43 topics.values.sum, "KafkaMessageHandler") 44 // 8. 啓動 BlockGenerator內的兩個線程 45 blockGenerator.start() 46 47 // 9. 建立MessageStream對象 48 val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) 49 .newInstance(consumerConfig.props) 50 .asInstanceOf[Decoder[K]] 51 52 val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties]) 53 .newInstance(consumerConfig.props) 54 .asInstanceOf[Decoder[V]] 55 56 val topicMessageStreams = consumerConnector.createMessageStreams( 57 topics, keyDecoder, valueDecoder) 58 // 10. 將待處理的MessageHandler 放入 線程池中,等待執行 59 topicMessageStreams.values.foreach { streams => 60 streams.foreach { stream => 61 messageHandlerThreadPool.submit(new MessageHandler(stream)) 62 } 63 } 64 }
其中, 第9 步,建立MessageStream對象,
kafka.consumer.ZookeeperConsumerConnector#createMessageStreams 方法以下:
1 def createMessageStreams[K,V](topicCountMap: Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V]) 2 : Map[String, List[KafkaStream[K,V]]] = { 3 if (messageStreamCreated.getAndSet(true)) 4 throw new MessageStreamsExistException(this.getClass.getSimpleName + 5 " can create message streams at most once",null) 6 consume(topicCountMap, keyDecoder, valueDecoder) 7 }
其調用了 consume 方法,源碼以下:
def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V]) : Map[String,List[KafkaStream[K,V]]] = { debug("entering consume ") if (topicCountMap == null) throw new RuntimeException("topicCountMap is null") // 1. 初始化 topicCount val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap) // 2. 獲取 每個topic 和 threadId 集合的映射關係 val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic // make a list of (queue,stream) pairs, one pair for each threadId // 3. 獲得每個 threadId 對應 (queue, stream) 的映射列表 val queuesAndStreams = topicThreadIds.values.map(threadIdSet => threadIdSet.map(_ => { val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages) val stream = new KafkaStream[K,V]( queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId) (queue, stream) }) ).flatten.toList // 4. 獲取 groupId 在 zookeeper 中的path val dirs = new ZKGroupDirs(config.groupId) // 5. 註冊 consumer 到 groupId(在zk中) registerConsumerInZK(dirs, consumerIdString, topicCount) // 6. 從新初始化 consumer reinitializeConsumer(topicCount, queuesAndStreams) // 7. 返回流 loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]] }
在 kafka.consumer.ZookeeperConsumerConnector#consume方法中,有以下操做:
1 // 獲得每個 threadId 對應 (queue, stream) 的映射列表 2 val queuesAndStreams = topicThreadIds.values.map(threadIdSet => 3 threadIdSet.map(_ => { 4 val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages) 5 val stream = new KafkaStream[K,V]( 6 queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId) 7 (queue, stream) 8 }) 9 ).flatten.toList 10 // 獲取 groupId 在 zookeeper 中的path 11 val dirs = new ZKGroupDirs(config.groupId) 12 // 註冊 consumer 到 groupId(在zk中) 13 registerConsumerInZK(dirs, consumerIdString, topicCount) 14 // 從新初始化 consumer 15 reinitializeConsumer(topicCount, queuesAndStreams)
在上面的代碼中,能夠看到初始化的queue(LinkedBlockingQueue實例)除了被傳入stream(KafkaStream)的構造函數被迭代器從中取數據,還和 stream 重組成Tuple2[LinkedBlockingQueue[FetchedDataChunk]的list,以後被傳入reinitializeConsumer 方法中。
kafka.consumer.ZookeeperConsumerConnector#reinitializeConsume 其源碼以下:
1 private def reinitializeConsumer[K,V]( 2 topicCount: TopicCount, 3 queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[K,V])]) { 4 // 1. 獲取 該groupid 在 zk 中的路徑 5 val dirs = new ZKGroupDirs(config.groupId) 6 7 // listener to consumer and partition changes 8 // 2. 初始化loadBalancerListener,這個負載均衡listener 會時刻監控 consumer 和 partition 的變化 9 if (loadBalancerListener == null) { 10 val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]] 11 loadBalancerListener = new ZKRebalancerListener( 12 config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]]) 13 } 14 15 // create listener for session expired event if not exist yet 16 // 3. 監控 session 過時的listner, 有新session註冊初始化,會通知 loadBalancer 17 if (sessionExpirationListener == null) 18 sessionExpirationListener = new ZKSessionExpireListener( 19 dirs, consumerIdString, topicCount, loadBalancerListener) 20 21 // create listener for topic partition change event if not exist yet 22 // 4. 初始化ZKTopicPartitionChangeListener實例,當topic partition 變化時,這個listener會通知 loadBalancer 23 if (topicPartitionChangeListener == null) 24 topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener) 25 // 5. 將queuesAndStreams 的值通過一系列轉換,並添加到loadBalancerListener.kafkaMessageAndMetadataStreams 中 26 val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams 27 28 // map of {topic -> Set(thread-1, thread-2, ...)} 29 val consumerThreadIdsPerTopic: Map[String, Set[ConsumerThreadId]] = 30 topicCount.getConsumerThreadIdsPerTopic 31 32 val allQueuesAndStreams = topicCount match { 33 case wildTopicCount: WildcardTopicCount => // 這裏是WildcardTopicCount,走這個分支 34 /* 35 * Wild-card consumption streams share the same queues, so we need to 36 * duplicate the list for the subsequent zip operation. 37 */ 38 (1 to consumerThreadIdsPerTopic.keySet.size).flatMap(_ => queuesAndStreams).toList 39 case statTopicCount: StaticTopicCount => 40 queuesAndStreams 41 } 42 43 val topicThreadIds = consumerThreadIdsPerTopic.map { 44 case(topic, threadIds) => 45 threadIds.map((topic, _)) 46 }.flatten 47 48 require(topicThreadIds.size == allQueuesAndStreams.size, 49 "Mismatch between thread ID count (%d) and queue count (%d)" 50 .format(topicThreadIds.size, allQueuesAndStreams.size)) 51 val threadQueueStreamPairs = topicThreadIds.zip(allQueuesAndStreams) 52 53 threadQueueStreamPairs.foreach(e => { 54 val topicThreadId = e._1 55 val q = e._2._1 56 topicThreadIdAndQueues.put(topicThreadId, q) 57 debug("Adding topicThreadId %s and queue %s to topicThreadIdAndQueues data structure".format(topicThreadId, q.toString)) 58 newGauge( 59 "FetchQueueSize", 60 new Gauge[Int] { 61 def value = q.size 62 }, 63 Map("clientId" -> config.clientId, 64 "topic" -> topicThreadId._1, 65 "threadId" -> topicThreadId._2.threadId.toString) 66 ) 67 }) 68 69 val groupedByTopic = threadQueueStreamPairs.groupBy(_._1._1) 70 groupedByTopic.foreach(e => { 71 val topic = e._1 72 val streams = e._2.map(_._2._2).toList 73 topicStreamsMap += (topic -> streams) 74 debug("adding topic %s and %d streams to map.".format(topic, streams.size)) 75 }) 76 77 // listener to consumer and partition changes 78 // 6. 使用 zkClient 註冊sessionExpirationListener 實例 79 zkClient.subscribeStateChanges(sessionExpirationListener) 80 // 7. 使用 zkClient 註冊loadBalancerListener 實例 81 zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener) 82 // 遍歷每個topic,使用zkClient 註冊topicPartitionChangeListener 實例 83 topicStreamsMap.foreach { topicAndStreams => 84 // register on broker partition path changes 85 val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1 86 zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener) 87 } 88 89 // explicitly trigger load balancing for this consumer 90 // 8. 使用 loadBalancerListener 同步作負載均衡 91 loadBalancerListener.syncedRebalance() 92 }
重點看 第 8 步,使用 loadBalancerListener 同步作負載均衡。
kafka.consumer.ZookeeperConsumerConnector.ZKRebalancerListener#syncedRebalance 源碼以下:
1 def syncedRebalance() { 2 rebalanceLock synchronized { 3 rebalanceTimer.time { 4 if(isShuttingDown.get()) { // 若是ZookeeperConsumerConnector 5 已經shutdown了,直接返回 6 return 7 } else { 8 for (i <- 0 until config.rebalanceMaxRetries) { // 默認是 4 次 9 info("begin rebalancing consumer " + consumerIdString + " try #" + i) 10 var done = false 11 var cluster: Cluster = null 12 try { 13 // 1. 根據zkClient 實例 獲取並建立Cluster 對象,這個 cluster 實例包含了一個 Broker(broker的id,broker在zk中的路徑) 列表 14 cluster = getCluster(zkClient) 15 // 2. 在cluster中作 rebalance操做 16 done = rebalance(cluster) 17 } catch { 18 case e: Throwable => 19 /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating. 20 * For example, a ZK node can disappear between the time we get all children and the time we try to get 21 * the value of a child. Just let this go since another rebalance will be triggered. 22 **/ 23 info("exception during rebalance ", e) 24 } 25 info("end rebalancing consumer " + consumerIdString + " try #" + i) 26 if (done) { 27 return 28 } else { 29 /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should 30 * clear the cache */ 31 info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered") 32 } 33 // stop all fetchers and clear all the queues to avoid data duplication 34 closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2)) 35 Thread.sleep(config.rebalanceBackoffMs) 36 } 37 } 38 } 39 } 40 41 throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.rebalanceMaxRetries +" retries") 42 }
重點看 第2 步,在 cluster 中作 rebalance 操做,kafka.consumer.ZookeeperConsumerConnector.ZKRebalancerListener#rebalance 源碼以下:
1 private def rebalance(cluster: Cluster): Boolean = { 2 // 1. 獲取 group和 threadId 的Map 映射關係 3 val myTopicThreadIdsMap = TopicCount.constructTopicCount( 4 group, consumerIdString, zkClient, config.excludeInternalTopics).getConsumerThreadIdsPerTopic 5 // 2. 獲取kafka cluster 中全部可用的node 6 val brokers = getAllBrokersInCluster(zkClient) 7 if (brokers.size == 0) { // 若是可用節點爲空,設置listener訂閱,返回 true 8 // This can happen in a rare case when there are no brokers available in the cluster when the consumer is started. 9 // We log an warning and register for child changes on brokers/id so that rebalance can be triggered when the brokers 10 // are up. 11 warn("no brokers found when trying to rebalance.") 12 zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, loadBalancerListener) 13 true 14 } 15 else { 16 /** 17 * fetchers must be stopped to avoid data duplication, since if the current 18 * rebalancing attempt fails, the partitions that are released could be owned by another consumer. 19 * But if we don't stop the fetchers first, this consumer would continue returning data for released 20 * partitions in parallel. So, not stopping the fetchers leads to duplicate data. 21 */ 22 // 3. 作rebalance 以前的準備工做 23 // 3.1. 關閉現有 fetcher 鏈接 24 closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap) 25 // 3.2 釋放 partition 的全部權(主要是刪除zk下的owner 節點的數據以及解除內存中的topic和 fetcher的關聯關係) 26 releasePartitionOwnership(topicRegistry) 27 // 3.3. 從新給partition分配 fetcher 28 val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient) 29 val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext) 30 val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]( 31 valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo])) 32 33 // fetch current offsets for all topic-partitions 34 // 3.4 獲取當前fetcher對應的 partitions 的 offsets,這裏的offset是指 consumer 下一個要消費的offset 35 val topicPartitions = partitionOwnershipDecision.keySet.toSeq 36 37 val offsetFetchResponseOpt = fetchOffsets(topicPartitions) 38 39 if (isShuttingDown.get || !offsetFetchResponseOpt.isDefined) 40 false 41 else { 42 // 3.5 更新 partition 和 fetcher 的對應關係 43 val offsetFetchResponse = offsetFetchResponseOpt.get 44 topicPartitions.foreach(topicAndPartition => { 45 val (topic, partition) = topicAndPartition.asTuple 46 // requestInfo是OffsetFetchResponse實例中的成員變量,它是一個Map[TopicAndPartition, OffsetMetadataAndError]實例 47 val offset = offsetFetchResponse.requestInfo(topicAndPartition).offset 48 val threadId = partitionOwnershipDecision(topicAndPartition) 49 addPartitionTopicInfo(currentTopicRegistry, partition, topic, offset, threadId) 50 }) 51 52 /** 53 * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt 54 * A rebalancing attempt is completed successfully only after the fetchers have been started correctly 55 */ 56 if(reflectPartitionOwnershipDecision(partitionOwnershipDecision)) { 57 allTopicsOwnedPartitionsCount = partitionOwnershipDecision.size 58 59 partitionOwnershipDecision.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic } 60 .foreach { case (topic, partitionThreadPairs) => 61 newGauge("OwnedPartitionsCount", 62 new Gauge[Int] { 63 def value() = partitionThreadPairs.size 64 }, 65 ownedPartitionsCountMetricTags(topic)) 66 } 67 // 3.6 將已經新的 topic registry 覆蓋舊的 68 topicRegistry = currentTopicRegistry 69 // 4. 更新 fetcher 70 updateFetcher(cluster) 71 true 72 } else { 73 false 74 } 75 } 76 } 77 }
其中addPartitionTopicInfo 源碼以下:
1 private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]], 2 partition: Int, topic: String, 3 offset: Long, consumerThreadId: ConsumerThreadId) { 4 //若是map沒有對應的 key,會使用valueFactory初始化鍵值對,並返回 對應的 value 5 val partTopicInfoMap = currentTopicRegistry.getAndMaybePut(topic) 6 7 val queue = topicThreadIdAndQueues.get((topic, consumerThreadId)) 8 val consumedOffset = new AtomicLong(offset) 9 val fetchedOffset = new AtomicLong(offset) 10 val partTopicInfo = new PartitionTopicInfo(topic, 11 partition, 12 queue, 13 consumedOffset, 14 fetchedOffset, 15 new AtomicInteger(config.fetchMessageMaxBytes), 16 config.clientId) 17 // 1. 將其註冊到新的 Topic註冊中心中,即註冊 partition 和 fetcher 的關係 18 partTopicInfoMap.put(partition, partTopicInfo) 19 debug(partTopicInfo + " selected new offset " + offset) 20 // 2. 更新consumer 的 已經消費的offset信息 21 checkpointedZkOffsets.put(TopicAndPartition(topic, partition), offset) 22 } 23 }
第4步, 更新 fetcher 源碼以下:
1 private def updateFetcher(cluster: Cluster) { 2 // update partitions for fetcher 3 var allPartitionInfos : List[PartitionTopicInfo] = Nil 4 for (partitionInfos <- topicRegistry.values) 5 for (partition <- partitionInfos.values) 6 allPartitionInfos ::= partition 7 info("Consumer " + consumerIdString + " selected partitions : " + 8 allPartitionInfos.sortWith((s,t) => s.partitionId < t.partitionId).map(_.toString).mkString(",")) 9 10 fetcher match { 11 case Some(f) => 12 f.startConnections(allPartitionInfos, cluster) 13 case None => 14 } 15 }
其中,f.startConnections方法真正執行 更新操做。此時引入一個新的類。即 fetcher 類,kafka.consumer.ConsumerFetcherManager。
kafka.consumer.ConsumerFetcherManager#startConnections 的源碼以下:
1 def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) { 2 // LeaderFinderThread 在 topic 的leader node可用時,將 fetcher 添加到 leader 節點上 3 leaderFinderThread = new LeaderFinderThread(consumerIdString + "-leader-finder-thread") 4 leaderFinderThread.start() 5 6 inLock(lock) { 7 // 更新ConsumerFetcherManager 成員變量 8 partitionMap = topicInfos.map(tpi => (TopicAndPartition(tpi.topic, tpi.partitionId), tpi)).toMap 9 this.cluster = cluster 10 noLeaderPartitionSet ++= topicInfos.map(tpi => TopicAndPartition(tpi.topic, tpi.partitionId)) 11 cond.signalAll() 12 } 13 }
ConsumerFetcherManager 有一個LeaderFinderThread 實例,該類的父類kafka.utils.ShutdownableThread ,run 方法以下:
1 override def run(): Unit = { 2 info("Starting ") 3 try{ 4 while(isRunning.get()){ 5 doWork() 6 } 7 } catch{ 8 case e: Throwable => 9 if(isRunning.get()) 10 error("Error due to ", e) 11 } 12 shutdownLatch.countDown() 13 info("Stopped ") 14 }
doWork其實就是一個抽象方法,其子類LeaderFinderThread的實現以下:
1 // thread responsible for adding the fetcher to the right broker when leader is available 2 override def doWork() { 3 // 1. 獲取 partition 和leader node的映射關係 4 val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker] 5 lock.lock() 6 try { 7 while (noLeaderPartitionSet.isEmpty) { // 這個字段在startConnections 已更新新值 8 trace("No partition for leader election.") 9 cond.await() 10 } 11 12 trace("Partitions without leader %s".format(noLeaderPartitionSet)) 13 val brokers = getAllBrokersInCluster(zkClient) // 獲取全部可用broker 節點 14 // 獲取kafka.api.TopicMetadata 序列,kafka.api.TopicMetadata 保存了 topic 和 partitionId,isr,leader,replicas 的信息 15 val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, 16 brokers, 17 config.clientId, 18 config.socketTimeoutMs, 19 correlationId.getAndIncrement).topicsMetadata 20 if(logger.isDebugEnabled) topicsMetadata.foreach(topicMetadata => debug(topicMetadata.toString())) 21 // 2. 根據獲取到的 partition 和 leader node 的關係更新noLeaderPartitionSet 和leaderForPartitionsMap 兩個map集合,其中noLeaderPartitionSet 包含的是沒有肯定leader 的 partition 集合,leaderForPartitionsMap 是 已經肯定了 leader 的 partition 集合。 22 topicsMetadata.foreach { tmd => 23 val topic = tmd.topic 24 tmd.partitionsMetadata.foreach { pmd => 25 val topicAndPartition = TopicAndPartition(topic, pmd.partitionId) 26 if(pmd.leader.isDefined && noLeaderPartitionSet.contains(topicAndPartition)) { 27 val leaderBroker = pmd.leader.get 28 leaderForPartitionsMap.put(topicAndPartition, leaderBroker) 29 noLeaderPartitionSet -= topicAndPartition 30 } 31 } 32 } 33 } catch { 34 case t: Throwable => { 35 if (!isRunning.get()) 36 throw t /* If this thread is stopped, propagate this exception to kill the thread. */ 37 else 38 warn("Failed to find leader for %s".format(noLeaderPartitionSet), t) 39 } 40 } finally { 41 lock.unlock() 42 } 43 44 try { 45 // 3. 具體爲 partition 分配 fetcher 46 addFetcherForPartitions(leaderForPartitionsMap.map{ 47 case (topicAndPartition, broker) => 48 topicAndPartition -> BrokerAndInitialOffset(broker, partitionMap(topicAndPartition).getFetchOffset())} 49 ) 50 } catch { 51 case t: Throwable => { 52 if (!isRunning.get()) 53 throw t /* If this thread is stopped, propagate this exception to kill the thread. */ 54 else { 55 warn("Failed to add leader for partitions %s; will retry".format(leaderForPartitionsMap.keySet.mkString(",")), t) 56 lock.lock() 57 noLeaderPartitionSet ++= leaderForPartitionsMap.keySet 58 lock.unlock() 59 } 60 } 61 } 62 // 4. 關閉空閒fetcher線程 63 shutdownIdleFetcherThreads() 64 Thread.sleep(config.refreshLeaderBackoffMs) 65 }
重點看第3 步,具體爲 partition 分配 fetcher,addFetcherForPartitions 源碼以下:
1 def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) { 2 mapLock synchronized { 3 // 獲取 fetcher 和 partition的映射關係 4 val partitionsPerFetcher = partitionAndOffsets.groupBy{ case(topicAndPartition, brokerAndInitialOffset) => 5 BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition))} 6 for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) { 7 8 var fetcherThread: AbstractFetcherThread = null 9 fetcherThreadMap.get(brokerAndFetcherId) match { 10 case Some(f) => fetcherThread = f 11 case None => 12 // 根據brokerAndFetcherId 去初始化Fetcher並啓動 fetcher 13 fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker) 14 fetcherThreadMap.put(brokerAndFetcherId, fetcherThread) 15 fetcherThread.start 16 } 17 18 fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (topicAndPartition, brokerAndInitOffset) => 19 topicAndPartition -> brokerAndInitOffset.initOffset 20 }) 21 } 22 } 23 24 info("Added fetcher for partitions %s".format(partitionAndOffsets.map{ case (topicAndPartition, brokerAndInitialOffset) => 25 "[" + topicAndPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] "})) 26 }
kafka.consumer.ConsumerFetcherManager#createFetcherThread的源碼以下:
1 override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = { 2 new ConsumerFetcherThread( 3 "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id), 4 config, sourceBroker, partitionMap, this) 5 }
先來看ConsumerFetcherThread的構造方法聲明:
1 class ConsumerFetcherThread(name: String, 2 val config: ConsumerConfig, 3 sourceBroker: Broker, 4 partitionMap: Map[TopicAndPartition, PartitionTopicInfo], 5 val consumerFetcherManager: ConsumerFetcherManager) 6 extends AbstractFetcherThread(name = name, 7 clientId = config.clientId, 8 sourceBroker = sourceBroker, 9 socketTimeout = config.socketTimeoutMs, 10 socketBufferSize = config.socketReceiveBufferBytes, 11 fetchSize = config.fetchMessageMaxBytes, 12 fetcherBrokerId = Request.OrdinaryConsumerId, 13 maxWait = config.fetchWaitMaxMs, 14 minBytes = config.fetchMinBytes, 15 isInterruptible = true)
注意,partitionMap 中的value 是PartitionTopicInfo ,這個對象中封裝了存放fetch結果值的BlockingQueue[FetchedDataChunk] 實例。
再來看 run 方法,其使用的是 kafka.utils.ShutdownableThread#run 方法,上面咱們已經看過了,主要看該子類是如何從新 doWork方法的:
1 override def doWork() { 2 inLock(partitionMapLock) { // 加鎖,執行,釋放鎖 3 if (partitionMap.isEmpty) // 若是沒有須要執行的 fetch 操做,等待200ms後返回 4 partitionMapCond.await(200L, TimeUnit.MILLISECONDS) 5 partitionMap.foreach { // 將全部的 fetch 的信息添加到fetchRequestBuilder中 6 case((topicAndPartition, offset)) => 7 fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition, 8 offset, fetchSize) 9 } 10 } 11 // 構建批抓取的fetchRequest對象 12 val fetchRequest = fetchRequestBuilder.build() 13 // 處理 FetchRequest 14 if (!fetchRequest.requestInfo.isEmpty) 15 processFetchRequest(fetchRequest) 16 }
其中 kafka.server.AbstractFetcherThread#processFetchRequest 源碼以下:
1 private def processFetchRequest(fetchRequest: FetchRequest) { 2 val partitionsWithError = new mutable.HashSet[TopicAndPartition] 3 var response: FetchResponse = null 4 try { 5 trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest)) 6 // 發送請求,並獲取返回值。 7 // simpleConsumer 就是SimpleConsumer 實例,已做說明,再也不贅述。 8 response = simpleConsumer.fetch(fetchRequest) 9 } catch { 10 case t: Throwable => 11 if (isRunning.get) { 12 warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.toString)) 13 partitionMapLock synchronized { 14 partitionsWithError ++= partitionMap.keys 15 } 16 } 17 } 18 fetcherStats.requestRate.mark() 19 20 if (response != null) { 21 // process fetched data 22 inLock(partitionMapLock) { // 獲取鎖,執行處理response 操做,釋放鎖 23 response.data.foreach { 24 case(topicAndPartition, partitionData) => 25 val (topic, partitionId) = topicAndPartition.asTuple 26 val currentOffset = partitionMap.get(topicAndPartition) 27 // we append to the log if the current offset is defined and it is the same as the offset requested during fetch 28 if (currentOffset.isDefined && fetchRequest.requestInfo(topicAndPartition).offset == currentOffset.get) { 29 partitionData.error match { // 根據返回碼來肯定具體執行哪部分處理邏輯 30 case ErrorMapping.NoError => // 成功返回,沒有錯誤 31 try { 32 val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet] 33 val validBytes = messages.validBytes 34 val newOffset = messages.shallowIterator.toSeq.lastOption match { 35 case Some(m: MessageAndOffset) => m.nextOffset 36 case None => currentOffset.get 37 } 38 partitionMap.put(topicAndPartition, newOffset) 39 fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset 40 fetcherStats.byteRate.mark(validBytes) 41 // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread 42 processPartitionData(topicAndPartition, currentOffset.get, partitionData) 43 } catch { 44 case ime: InvalidMessageException => // 消息獲取不完整 45 // we log the error and continue. This ensures two things 46 // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag 47 // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and 48 // should get fixed in the subsequent fetches 49 logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage) 50 case e: Throwable => 51 throw new KafkaException("error processing data for partition [%s,%d] offset %d" 52 .format(topic, partitionId, currentOffset.get), e) 53 } 54 case ErrorMapping.OffsetOutOfRangeCode => // offset out of range error 55 try { 56 val newOffset = handleOffsetOutOfRange(topicAndPartition) 57 partitionMap.put(topicAndPartition, newOffset) 58 error("Current offset %d for partition [%s,%d] out of range; reset offset to %d" 59 .format(currentOffset.get, topic, partitionId, newOffset)) 60 } catch { 61 case e: Throwable => 62 error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e) 63 partitionsWithError += topicAndPartition 64 } 65 case _ => 66 if (isRunning.get) { 67 error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id, 68 ErrorMapping.exceptionFor(partitionData.error).getClass)) 69 partitionsWithError += topicAndPartition 70 } 71 } 72 } 73 } 74 } 75 } 76 77 if(partitionsWithError.size > 0) { 78 debug("handling partitions with error for %s".format(partitionsWithError)) 79 handlePartitionsWithErrors(partitionsWithError) 80 } 81 }
其中processPartitionData 源碼以下,它負責處理具體的返回消息:
1 // process fetched data 2 def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) { 3 // partitionMap 是一個成員變量,在構造函數中做爲入參 4 val pti = partitionMap(topicAndPartition) 5 if (pti.getFetchOffset != fetchOffset) 6 throw new RuntimeException("Offset doesn't match for partition [%s,%d] pti offset: %d fetch offset: %d" 7 .format(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset, fetchOffset)) 8 // 數據入隊 9 pti.enqueue(partitionData.messages.asInstanceOf[ByteBufferMessageSet]) 10 }
能夠看到,終於在這裏,把從leader中fetch的消息放入了BlockingQueue[FetchedDataChunk] 緩衝堵塞隊列中。
KafkaStream 是依賴於 LinkedBlockingQueue 的同理 KafkaStream 也會返回一個迭代器 kafka.consumer.ConsumerIterator,用於迭代訪問 KafkaStream 中的數據。
kafka.consumer.ConsumerIterator 的主要源碼以下:
1 // 判斷是否有下一個元素 2 def hasNext(): Boolean = { 3 if(state == FAILED) 4 throw new IllegalStateException("Iterator is in failed state") 5 state match { 6 case DONE => false 7 case READY => true 8 case _ => maybeComputeNext() 9 } 10 } 11 // 獲取下一個元素,父類實現 12 def next(): T = { 13 if(!hasNext()) 14 throw new NoSuchElementException() 15 state = NOT_READY 16 if(nextItem == null) 17 throw new IllegalStateException("Expected item but none found.") 18 nextItem 19 } 20 // 獲取下一個元素,使用子類ConsumerIterator實現 21 override def next(): MessageAndMetadata[K, V] = { 22 val item = super.next() // 調用父類實現 23 if(consumedOffset < 0) 24 throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset)) 25 currentTopicInfo.resetConsumeOffset(consumedOffset) 26 val topic = currentTopicInfo.topic 27 trace("Setting %s consumed offset to %d".format(topic, consumedOffset)) 28 consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark() 29 consumerTopicStats.getConsumerAllTopicStats().messageRate.mark() 30 item 31 } 32 // 或許有,嘗試計算一下下一個 33 def maybeComputeNext(): Boolean = { 34 state = FAILED 35 nextItem = makeNext() 36 if(state == DONE) { 37 false 38 } else { 39 state = READY 40 true 41 } 42 } 43 // 建立下一個元素,這個在子類ConsumerIterator中有實現 44 protected def makeNext(): MessageAndMetadata[K, V] = { 45 // 首先channel 是 LinkedBlockingQueue實例, 是 KafkaStream 中的 queue 成員變量,queue 成員變量 46 var currentDataChunk: FetchedDataChunk = null 47 // if we don't have an iterator, get one 48 var localCurrent = current.get() 49 // 若是沒有迭代器或者是沒有下一個元素了,須要從channel中取一個 50 if(localCurrent == null || !localCurrent.hasNext) { 51 // 刪除並返回隊列的頭節點。 52 if (consumerTimeoutMs < 0) 53 currentDataChunk = channel.take // 阻塞方法,一直等待,直到有可用元素 54 else { 55 currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS) // 阻塞方法,等待指定時間,超時也會返回 56 if (currentDataChunk == null) { // 若是沒有數據,重置狀態爲NOT_READY 57 // reset state to make the iterator re-iterable 58 resetState() 59 throw new ConsumerTimeoutException 60 } 61 } 62 // 關閉命令 63 if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) { 64 debug("Received the shutdown command") 65 return allDone // 該函數將狀態設爲DONE, 返回null 66 } else { 67 currentTopicInfo = currentDataChunk.topicInfo 68 val cdcFetchOffset = currentDataChunk.fetchOffset 69 val ctiConsumeOffset = currentTopicInfo.getConsumeOffset 70 if (ctiConsumeOffset < cdcFetchOffset) { 71 error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data" 72 .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo)) 73 currentTopicInfo.resetConsumeOffset(cdcFetchOffset) 74 } 75 localCurrent = currentDataChunk.messages.iterator 76 77 current.set(localCurrent) 78 } 79 // if we just updated the current chunk and it is empty that means the fetch size is too small! 80 if(currentDataChunk.messages.validBytes == 0) 81 throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " + 82 "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow." 83 .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset)) 84 } 85 var item = localCurrent.next() 86 // reject the messages that have already been consumed 87 while (item.offset < currentTopicInfo.getConsumeOffset && localCurrent.hasNext) { 88 item = localCurrent.next() 89 } 90 consumedOffset = item.nextOffset 91 92 item.message.ensureValid() // validate checksum of message to ensure it is valid 93 // 返回處理封裝好的 kafka 數據 94 new MessageAndMetadata(currentTopicInfo.topic, currentTopicInfo.partitionId, item.message, item.offset, keyDecoder, valueDecoder) 95 }
咱們再來看,org.apache.spark.streaming.kafka.ReliableKafkaReceiver#onStart 的第10 步相應的代碼:
1 // 10. 將待處理的MessageHandler 放入 線程池中,等待執行 2 topicMessageStreams.values.foreach { streams => 3 streams.foreach { stream => 4 messageHandlerThreadPool.submit(new MessageHandler(stream)) 5 } 6 }
其中 MessageHandler 是一個 Runnable 對象,其 run 方法以下:
1 override def run(): Unit = { 2 while (!isStopped) { 3 try { 4 // 1. 獲取ConsumerIterator 迭代器對象 5 val streamIterator = stream.iterator() 6 // 2. 遍歷迭代器中獲取每一條數據,而且保存message和相應的 metadata 信息 7 while (streamIterator.hasNext) { 8 storeMessageAndMetadata(streamIterator.next) 9 } 10 } catch { 11 case e: Exception => 12 reportError("Error handling message", e) 13 } 14 } 15 }
其中第二步中關鍵方法,org.apache.spark.streaming.kafka.ReliableKafkaReceiver#storeMessageAndMetadata 方法以下:
1 /** Store a Kafka message and the associated metadata as a tuple. */ 2 private def storeMessageAndMetadata( 3 msgAndMetadata: MessageAndMetadata[K, V]): Unit = { 4 val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition) 5 val data = (msgAndMetadata.key, msgAndMetadata.message) 6 val metadata = (topicAndPartition, msgAndMetadata.offset) 7 // 添加數據到 block 8 blockGenerator.addDataWithCallback(data, metadata) 9 }
addDataWithCallback 源碼以下:
1 /** 2 * Push a single data item into the buffer. After buffering the data, the 3 * `BlockGeneratorListener.onAddData` callback will be called. 4 */ 5 def addDataWithCallback(data: Any, metadata: Any): Unit = { 6 if (state == Active) { 7 waitToPush() 8 synchronized { 9 if (state == Active) { 10 // 1. 將數據放入 buffer 中,以便處理線程從中獲取數據 11 currentBuffer += data 12 // 2. 在啓動 receiver線程中,能夠知道listener 是指GeneratedBlockHandler 實例 13 listener.onAddData(data, metadata) 14 } else { 15 throw new SparkException( 16 "Cannot add data as BlockGenerator has not been started or has been stopped") 17 } 18 } 19 } else { 20 throw new SparkException( 21 "Cannot add data as BlockGenerator has not been started or has been stopped") 22 } 23 }
第二步比較簡單,先看一下第二步:
org.apache.spark.streaming.kafka.ReliableKafkaReceiver.GeneratedBlockHandler#onAddData的源碼以下:
1 def onAddData(data: Any, metadata: Any): Unit = { 2 // Update the offset of the data that was added to the generator 3 if (metadata != null) { 4 val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)] 5 updateOffset(topicAndPartition, offset) 6 } 7 } 8 // 這裏的 updateOffset 調用的是//org.apache.spark.streaming.kafka.ReliableKafkaReceiver#updateOffset,源碼以下: 9 /** Update stored offset */ 10 private def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = { 11 topicPartitionOffsetMap.put(topicAndPartition, offset) 12 }
第一步的原理以下:
在 BlockGenerator中有一個定時器,定時(200ms)去執行檢查currentBuffer是否爲empty任務, 若不爲空,則執行以下操做並把它放入等待生成block 的隊列中,有兩外一個線程來時刻監聽這個隊列,有數據,則執行pushBlock 操做。
第一個定時器線程以下:
1 private val blockIntervalTimer = 2 new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator") 3 4 // 其中,updateCurrentBuffer 方法以下 5 /** Change the buffer to which single records are added to. */ 6 private def updateCurrentBuffer(time: Long): Unit = { 7 try { 8 var newBlock: Block = null 9 synchronized { 10 if (currentBuffer.nonEmpty) { 11 val newBlockBuffer = currentBuffer 12 currentBuffer = new ArrayBuffer[Any] 13 val blockId = StreamBlockId(receiverId, time - blockIntervalMs) 14 listener.onGenerateBlock(blockId) 15 newBlock = new Block(blockId, newBlockBuffer) 16 } 17 } 18 19 if (newBlock != null) { 20 blocksForPushing.put(newBlock) // put is blocking when queue is full 21 } 22 } catch { 23 case ie: InterruptedException => 24 logInfo("Block updating timer thread was interrupted") 25 case e: Exception => 26 reportError("Error in block updating thread", e) 27 } 28 } 29 30 // listener.onGenerateBlock(blockId) 代碼以下: 31 def onGenerateBlock(blockId: StreamBlockId): Unit = { 32 // Remember the offsets of topics/partitions when a block has been generated 33 rememberBlockOffsets(blockId) 34 } 35 // rememberBlockOffsets 代碼以下: 36 /** 37 * Remember the current offsets for each topic and partition. This is called when a block is 38 * generated. 39 */ 40 private def rememberBlockOffsets(blockId: StreamBlockId): Unit = { 41 // Get a snapshot of current offset map and store with related block id. 42 val offsetSnapshot = topicPartitionOffsetMap.toMap 43 blockOffsetMap.put(blockId, offsetSnapshot) 44 topicPartitionOffsetMap.clear() 45 } 46 // 能夠看出,主要是清除 topic-partition-> offset 映射關係 47 // 創建 block 和topic-partition-> offset的映射關係
其中,blocksForPushing是一個有界阻塞隊列,另一個線程會一直輪詢它。
1 private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize) 2 private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } } 3 4 /** Keep pushing blocks to the BlockManager. */ 5 // 這個方法主要的做用就是一直不停地輪詢blocksForPushing隊列,並處理相應的push block 事件。 6 private def keepPushingBlocks() { 7 logInfo("Started block pushing thread") 8 9 def areBlocksBeingGenerated: Boolean = synchronized { 10 state != StoppedGeneratingBlocks 11 } 12 13 try { 14 // While blocks are being generated, keep polling for to-be-pushed blocks and push them. 15 while (areBlocksBeingGenerated) { // 線程沒有被中止,則一直循環 16 // 超時poll操做獲取並刪除頭節點,超過期間(10ms)則返回 17 Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match { 18 case Some(block) => pushBlock(block) // 若是有數據則進行處理。 19 case None => 20 } 21 } 22 23 // At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks. 24 logInfo("Pushing out the last " + blocksForPushing.size() + " blocks") 25 while (!blocksForPushing.isEmpty) { // 若是隊列中還有數據,繼續進行處理 26 val block = blocksForPushing.take() // 這是一個堵塞方法,不過如今會立刻返回,由於隊列裏面有數據。 27 logDebug(s"Pushing block $block") 28 pushBlock(block) // 處理數據 29 logInfo("Blocks left to push " + blocksForPushing.size()) 30 } 31 logInfo("Stopped block pushing thread") 32 } catch { 33 case ie: InterruptedException => 34 logInfo("Block pushing thread was interrupted") 35 case e: Exception => 36 reportError("Error in block pushing thread", e) 37 } 38 }
其中的pushBlock源碼以下:
1 private def pushBlock(block: Block) { 2 listener.onPushBlock(block.id, block.buffer) 3 logInfo("Pushed block " + block.id) 4 }
其調用的listener(org.apache.spark.streaming.kafka.ReliableKafkaReceiver.GeneratedBlockHandler)的 onPushBlock 源碼以下:
1 def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = { 2 // Store block and commit the blocks offset 3 storeBlockAndCommitOffset(blockId, arrayBuffer) 4 }
其中,storeBlockAndCommitOffset具體代碼以下:
1 /** 2 * Store the ready-to-be-stored block and commit the related offsets to zookeeper. This method 3 * will try a fixed number of times to push the block. If the push fails, the receiver is stopped. 4 */ 5 private def storeBlockAndCommitOffset( 6 blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = { 7 var count = 0 8 var pushed = false 9 var exception: Exception = null 10 while (!pushed && count <= 3) { // 整個過程,總共容許3 次重試 11 try { 12 store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]]) 13 pushed = true 14 } catch { 15 case ex: Exception => 16 count += 1 17 exception = ex 18 } 19 } 20 if (pushed) { // 已經push block 21 // 更新 offset 22 Option(blockOffsetMap.get(blockId)).foreach(commitOffset) 23 // 若是已經push 到 BlockManager 中,則不會再保留 block和topic-partition-> offset的映射關係 24 blockOffsetMap.remove(blockId) 25 } else { 26 stop("Error while storing block into Spark", exception) 27 } 28 } 29 // 其中,commitOffset源碼以下: 30 /** 31 * Commit the offset of Kafka's topic/partition, the commit mechanism follow Kafka 0.8.x's 32 * metadata schema in Zookeeper. 33 */ 34 private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = { 35 if (zkClient == null) { 36 val thrown = new IllegalStateException("Zookeeper client is unexpectedly null") 37 stop("Zookeeper client is not initialized before commit offsets to ZK", thrown) 38 return 39 } 40 41 for ((topicAndPart, offset) <- offsetMap) { 42 try { 43 // 獲取在 zk 中 comsumer 的partition的目錄 44 val topicDirs = new ZKGroupTopicDirs(groupId, topicAndPart.topic) 45 val zkPath = s"${topicDirs.consumerOffsetDir}/${topicAndPart.partition}" 46 // 更新 consumer 的已消費topic-partition 的offset 操做 47 ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString) 48 } catch { 49 case e: Exception => 50 logWarning(s"Exception during commit offset $offset for topic" + 51 s"${topicAndPart.topic}, partition ${topicAndPart.partition}", e) 52 } 53 54 logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " + 55 s"partition ${topicAndPart.partition}") 56 } 57 }
關鍵方法store 以下:
1 /** Store an ArrayBuffer of received data as a data block into Spark's memory. */ 2 def store(dataBuffer: ArrayBuffer[T]) { 3 supervisor.pushArrayBuffer(dataBuffer, None, None) 4 }
其調用了supervisor(org.apache.spark.streaming.receiver.ReceiverSupervisorImpl實例)的pushArrayBuffer方法,內部操做以下:
1 /** Store an ArrayBuffer of received data as a data block into Spark's memory. */ 2 def pushArrayBuffer( 3 arrayBuffer: ArrayBuffer[_], 4 metadataOption: Option[Any], 5 blockIdOption: Option[StreamBlockId] 6 ) { 7 pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption) 8 }
org.apache.spark.streaming.receiver.ReceiverSupervisorImpl#pushAndReportBlock 源碼以下:
1 /** Store block and report it to driver */ 2 def pushAndReportBlock( 3 receivedBlock: ReceivedBlock, 4 metadataOption: Option[Any], 5 blockIdOption: Option[StreamBlockId] 6 ) { 7 // 1.準備blockId,time等信息 8 val blockId = blockIdOption.getOrElse(nextBlockId) 9 val time = System.currentTimeMillis 10 // 2. 執行存儲 block 操做 11 val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock) 12 logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms") 13 // 3. 獲取保存的message 的記錄數 14 val numRecords = blockStoreResult.numRecords 15 // 4. 通知trackerEndpoint已經添加block,執行更新driver 的WAL操做 16 val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult) 17 trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo)) 18 logDebug(s"Reported block $blockId") 19 }
其中,receivedBlockHandler 的賦值語句以下:
1 private val receivedBlockHandler: ReceivedBlockHandler = { 2 if (WriteAheadLogUtils.enableReceiverLog(env.conf)) { 3 if (checkpointDirOption.isEmpty) { 4 throw new SparkException( 5 "Cannot enable receiver write-ahead log without checkpoint directory set. " + 6 "Please use streamingContext.checkpoint() to set the checkpoint directory. " + 7 "See documentation for more details.") 8 } 9 // enable WAL而且checkpoint dir 不爲空,即,在這裏,返回WriteAheadLogBasedBlockHandler 對象,這個對象持有了 blockmanager,streamid,storagelevel,conf,checkpointdir 等信息 10 new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId, 11 receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get) 12 } else { 13 new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel) 14 } 15 }
ReceivedBlockHandler 的 storeBlock方法源碼以下:
1 /** 2 * This implementation stores the block into the block manager as well as a write ahead log. 3 * It does this in parallel, using Scala Futures, and returns only after the block has 4 * been stored in both places. 5 */ 6 // 並行地將block 存入 blockmanager 和 write ahead log,使用scala 的Future 機制實現的,當兩個都寫完畢以後,返回。 7 def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = { 8 9 var numRecords = None: Option[Long] 10 // Serialize the block so that it can be inserted into both 11 // 1. 將ReceivedBlock序列化(未使用壓縮機制)成字節數組 12 val serializedBlock = block match { // serializedBlock 就是序列化後的結果 13 case ArrayBufferBlock(arrayBuffer) => // go this branch 14 numRecords = Some(arrayBuffer.size.toLong) 15 blockManager.dataSerialize(blockId, arrayBuffer.iterator) 16 case IteratorBlock(iterator) => 17 val countIterator = new CountingIterator(iterator) 18 val serializedBlock = blockManager.dataSerialize(blockId, countIterator) 19 numRecords = countIterator.count 20 serializedBlock 21 case ByteBufferBlock(byteBuffer) => 22 byteBuffer 23 case _ => 24 throw new Exception(s"Could not push $blockId to block manager, unexpected block type") 25 } 26 27 // 2. Store the block in block manager 28 val storeInBlockManagerFuture = Future { 29 val putResult = 30 blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true) 31 if (!putResult.map { _._1 }.contains(blockId)) { 32 throw new SparkException( 33 s"Could not store $blockId to block manager with storage level $storageLevel") 34 } 35 } 36 37 // 3. Store the block in write ahead log 38 val storeInWriteAheadLogFuture = Future { 39 writeAheadLog.write(serializedBlock, clock.getTimeMillis()) 40 } 41 42 // 4. Combine the futures, wait for both to complete, and return the write ahead log record handle 43 val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2) 44 // 等待future任務結果返回。默認時間是 30s, 使用spark.streaming.receiver.blockStoreTimeout 參數來變動默認值 45 val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout) 46 // 返回cache以後的block 相關信息 47 WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle) 48 }
注意WriteAheadLogBasedStoreResult 這個 WriteAheadLogBasedStoreResult 實例,後面 RDD 在處理的時候會使用到。
org.apache.spark.streaming.receiver.ReceiverSupervisorImpl#pushAndReportBlock 通知driver addBlock 的源碼以下:
1 // 4. 通知trackerEndpoint已經添加block,執行更新driver 的WAL操做 2 val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult) 3 trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo)) 4 logDebug(s"Reported block $blockId")
跳過中間的RPC操做,直接到 driver 端org.apache.spark.streaming.scheduler.ReceiverTracker.ReceiverTrackerEndpoint#receiveAndReply 中:
1 case AddBlock(receivedBlockInfo) => 2 if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) { 3 walBatchingThreadPool.execute(new Runnable { 4 override def run(): Unit = Utils.tryLogNonFatalError { 5 if (active) { 6 context.reply(addBlock(receivedBlockInfo)) 7 } else { 8 throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.") 9 } 10 } 11 }) 12 } else { 13 context.reply(addBlock(receivedBlockInfo)) 14 }
其中 addBlock方法源碼以下:
1 /** Add new blocks for the given stream */ 2 private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { 3 receivedBlockTracker.addBlock(receivedBlockInfo) 4 }
其中,org.apache.spark.streaming.scheduler.ReceivedBlockTracker#addBlock 源碼以下:
1 /** Add received block. This event will get written to the write ahead log (if enabled). */ 2 def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { 3 try { 4 val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo)) 5 if (writeResult) { 6 synchronized { 7 getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo 8 } 9 logDebug(s"Stream ${receivedBlockInfo.streamId} received " + 10 s"block ${receivedBlockInfo.blockStoreResult.blockId}") 11 } else { 12 logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " + 13 s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.") 14 } 15 writeResult 16 } catch { 17 case NonFatal(e) => 18 logError(s"Error adding block $receivedBlockInfo", e) 19 false 20 } 21 } 22 /** Write an update to the tracker to the write ahead log */ 23 private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = { 24 if (isWriteAheadLogEnabled) { 25 logTrace(s"Writing record: $record") 26 try { 27 writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)), 28 clock.getTimeMillis()) 29 true 30 } catch { 31 case NonFatal(e) => 32 logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e) 33 false 34 } 35 } else { 36 true 37 } 38 } 39 /** Get the queue of received blocks belonging to a particular stream */ 40 private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = { 41 streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue) 42 }
上述代碼,主要是將BlockAdditionEvent寫WAL和更新隊列(其實就是mutable.HashMap[Int, ReceivedBlockQueue]),這個隊列中存放的是streamId ->UnallocatedBlock 的映射關係
createStream 源碼以下:
1 /** 2 * Create an input stream that pulls messages from Kafka Brokers. 3 * @param ssc StreamingContext object 4 * @param kafkaParams Map of kafka configuration parameters, 5 * see http://kafka.apache.org/08/configuration.html 6 * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed 7 * in its own thread. 8 * @param storageLevel Storage level to use for storing the received objects 9 * @tparam K type of Kafka message key 10 * @tparam V type of Kafka message value 11 * @tparam U type of Kafka message key decoder 12 * @tparam T type of Kafka message value decoder 13 * @return DStream of (Kafka message key, Kafka message value) 14 */ 15 def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag]( 16 ssc: StreamingContext, 17 kafkaParams: Map[String, String], 18 topics: Map[String, Int], 19 storageLevel: StorageLevel 20 ): ReceiverInputDStream[(K, V)] = { 21 // 能夠經過設置spark.streaming.receiver.writeAheadLog.enable參數爲 true來開啓WAL 22 val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf) 23 new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel) 24 }
建立的是KafkaInputDStream對象:
1 /** 2 * Input stream that pulls messages from a Kafka Broker. 3 * 4 * @param kafkaParams Map of kafka configuration parameters. 5 * See: http://kafka.apache.org/configuration.html 6 * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed 7 * in its own thread. 8 * @param storageLevel RDD storage level. 9 */ 10 private[streaming] 11 class KafkaInputDStream[ 12 K: ClassTag, 13 V: ClassTag, 14 U <: Decoder[_]: ClassTag, 15 T <: Decoder[_]: ClassTag]( 16 ssc_ : StreamingContext, 17 kafkaParams: Map[String, String], 18 topics: Map[String, Int], 19 useReliableReceiver: Boolean, 20 storageLevel: StorageLevel 21 ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging { 22 23 def getReceiver(): Receiver[(K, V)] = { 24 if (!useReliableReceiver) { // 未啓用 WAL,會使用 KafkaReceiver 對象 25 new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) 26 } else { // 若是啓用了WAL, 使用ReliableKafkaReceiver 27 new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) 28 } 29 } 30 }
org.apache.spark.streaming.kafka.KafkaInputDStream 繼承父類的 compute方法:
1 /** 2 * Generates RDDs with blocks received by the receiver of this stream. */ 3 override def compute(validTime: Time): Option[RDD[T]] = { 4 val blockRDD = { 5 6 if (validTime < graph.startTime) { 7 // If this is called for any time before the start time of the context, 8 // then this returns an empty RDD. This may happen when recovering from a 9 // driver failure without any write ahead log to recover pre-failure data. 10 new BlockRDD[T](ssc.sc, Array.empty) 11 } else { 12 // Otherwise, ask the tracker for all the blocks that have been allocated to this stream 13 // for this batch 14 val receiverTracker = ssc.scheduler.receiverTracker 15 val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty) 16 17 // Register the input blocks information into InputInfoTracker 18 val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum) 19 ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo) 20 21 // Create the BlockRDD 22 createBlockRDD(validTime, blockInfos) 23 } 24 } 25 Some(blockRDD) 26 }
getBlocksOfBatch 以下:
1 /** Get the blocks for the given batch and all input streams. */ 2 def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = { 3 receivedBlockTracker.getBlocksOfBatch(batchTime) 4 } 5 調用: 6 /** Get the blocks allocated to the given batch. */ 7 def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = synchronized { 8 timeToAllocatedBlocks.get(batchTime).map { _.streamIdToAllocatedBlocks }.getOrElse(Map.empty) 9 }
在 org.apache.spark.streaming.scheduler.JobGenerator 中聲明瞭一個定時器:
1 // timer 會按照批次間隔 生成 GenerateJobs 任務,並放入eventLoop 堵塞隊列中 2 private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, 3 longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
EventLoop 實例化代碼以下:
1 eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") { 2 override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event) 3 4 override protected def onError(e: Throwable): Unit = { 5 jobScheduler.reportError("Error in job generator", e) 6 } 7 } 8 eventLoop.start()
EventLoop裏定義了一個LinkedBlockingDeque雙端堵塞隊列和一個執行daemon線程,daemon線程會不停從 雙端堵塞隊列中堵塞式取數據,一旦取到數據,會調 onReceive 方法,即 processEvent 方法:
1 /** Processes all events */ 2 private def processEvent(event: JobGeneratorEvent) { 3 logDebug("Got event " + event) 4 event match { 5 case GenerateJobs(time) => generateJobs(time) 6 case ClearMetadata(time) => clearMetadata(time) 7 case DoCheckpoint(time, clearCheckpointDataLater) => 8 doCheckpoint(time, clearCheckpointDataLater) 9 case ClearCheckpointData(time) => clearCheckpointData(time) 10 } 11 }
因爲是GenerateJobs 事件, 會繼續調用generateJobs 方法:
1 /** Generate jobs and perform checkpoint for the given `time`. */ 2 private def generateJobs(time: Time) { 3 // Set the SparkEnv in this thread, so that job generation code can access the environment 4 // Example: BlockRDDs are created in this thread, and it needs to access BlockManager 5 // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed. 6 SparkEnv.set(ssc.env) 7 Try { 8 // 1. 將 WAL block 信息 分配給batch(這些數據塊信息是worker 節點cache 到WAL 以後發送給driver 端的) 9 jobScheduler.receiverTracker.allocateBlocksToBatch(time) 10 // 2. 使用分配的block數據塊來生成任務 11 graph.generateJobs(time) // generate jobs using allocated block 12 } match { 13 case Success(jobs) => 14 val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time) 15 jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) 16 case Failure(e) => 17 jobScheduler.reportError("Error generating jobs for time " + time, e) 18 } 19 // 發佈DoCheckpoint 事件,保存checkpoint操做,主要是將新的checkpoint 數據寫入到 hdfs, 刪除舊的 checkpoint 數據 20 eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false)) 21 }
第一步中調用的
org.apache.spark.streaming.scheduler.ReceiverTracker#allocateBlocksToBatch方法以下:
1 /** Allocate all unallocated blocks to the given batch. */ 2 def allocateBlocksToBatch(batchTime: Time): Unit = { 3 if (receiverInputStreams.nonEmpty) { 4 receivedBlockTracker.allocateBlocksToBatch(batchTime) 5 } 6 }
其中,org.apache.spark.streaming.scheduler.ReceivedBlockTracker#allocateBlocksToBatch 方法以下:
1 def allocateBlocksToBatch(batchTime: Time): Unit = synchronized { 2 if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) { 3 // 遍歷輸入流,根據流的 streamId 獲取未被分配的block隊列,並返回[streamId, seq[receivedBlockInfo]],由此可知,到此爲止,數據其實已經從receiver中讀出來了。 4 // 獲取 streamid和 WAL的blocks 的映射關係 5 val streamIdToBlocks = streamIds.map { streamId => 6 (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true)) 7 }.toMap 8 val allocatedBlocks = AllocatedBlocks(streamIdToBlocks) 9 if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) { 10 timeToAllocatedBlocks.put(batchTime, allocatedBlocks) 11 lastAllocatedBatchTime = batchTime 12 } else { 13 logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery") 14 } 15 } else { 16 // This situation occurs when: 17 // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent, 18 // possibly processed batch job or half-processed batch job need to be processed again, 19 // so the batchTime will be equal to lastAllocatedBatchTime. 20 // 2. Slow checkpointing makes recovered batch time older than WAL recovered 21 // lastAllocatedBatchTime. 22 // This situation will only occurs in recovery time. 23 logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery") 24 } 25 }
其中,getReceivedBlockQueue的源碼以下:
1 /** Get the queue of received blocks belonging to a particular stream */ 2 private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = { 3 streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue) 4 }
能夠看到,worker node 發送過來的block 數據被取出來了。
org.apache.spark.streaming.dstream.ReceiverInputDStream#createBlockRDD 源碼以下:
1 private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = { 2 3 if (blockInfos.nonEmpty) { 4 val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray 5 // 全部的block已經有了WriteAheadLogRecordHandle, 建立一個WALBackedBlockRDD便可, 不然建立BlockRDD。 6 // 其中,WriteAheadLogRecordHandle 是一個跟WAL 相關聯的EntryInfo,實現類FileBasedWriteAheadLogSegment就包含了WAL segment 的path, offset 以及 length 信息。RDD 在真正須要數據時,根據這些handle信息從 WAL 中讀取數據。 7 // Are WAL record handles present with all the blocks 8 val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty } 9 10 if (areWALRecordHandlesPresent) { 11 // If all the blocks have WAL record handle, then create a WALBackedBlockRDD 12 val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray 13 val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray 14 new WriteAheadLogBackedBlockRDD[T]( 15 ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid) 16 } else { 17 // Else, create a BlockRDD. However, if there are some blocks with WAL info but not 18 // others then that is unexpected and log a warning accordingly. 19 if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) { 20 if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) { 21 logError("Some blocks do not have Write Ahead Log information; " + 22 "this is unexpected and data may not be recoverable after driver failures") 23 } else { 24 logWarning("Some blocks have Write Ahead Log information; this is unexpected") 25 } 26 } 27 val validBlockIds = blockIds.filter { id => 28 ssc.sparkContext.env.blockManager.master.contains(id) 29 } 30 if (validBlockIds.size != blockIds.size) { 31 logWarning("Some blocks could not be recovered as they were not found in memory. " + 32 "To prevent such data loss, enabled Write Ahead Log (see programming guide " + 33 "for more details.") 34 } 35 new BlockRDD[T](ssc.sc, validBlockIds) 36 } 37 } else { 38 // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD 39 // according to the configuration 40 if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) { 41 new WriteAheadLogBackedBlockRDD[T]( 42 ssc.sparkContext, Array.empty, Array.empty, Array.empty) 43 } else { 44 new BlockRDD[T](ssc.sc, Array.empty) 45 } 46 } 47 }
org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD#compute 的源碼以下:
1 /** 2 * Gets the partition data by getting the corresponding block from the block manager. 3 * If the block does not exist, then the data is read from the corresponding record 4 * in write ahead log files. 5 */ 6 override def compute(split: Partition, context: TaskContext): Iterator[T] = { 7 assertValid() 8 val hadoopConf = broadcastedHadoopConf.value 9 val blockManager = SparkEnv.get.blockManager 10 val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition] 11 val blockId = partition.blockId 12 13 def getBlockFromBlockManager(): Option[Iterator[T]] = { 14 blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[T]]) 15 } 16 17 def getBlockFromWriteAheadLog(): Iterator[T] = { 18 var dataRead: ByteBuffer = null 19 var writeAheadLog: WriteAheadLog = null 20 try { 21 // The WriteAheadLogUtils.createLog*** method needs a directory to create a 22 // WriteAheadLog object as the default FileBasedWriteAheadLog needs a directory for 23 // writing log data. However, the directory is not needed if data needs to be read, hence 24 // a dummy path is provided to satisfy the method parameter requirements. 25 // FileBasedWriteAheadLog will not create any file or directory at that path. 26 // FileBasedWriteAheadLog will not create any file or directory at that path. Also, 27 // this dummy directory should not already exist otherwise the WAL will try to recover 28 // past events from the directory and throw errors. 29 val nonExistentDirectory = new File( 30 System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath 31 writeAheadLog = WriteAheadLogUtils.createLogForReceiver( 32 SparkEnv.get.conf, nonExistentDirectory, hadoopConf) 33 dataRead = writeAheadLog.read(partition.walRecordHandle) 34 } catch { 35 case NonFatal(e) => 36 throw new SparkException( 37 s"Could not read data from write ahead log record ${partition.walRecordHandle}", e) 38 } finally { 39 if (writeAheadLog != null) { 40 writeAheadLog.close() 41 writeAheadLog = null 42 } 43 } 44 if (dataRead == null) { 45 throw new SparkException( 46 s"Could not read data from write ahead log record ${partition.walRecordHandle}, " + 47 s"read returned null") 48 } 49 logInfo(s"Read partition data of $this from write ahead log, record handle " + 50 partition.walRecordHandle) 51 if (storeInBlockManager) { 52 blockManager.putBytes(blockId, dataRead, storageLevel) 53 logDebug(s"Stored partition data of $this into block manager with level $storageLevel") 54 dataRead.rewind() 55 } 56 blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]] 57 } 58 // 若是partition.isBlockIdValid 爲true,則說明該 block 數據存在executors 中 59 if (partition.isBlockIdValid) { 60 // 先根據 BlockManager從 executor中讀取數據, 若是沒有,再從WAL 中讀取數據 61 // BlockManager 從內存仍是從磁盤上獲取的數據 ? 62 blockManager 從 local 或 remote 獲取 block,其中 local既能夠從 memory 中獲取也能夠從 磁盤中讀取, 其中remote獲取數據是同步的,即在fetch block 過程當中會一直blocking。 63 getBlockFromBlockManager().getOrElse { getBlockFromWriteAheadLog() } 64 } else { 65 getBlockFromWriteAheadLog() 66 } 67 }
至此,從啓動 receiver,到receiver 接收數據並保存到WAL block,driver 接收WAL 的block 信息,直到spark streaming 經過WAL RDD 來獲取數據等等都一一作了說明。
原文出處:https://www.cnblogs.com/johnny666888/p/11100334.html