The web server runs a thread named AzkabanWebServer-QueueProcessor-Thread.
The notes below work out what this thread actually does.
====================================================================================
stop in azkaban.executor.ExecutorManager$QueueProcessorThread.run
====================================================================================
public void run() {
  // Loops till QueueProcessorThread is shutdown
  while (!shutdown) { // keep looping until shutdown is requested
    synchronized (this) {
      try {
        // start processing queue if active, otherwise wait for
        // some time
        if (isActive) {
          // start processing the queued flows
          processQueuedFlows(activeExecutorRefreshWindowInMilisec,
              activeExecutorRefreshWindowInFlows);
        }
        wait(QUEUE_PROCESSOR_WAIT_IN_MS); // wait 1 second
      } catch (Exception e) {
        logger.error("QueueProcessorThread Interrupted. Probably to shut down.", e);
      }
    }
  }
}
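The loop above is a wait-with-timeout polling pattern. Below is a minimal sketch of that pattern, not Azkaban's actual code: the class `PollingWorker`, its `rounds` counter (standing in for `processQueuedFlows`), and the shortened 50 ms wait are all assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for QueueProcessorThread; illustrative only.
class PollingWorker extends Thread {
    // Assumed demo value; the source waits 1 second between rounds.
    private static final long WAIT_IN_MS = 50;

    private volatile boolean shutdown = false;
    private volatile boolean active = true;
    final AtomicInteger rounds = new AtomicInteger();

    PollingWorker() {
        super("PollingWorker-Demo");
    }

    @Override
    public void run() {
        while (!shutdown) {
            synchronized (this) {
                try {
                    if (active) {
                        // Placeholder for the real work (processQueuedFlows in the source).
                        rounds.incrementAndGet();
                    }
                    // wait() releases the monitor, so shutdown() can acquire it and notify.
                    wait(WAIT_IN_MS);
                } catch (InterruptedException e) {
                    // Treat an interrupt as a request to stop.
                    shutdown = true;
                }
            }
        }
    }

    void shutdown() {
        shutdown = true;
        synchronized (this) {
            notifyAll(); // wake the loop immediately instead of waiting out the timeout
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PollingWorker worker = new PollingWorker();
        worker.start();
        Thread.sleep(200);
        worker.shutdown();
        worker.join(1000);
        System.out.println("rounds=" + worker.rounds.get() + ", alive=" + worker.isAlive());
    }
}
```

Because `wait(timeout)` gives up the monitor while sleeping, other threads can still enter synchronized methods on the same object between rounds.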
So the next step is to see what processQueuedFlows does.
====================================================================================
// process flow with current snapshot of activeExecutors
selectExecutorAndDispatchFlow(reference, exflow, new HashSet<Executor>(activeExecutors));
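From this call, the function dispatches each queued flow to a specific executor.
It also does something else, though: it monitors whether the executors are alive.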
// if we have dispatched more than maxContinuousFlowProcessed or
// it has been more than activeExecutorsRefreshWindow millisec
// since we refreshed
// (when either condition holds, probe whether the executors are alive)
if (currentTime - lastExecutorRefreshTime > activeExecutorsRefreshWindow
    || currentContinuousFlowProcessed >= maxContinuousFlowProcessed) {
  // Refresh executorInfo for all activeExecutors
  refreshExecutors();
  lastExecutorRefreshTime = currentTime;
  currentContinuousFlowProcessed = 0;
}
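The condition above triggers a refresh when either a time window has elapsed or enough flows have been dispatched in a row. A small sketch of that rule in isolation follows. The class `RefreshPolicy` and its method names are hypothetical, and the flow threshold of 5 used in `main` is an assumed demo value; only the 50000 ms window appears in these notes.

```java
// Hypothetical helper mirroring the refresh condition; not part of Azkaban.
class RefreshPolicy {
    private final long refreshWindowMs;
    private final int maxContinuousFlows;
    private long lastRefreshTime;
    private int flowsSinceRefresh;

    RefreshPolicy(long refreshWindowMs, int maxContinuousFlows, long now) {
        this.refreshWindowMs = refreshWindowMs;
        this.maxContinuousFlows = maxContinuousFlows;
        this.lastRefreshTime = now;
    }

    /** Records that one flow was dispatched since the last refresh. */
    void onFlowDispatched() {
        flowsSinceRefresh++;
    }

    /** Returns true and resets both counters when a refresh is due at time 'now'. */
    boolean refreshIfDue(long now) {
        if (now - lastRefreshTime > refreshWindowMs
                || flowsSinceRefresh >= maxContinuousFlows) {
            lastRefreshTime = now;
            flowsSinceRefresh = 0;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        RefreshPolicy policy = new RefreshPolicy(50_000, 5, 0);
        System.out.println(policy.refreshIfDue(10_000)); // false: window not elapsed, no flows yet
        for (int i = 0; i < 5; i++) {
            policy.onFlowDispatched();
        }
        System.out.println(policy.refreshIfDue(11_000)); // true: flow threshold reached
        System.out.println(policy.refreshIfDue(12_000)); // false: counters were just reset
        System.out.println(policy.refreshIfDue(61_001)); // true: more than 50000 ms since last refresh
    }
}
```

Note that the time comparison is strict (`>`), so a refresh exactly at the window boundary does not fire, while the flow comparison is inclusive (`>=`).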
So next, let's look at how refreshExecutors is implemented.
===================================================================================
jdb azkaban.webapp.AzkabanWebServer -conf /root/azkb/azkaban_3.0.0_debug/conf
stop in azkaban.executor.ExecutorManager.refreshExecutors
==============================================================================
The refresh interval is read from:
azkProps.getLong(AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS, 50000),
private static final String AZKABAN_ACTIVE_EXECUTOR_REFRESH_IN_MS = "azkaban.activeexecutor.refresh.milisecinterval";
By default, a refresh request is issued once every 50 seconds.
The requests are actually run on a thread pool:
Future<String> fetchExecutionInfo = executorInforRefresherService.submit(new Callable<String>() {
  @Override
  public String call() throws Exception {
    return callExecutorForJsonString(executor.getHost(), executor.getPort(), "/serverStatistics",
        null);
  }
});
So how many threads does this pool have?
executorInforRefresherService = Executors
.newFixedThreadPool(azkProps.getInt(AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS, 5));
private static final String AZKABAN_EXECUTORINFO_REFRESH_MAX_THREADS = "azkaban.executorinfo.refresh.maxThreads";
The default is 5 threads. Now, back to how the refresh is executed.
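The submit-then-collect pattern on a fixed-size pool can be sketched as follows. This is an illustration under assumptions, not Azkaban's implementation: `ExecutorStatsFetcher` and `fetchAll` are hypothetical names, each `Callable` stands in for the HTTP call made by `callExecutorForJsonString`, and the timeout is passed in by the caller.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: fan out one task per executor, then collect with a timeout.
class ExecutorStatsFetcher {

    /** Returns executor name -> response, with null for calls that failed or timed out. */
    static Map<String, String> fetchAll(Map<String, Callable<String>> calls,
                                        int maxThreads, long timeoutMs) {
        ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
        try {
            // Submit everything first so the calls run concurrently.
            Map<String, Future<String>> futures = new LinkedHashMap<>();
            for (Map.Entry<String, Callable<String>> e : calls.entrySet()) {
                futures.put(e.getKey(), pool.submit(e.getValue()));
            }
            Map<String, String> results = new LinkedHashMap<>();
            for (Map.Entry<String, Future<String>> e : futures.entrySet()) {
                try {
                    results.put(e.getKey(), e.getValue().get(timeoutMs, TimeUnit.MILLISECONDS));
                } catch (TimeoutException | ExecutionException ex) {
                    e.getValue().cancel(true); // give up on this executor for this round
                    results.put(e.getKey(), null);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                    results.put(e.getKey(), null);
                }
            }
            return results;
        } finally {
            pool.shutdownNow(); // demo only; a long-lived service would reuse the pool
        }
    }

    public static void main(String[] args) {
        Map<String, Callable<String>> calls = new LinkedHashMap<>();
        calls.put("fast", () -> "{\"cpu\":0.1}");
        calls.put("slow", () -> { Thread.sleep(2000); return "late"; });
        System.out.println(fetchAll(calls, 5, 200));
    }
}
```

With a pool of 5 threads, at most 5 executors are queried at once; any further tasks wait in the pool's queue until a thread frees up.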
==============================================================================
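The web server sends an HTTP request to /serverStatistics
and then stores the result on the executor object, so the key question is how /serverStatistics is handled on the executor side.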
==============================================================================
root.addServlet(new ServletHolder(new ServerStatisticsServlet()), "/serverStatistics");
jdb azkaban.execapp.AzkabanExecutorServer -conf /root/azkb/azkaban_3.0.0_debug/conf
stop in azkaban.execapp.ServerStatisticsServlet.doGet
stop in azkaban.execapp.ServerStatisticsServlet.populateStatistics
run
The servlet simply builds a summary of local information:
1)
fillRemainingMemoryPercent(stats);
azkaban.execapp.ServerStatisticsServlet.fillRemainingMemoryPercent
2)
fillRemainingFlowCapacityAndLastDispatchedTime(stats);
stop in azkaban.execapp.ServerStatisticsServlet.fillRemainingFlowCapacityAndLastDispatchedTime
3)
fillCpuUsage(stats);
==============================================================================
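So the response carries just these three kinds of information.
On the web server side, once the response arrives within 5 seconds, the locally cached information is updated: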
public void setExecutorInfo(ExecutorInfo info) {
  this.cachedExecutorStats = info;
  this.lastStatsUpdatedTime = new Date();
}
This is what serves as the liveness monitoring.
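These notes do not show how the timestamp is consumed afterwards. One plausible way to turn a last-updated time into a liveness signal is a staleness check, sketched below. Everything here is an assumption for illustration: the class `ExecutorHealth`, the method `isStale`, and the idea of a maximum age threshold are not taken from Azkaban.

```java
import java.util.Date;

// Hypothetical sketch: treat an executor as stale when its stats are too old.
class ExecutorHealth {
    private volatile Date lastStatsUpdatedTime; // null until the first successful refresh

    /** Called whenever a statistics response has been received. */
    void markUpdated(Date when) {
        this.lastStatsUpdatedTime = when;
    }

    /** Returns true if no stats were ever received or the last ones are older than maxAgeMs. */
    boolean isStale(Date now, long maxAgeMs) {
        Date last = lastStatsUpdatedTime;
        return last == null || now.getTime() - last.getTime() > maxAgeMs;
    }

    public static void main(String[] args) {
        ExecutorHealth health = new ExecutorHealth();
        System.out.println(health.isStale(new Date(1_000), 60_000));  // true: never updated
        health.markUpdated(new Date(1_000));
        System.out.println(health.isStale(new Date(31_000), 60_000)); // false: 30 s old
        System.out.println(health.isStale(new Date(91_000), 60_000)); // true: 90 s old
    }
}
```

Passing `now` in explicitly, rather than calling `new Date()` inside `isStale`, keeps the check deterministic and easy to test.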