is the main entry point of the entire synchronization mechanism. The main logic it organizes is:
It implements HelixWorkerOnlineOfflineStateModelFactory and OnlineOfflineStateModel. OnlineOfflineStateModel can be thought of as a listener: it is notified whenever the instance switches between online and offline.
    helixZkManager = HelixManagerFactory.getZKHelixManager(helixClusterName, instanceId, InstanceType.PARTICIPANT, zkServer)
    val stateMachineEngine: StateMachineEngine = helixZkManager.getStateMachineEngine()
    // register the MirrorMaker worker
    val stateModelFactory = new HelixWorkerOnlineOfflineStateModelFactory(instanceId, fetchNum, connectorMap)
    stateMachineEngine.registerStateModelFactory("OnlineOffline", stateModelFactory)
    helixZkManager.connect()
    helixAdmin = helixZkManager.getClusterManagmentTool
    class HelixWorkerOnlineOfflineStateModelFactory(final val instanceId: String,
                                                    final val fetchNum: Int,
                                                    final val connectorMap: ConcurrentHashMap[String, KafkaConnector])
        extends StateModelFactory[StateModel] {

      override def createNewStateModel(partitionName: String) =
        new OnlineOfflineStateModel(instanceId, connectorMap)

      // register mm instance
      class OnlineOfflineStateModel(final val instanceId: String,
                                    final val connectors: ConcurrentHashMap[String, KafkaConnector])
          extends StateModel {

        def onBecomeOnlineFromOffline(message: Message, context: NotificationContext) = {
          // add topic partition on the instance
          connectorMap.get(getFetcherId(message.getResourceName, message.getPartitionName.toInt))
            .addTopicPartition(message.getResourceName, message.getPartitionName.toInt)
        }

        def onBecomeOfflineFromOnline(message: Message, context: NotificationContext) = {
          // delete topic partition on the instance
          connectorMap.get(getFetcherId(message.getResourceName, message.getPartitionName.toInt))
            .deleteTopicPartition(message.getResourceName, message.getPartitionName.toInt)
        }

        def onBecomeDroppedFromOffline(message: Message, context: NotificationContext) = {
          // do nothing
        }

        private def getFetcherId(topic: String, partitionId: Int): String = {
          "" + Utils.abs(31 * topic.hashCode() + partitionId) % fetchNum
        }
      }
    }
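The getFetcherId helper above decides which connector in connectorMap handles a given topic partition. The following is a minimal standalone sketch of that mapping so it can be tried without Helix or Kafka on the classpath. The object name FetcherIdSketch and the helper safeAbs are hypothetical; safeAbs is only a stand-in for Kafka's Utils.abs, assumed here to return a non-negative value and to map Int.MinValue to 0.

```scala
object FetcherIdSketch {
  // Stand-in for Kafka's Utils.abs (assumption): never negative,
  // and Int.MinValue is mapped to 0 instead of staying negative.
  def safeAbs(n: Int): Int = if (n == Int.MinValue) 0 else math.abs(n)

  // Same arithmetic as getFetcherId in the factory, with fetchNum passed
  // explicitly instead of being captured from the enclosing class.
  def getFetcherId(topic: String, partitionId: Int, fetchNum: Int): String =
    "" + safeAbs(31 * topic.hashCode + partitionId) % fetchNum

  def main(args: Array[String]): Unit = {
    // Print the fetcher id chosen for a few partitions of a made-up topic.
    for (p <- 0 until 6)
      println(s"my-topic-$p -> fetcher ${getFetcherId("my-topic", p, 4)}")
  }
}
```

Because the id depends only on the topic name, the partition number, and fetchNum, the online and offline transitions for the same partition always resolve to the same connector, which is what lets onBecomeOfflineFromOnline remove exactly what onBecomeOnlineFromOffline added.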
CompactConsumerFetcherThread extends the ShutdownableThread class provided by Kafka. As long as the isRunning flag is set, ShutdownableThread keeps calling the doWork method in a spin loop.
    override def run(): Unit = {
      info("Starting ")
      try {
        while (isRunning.get()) {
          doWork()
        }
      } catch {
        case e: Throwable =>
          if (isRunning.get())
            error("Error due to ", e)
      }
      shutdownLatch.countDown()
      info("Stopped ")
    }
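To make the interaction between the isRunning flag and shutdownLatch concrete, here is a minimal self-contained sketch of the same pattern built only on java.util.concurrent. It is not Kafka's ShutdownableThread: the class names SpinThreadSketch and CountingWorker, the shutdown method shown here, and the one-millisecond sleep in doWork are all assumptions made for illustration.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical base class that mirrors the run() loop shown above.
abstract class SpinThreadSketch(name: String) extends Thread(name) {
  protected val isRunning = new AtomicBoolean(true)
  private val shutdownLatch = new CountDownLatch(1)

  // One unit of work; called repeatedly while the flag is set.
  def doWork(): Unit

  override def run(): Unit = {
    try {
      while (isRunning.get()) doWork()
    } catch {
      // Only report the failure if we were not already shutting down.
      case e: Throwable =>
        if (isRunning.get()) System.err.println(s"Error due to $e")
    }
    // Signal that the loop has exited, whatever the reason.
    shutdownLatch.countDown()
  }

  // Clear the flag, then block until run() has left its loop.
  // Must be called after start(), otherwise the latch is never released.
  def shutdown(): Unit = {
    isRunning.set(false)
    shutdownLatch.await()
  }
}

// Hypothetical worker that only counts how often doWork was invoked.
class CountingWorker extends SpinThreadSketch("counting-worker") {
  val iterations = new AtomicInteger(0)
  override def doWork(): Unit = {
    iterations.incrementAndGet()
    Thread.sleep(1) // keep the sketch from burning a full core
  }
}
```

A caller would start the worker with start() and later call shutdown(), which returns only after the current doWork call has finished and the loop has observed the cleared flag.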
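Once doWork has prepared the FetchRequest instance, the processFetchRequest method takes over: it pulls the data and puts it into the queues of the PartitionTopicInfo instances held in partitionInfoMap. A simplified view of the process is as follows: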