Azkaban的線程系列 37:QueueProcessor線程的任務處理&executor存活監控

有個線程,AzkabanWebServer-QueueProcessor-Thread.web

下面分析下這個線程到底幹嗎的!!!app

====================================================================================webapp

stop in azkaban.executor.ExecutorManager$QueueProcessorThread.runide

====================================================================================函數

public void run() {oop

// Loops till QueueProcessorThread is shutdownfetch

while (!shutdown) {// 一直循環this

synchronized (this) {// spa

try {線程

// start processing queue if active, other wait for

// sometime

if (isActive) {

// 開始處理任務

processQueuedFlows(activeExecutorRefreshWindowInMilisec,

activeExecutorRefreshWindowInFlows);

}

wait(QUEUE_PROCESSOR_WAIT_IN_MS);// 等待1秒

} catch (Exception e) {

logger.error("QueueProcessorThread Interrupted. Probably to shut down.", e);

}

}

}

}

因此接下來要看這個processQueuedFlows幹嗎的?

====================================================================================

// process flow with current snapshot of activeExecutors

selectExecutorAndDispatchFlow(reference, exflow, new HashSet<Executor>(activeExecutors));

從這裏來看,就是把任務分發到具體的executor上,

不過這個函數其實還作了其它的事情!那就是executor的存活監控

// if we have dispatched more than maxContinuousFlowProcessed or

// It has been more then activeExecutorsRefreshWindow millisec

// since we

// refreshed

// 知足上面的條件就探測executor的存活性

if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow

|| currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {

// Refresh executorInfo for all activeExecutors

refreshExecutors();

lastExecutorRefreshTime = currentTime;

currentContinuousFlowProcessed = 0;

}


因此接下來,咱們來看看refreshExecutors的代碼實現!

===================================================================================

jdb azkaban.webapp.AzkabanWebServer -conf  /root/azkb/azkaban_3.0.0_debug/conf

stop in  azkaban.executor.ExecutorManager.refreshExecutors

==============================================================================

其中刷新週期

azkProps.getLong(AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS, 50000),

private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS = "azkaban.activeexecutor.refresh.milisecinterval";

默認爲50秒刷1次更新請求


實際上是經過一個線程池來跑的,

Future<String> fetchExecutionInfo = executorInforRefresherService.submit(new Callable<String>() {

@Override

public String call() throws Exception {

return callExecutorForJsonString(executor.getHost(), executor.getPort(), "/serverStatistics",

null);

}

});

那麼,這個線程池的個數呢?


executorInforRefresherService = Executors

.newFixedThreadPool(azkProps.getInt(AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS, 5));

private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS = "azkaban.executorinfo.refresh.maxThreads";

默認就是5個了,好,回來看怎麼執行!

==============================================================================

發出HTTP請求 /serverStatistics

而後結果更新到executor,因此重點就是 /serverStatistics在executor中的執行過程!

==============================================================================

root.addServlet(new ServletHolder(new ServerStatisticsServlet()), "/serverStatistics");



jdb azkaban.execapp.AzkabanExecutorServer  -conf  /root/azkb/azkaban_3.0.0_debug/conf

stop in azkaban.execapp.ServerStatisticsServlet.doGet

stop in azkaban.execapp.ServerStatisticsServlet.populateStatistics


run

其實就是生成本地的信息彙總

1)

fillRemainingMemoryPercent(stats);

azkaban.execapp.ServerStatisticsServlet.fillRemainingMemoryPercent

 

2)

fillRemainingFlowCapacityAndLastDispatchedTime(stats);

stop in azkaban.execapp.ServerStatisticsServlet.fillRemainingFlowCapacityAndLastDispatchedTime

3)

fillCpuUsage(stats);

==============================================================================

其實就是返回這3種信息

對於web server來講,5秒內拿到返回內容後,更新本地消息

  public void setExecutorInfo(ExecutorInfo info) {

    this.cachedExecutorStats = info;

    this.lastStatsUpdatedTime = new Date();

  }

以此來做爲存活監控!

相關文章
相關標籤/搜索