uReplicator Implementation Analysis

MirrorMakerWorker Analysis

MirrorMakerWorker is the main entry point of the whole replication mechanism. The logic it orchestrates includes:

  • Receiving and processing the configuration, and building the ConsumerConfig object
  • Preparing the metrics objects, and defining and starting the thread that periodically collects and reports metrics data
  • Creating the CompactConsumerFetcherManager instance and calling startConnections
  • Creating KafkaConnector instances according to fetchNum; each KafkaConnector instance holds a reference to the CompactConsumerFetcherManager instance
  • Adding the Helix controller
  • Adding a graceful-shutdown hook
  • Building the producer config producerProps
  • Creating and starting MirrorMakerThread instances from connectorMap, the map that tracks the KafkaConnector instances
  • Waiting on shutdownLatch: CountDownLatch before exiting the main method (this shutdown pattern is sketched after this list)
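
The last two bullets describe a standard graceful-shutdown pattern: main() parks on a CountDownLatch, and a JVM shutdown hook counts the latch down so main() can return. A minimal, self-contained sketch of just that pattern (the names here are illustrative, not the uReplicator source):

import java.util.concurrent.CountDownLatch

object ShutdownLatchDemo {
  private val shutdownLatch = new CountDownLatch(1)

  def main(args: Array[String]): Unit = {
    // the hook runs on SIGTERM / Ctrl-C and releases the latch
    Runtime.getRuntime.addShutdownHook(new Thread() {
      override def run(): Unit = {
        // a real worker would also stop fetchers, flush the producer and commit offsets here
        shutdownLatch.countDown()
      }
    })
    println("worker running; send SIGTERM or press Ctrl-C to stop")
    shutdownLatch.await() // park the main thread until shutdown is requested
    println("worker stopped")
  }
}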

How the Worker Ties into Helix

The worker implements HelixWorkerOnlineOfflineStateModelFactory and OnlineOfflineStateModel. OnlineOfflineStateModel can be understood as a listener: the instance is notified whenever a partition transitions between online and offline. The registration looks like this:

helixZkManager = HelixManagerFactory.getZKHelixManager(helixClusterName, instanceId, InstanceType.PARTICIPANT, zkServer)
val stateMachineEngine: StateMachineEngine = helixZkManager.getStateMachineEngine()
// register the MirrorMaker worker
val stateModelFactory = new HelixWorkerOnlineOfflineStateModelFactory(instanceId, fetchNum, connectorMap)
stateMachineEngine.registerStateModelFactory("OnlineOffline", stateModelFactory)
helixZkManager.connect()
helixAdmin = helixZkManager.getClusterManagmentTool
class HelixWorkerOnlineOfflineStateModelFactory(final val instanceId: String, final val fetchNum: Int,
                                                final val connectorMap: ConcurrentHashMap[String, KafkaConnector]) extends StateModelFactory[StateModel] {

  override def createNewStateModel(partitionName: String) = new OnlineOfflineStateModel(instanceId, connectorMap)

  // register mm instance
  class OnlineOfflineStateModel(final val instanceId: String, final val connectors: ConcurrentHashMap[String, KafkaConnector]) extends StateModel {

    def onBecomeOnlineFromOffline(message: Message, context: NotificationContext) = {
      // add topic partition on the instance
      // (note: connectorMap here resolves to the outer factory's field; the inner
      // class's own `connectors` parameter goes unused)
      connectorMap.get(getFetcherId(message.getResourceName, message.getPartitionName.toInt)).addTopicPartition(message.getResourceName, message.getPartitionName.toInt)
    }

    def onBecomeOfflineFromOnline(message: Message, context: NotificationContext) = {
      // delete topic partition on the instance
      connectorMap.get(getFetcherId(message.getResourceName, message.getPartitionName.toInt)).deleteTopicPartition(message.getResourceName, message.getPartitionName.toInt)
    }

    def onBecomeDroppedFromOffline(message: Message, context: NotificationContext) = {
      // do nothing
    }

    private def getFetcherId(topic: String, partitionId: Int): String = {
      "" + Utils.abs(31 * topic.hashCode() + partitionId) % fetchNum
    }
  }

}
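
getFetcherId is the glue between Helix and the fetchers: Helix hands the state model a (resource = topic, partition) pair, and the hash picks which of the fetchNum KafkaConnector instances owns that partition, so the same partition always lands on the same connector. A standalone reproduction of the mapping (fetchNum = 3 is an arbitrary example; abs stands in for kafka.utils.Utils.abs, which masks the sign bit instead of using math.abs so Int.MinValue cannot produce a negative result):

object FetcherIdDemo {
  val fetchNum = 3

  // stand-in for kafka.utils.Utils.abs: always non-negative, safe for Int.MinValue
  def abs(n: Int): Int = n & 0x7fffffff

  def getFetcherId(topic: String, partitionId: Int): String =
    "" + abs(31 * topic.hashCode() + partitionId) % fetchNum

  def main(args: Array[String]): Unit = {
    for (p <- 0 until 6)
      println(s"testTopic partition $p -> fetcher ${getFetcherId("testTopic", p)}")
  }
}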

run Method Logic

  • Obtain a KafkaStream from the KafkaConnector, and a ConsumerIterator from the KafkaStream
  • While not shut down, keep iterating over the ConsumerIterator
  • Each element pulled from the iterator is a fetched message (why does the iterator always have messages? Trace it backwards: iter --> KafkaStream --> KafkaConnector + Queue --> PartitionTopicInfo --> fetcherManager.partitionAddMap --> fetcherManager.partitionInfoMap --> fetcherManager.createFetcherThread --> CompactConsumerFetcherThread.partitionInfoMap --> CompactConsumerFetcherThread.processPartitionData --> CompactConsumerFetcherThread.doWork --> ShutdownableThread.run // spin)
  • Run the message through MirrorMakerMessageHandler to produce an array of ProducerRecord instances; the main job here is partition alignment
  • Send them to the destination cluster with the producer
  • Flush and commit offsets via the maybeFlushAndCommitOffsets method
  • The actual offset commit is performed by the custom KafkaConnector implementation; offsets are recorded in ZooKeeper and committed on a timer (the overall loop shape is sketched below)
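
Put together, the loop has roughly the following shape. This is a minimal sketch with stand-in traits for the moving parts (iterator, message handler, producer, offset committer); it shows the control flow only and is not the verbatim uReplicator code:

import java.util.concurrent.atomic.AtomicBoolean

object MirrorMakerLoopSketch {
  case class Record(topic: String, partition: Int, value: String)

  trait MessageIterator { def hasNext: Boolean; def next(): Record } // stands in for ConsumerIterator
  trait Handler { def handle(r: Record): Seq[Record] }               // e.g. partition alignment
  trait Producer { def send(r: Record): Unit; def flush(): Unit }
  trait Committer { def maybeFlushAndCommitOffsets(p: Producer): Unit }

  val shuttingDown = new AtomicBoolean(false)

  def runLoop(iter: MessageIterator, handler: Handler,
              producer: Producer, committer: Committer): Unit = {
    while (!shuttingDown.get() && iter.hasNext) {
      val msg = iter.next()                          // blocks until a fetched message is queued
      handler.handle(msg).foreach(producer.send)     // one input can yield several ProducerRecords
      committer.maybeFlushAndCommitOffsets(producer) // flushes + commits on its own schedule
    }
  }
}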

CompactConsumerFetcherThread Analysis

Overview

CompactConsumerFetcherThread extends ShutdownableThread, which Kafka provides. Internally, ShutdownableThread keeps calling the doWork method in a spin loop as long as the isRunning flag is set:

override def run(): Unit = {
  info("Starting ")
  try {
    while (isRunning.get()) {
      doWork()
    }
  } catch {
    case e: Throwable =>
      if (isRunning.get())
        error("Error due to ", e)
  }
  shutdownLatch.countDown()
  info("Stopped ")
}
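
This is why CompactConsumerFetcherThread only needs to implement doWork: the spin loop, the isRunning flag and the shutdown latch all come from the base class. Schematically (body elided; a sketch, not the actual class):

import kafka.utils.ShutdownableThread

class CompactConsumerFetcherThreadSketch(name: String) extends ShutdownableThread(name) {
  override def doWork(): Unit = {
    // apply pending partition adds/deletes, build a FetchRequest,
    // then call processFetchRequest -- see the analysis below
  }
}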

doWork Method Analysis

  • Acquire partitionMapLock
  • Acquire updateMapLock
  • Move the entries in partitionAddMap into partitionMap, then clear partitionAddMap
  • Remove the entries in partitionDeleteMap from partitionMap, remove the corresponding stats from fetcherLagStats, then clear partitionDeleteMap (this map-maintenance phase is sketched after this list)
  • Iterate over partitionMap and add each partition's topic, partition, fetch offset, fetch size, etc. to fetchRequestBuilder
  • Build a FetchRequest instance from fetchRequestBuilder
  • If fetchRequest.requestInfo.isEmpty, wait fetchBackOffMs
  • Log when the interval between two fetches grows too large (DUMP_INTERVAL_MS = 5 * 60 * 1000)
  • Call processFetchRequest to handle the fetch
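
The first four bullets are a bookkeeping phase: pending adds and deletes are folded into the live partitionMap under both locks before any fetch is built. An isolated sketch of just that phase (stand-in types; the real method goes on to build and issue the FetchRequest):

import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

object DoWorkMapsSketch {
  case class TopicAndPartition(topic: String, partition: Int)
  case class PartitionFetchState(offset: Long)

  val partitionMapLock = new ReentrantLock()
  val updateMapLock = new ReentrantLock()
  val partitionMap = mutable.Map[TopicAndPartition, PartitionFetchState]()
  val partitionAddMap = mutable.Map[TopicAndPartition, PartitionFetchState]()
  val partitionDeleteMap = mutable.Map[TopicAndPartition, Boolean]()

  def applyPendingChanges(): Unit = {
    partitionMapLock.lock()
    try {
      updateMapLock.lock()
      try {
        partitionMap ++= partitionAddMap // pick up newly assigned partitions
        partitionAddMap.clear()
        partitionDeleteMap.keys.foreach { tp =>
          partitionMap -= tp             // drop unassigned partitions
          // (the real code also removes the partition's fetcherLagStats entry here)
        }
        partitionDeleteMap.clear()
      } finally updateMapLock.unlock()
    } finally partitionMapLock.unlock()
  }
}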

processFetchRequest Method Analysis

Once doWork has prepared the FetchRequest instance, it is up to processFetchRequest to fetch the data and feed it into the queues of the PartitionTopicInfo instances held in partitionInfoMap. The rough flow is:

  • Iterate over each entry in the response, handling it per partition
  • Extract the messages
  • Compute the new offset for the next fetch from the messages received, and update it in partitionMap
  • Update the metrics and compute the lag
  • Put the fetched messages into the queue of the PartitionTopicInfo instance. This queue is the same queue that was passed in when the connector built the KafkaStream, which is what wires the connector to the stream (see the sketch below)
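
The last bullet is the crux of the design: the fetcher and the stream are connected only through a shared BlockingQueue held by PartitionTopicInfo; the fetcher puts message chunks in, and the stream's iterator takes them out of the same instance. A minimal sketch of that hand-off (stand-in Chunk type; Scala 2.12+ for the Runnable lambda):

import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

object QueueHandoffSketch {
  case class Chunk(topic: String, partition: Int, messages: Seq[String], nextOffset: Long)

  def main(args: Array[String]): Unit = {
    val queue = new LinkedBlockingQueue[Chunk](1000) // the queue a PartitionTopicInfo would hold

    // fetcher side: after processing a partition's fetch response, enqueue the chunk
    new Thread(() => queue.put(Chunk("testTopic", 0, Seq("m1", "m2"), nextOffset = 2L))).start()

    // stream side: the iterator polls the very same queue
    val chunk = queue.poll(1, TimeUnit.SECONDS)
    println(s"consumed: $chunk")
  }
}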