sleep模式: java
當某一個線程任務處理完畢,從任務池中取不到任務的時候,檢查其它線程是否處於活動狀態。若是是,則本身休眠; 若是其它線程都已經由於沒有任務進入休眠,當前線程是最後一個活動線程的時候,就調用業務接口,獲取須要處理的任務,放入任務池中, 同時喚醒其它休眠線程開始工做。 數據庫
調度管理的resume方法是程序入口,咱們來看看。 安全
/** * 處在了可執行的時間區間,恢復運行 * @throws Exception */ public void resume(String message) throws Exception{ if (this.isPauseSchedule == true) { if(log.isDebugEnabled()){ log.debug("恢復調度:" + this.currenScheduleServer.getUuid()); } this.isPauseSchedule = false; this.pauseMessage = message; if (this.queueDealTask != null) { if (this.taskTypeInfo.getProcessorType() != null && this.taskTypeInfo.getProcessorType().equalsIgnoreCase("NOTSLEEP")==true){ this.taskTypeInfo.setProcessorType("NOTSLEEP"); this.processor = new TBScheduleProcessorNotSleep(this, queueDealTask,this.statisticsInfo); }else{ this.processor = new TBScheduleProcessorSleep(this, queueDealTask,this.statisticsInfo); this.taskTypeInfo.setProcessorType("SLEEP"); } } rewriteScheduleInfo(); } }
咱們進入類TBScheduleProcessorSleep的構造函數看看,它作了些什麼。 多線程
public TBScheduleProcessorSleep(TBScheduleManager aManager, IScheduleTaskDeal<T> aQueueTask, StatisticsInfo aStatisticsInfo) throws Exception { this.scheduleManager = aManager; this.statisticsInfo = aStatisticsInfo; this.taskTypeInfo = this.scheduleManager.getTaskTypeInfo(); this.taskDealProcessor = aQueueTask; if (this.taskDealProcessor instanceof IScheduleTaskDealSingle<?>) { if (taskTypeInfo.getExecuteNumber() > 1) { taskTypeInfo.setExecuteNumber(1); } isMutilTask = false; } else { isMutilTask = true; } if (taskTypeInfo.getFetchDataNumber() < taskTypeInfo.getThreadNumber() * 10) { logger.warn("參數設置不合理,系統性能不佳。【每次從數據庫獲取的數量fetchnum】 >= 【線程數量threadnum】 *【最少循環次數10】 "); } for (int i = 0; i < taskTypeInfo.getThreadNumber(); i++) { this.startThread(i); } }
最後根據任務類中的ThreadNumber值來啓動對應數目的線程數。 併發
private void startThread(int index) { Thread thread = new Thread(this); threadList.add(thread); String threadName = this.scheduleManager.getTaskTypeRunningInfo().getTaskType()+"-" + this.scheduleManager.getCurrentSerialNumber() + "-exe" + index; thread.setName(threadName); thread.start(); }
建立一個線程,而且啓動該線程,傳遞的Runnable對象就是本對象,則咱們看看本對象中的run()方法就是線程要運行的方法。 函數
@SuppressWarnings({ "rawtypes", "unchecked", "static-access" }) public void run(){ try { long startTime =0; while(true){ this.m_lockObject.addThread(); Object executeTask; while (true) { if(this.isStopSchedule == true){//中止隊列調度 this.m_lockObject.realseThread(); this.m_lockObject.notifyOtherThread();//通知全部的休眠線程 this.threadList.remove(Thread.currentThread()); if(this.threadList.size()==0){ this.scheduleManager.unRegisterScheduleServer(); } return; } //加載調度任務 if(this.isMutilTask == false){ executeTask = this.getScheduleTaskId(); }else{ executeTask = this.getScheduleTaskIdMulti(); } if(executeTask == null){ break; } try {//運行相關的程序 startTime =ScheduleUtil.getCurrentTimeMillis(); if (this.isMutilTask == false) { if (((IScheduleTaskDealSingle) this.taskDealProcessor).execute(executeTask,scheduleManager.getTaskTypeRunningInfo().getOwnSign()) == true) { addSuccessNum(1, ScheduleUtil.getCurrentTimeMillis() - startTime, "com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run"); } else { addFailNum(1, ScheduleUtil.getCurrentTimeMillis() - startTime, "com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run"); } } else { if (((IScheduleTaskDealMulti) this.taskDealProcessor) .execute((Object[]) executeTask,scheduleManager.getTaskTypeRunningInfo().getOwnSign()) == true) { addSuccessNum(((Object[]) executeTask).length, ScheduleUtil .getCurrentTimeMillis() - startTime, "com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run"); } else { addFailNum(((Object[]) executeTask).length, ScheduleUtil .getCurrentTimeMillis() - startTime, "com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run"); } } }catch (Throwable ex) { if (this.isMutilTask == false) { addFailNum(1, ScheduleUtil.getCurrentTimeMillis() - startTime, "TBScheduleProcessor.run"); } else { addFailNum(((Object[]) executeTask).length, ScheduleUtil .getCurrentTimeMillis() - startTime, "TBScheduleProcessor.run"); } logger.warn("Task :" + executeTask + " 處理失敗", ex); } } //當前隊列中全部的任務都已經完成了。 if(logger.isTraceEnabled()){ logger.trace(Thread.currentThread().getName() +":當前運行線程數量:" +this.m_lockObject.count()); } if (this.m_lockObject.realseThreadButNotLast() == false) { int size = 0; Thread.currentThread().sleep(100); startTime = ScheduleUtil.getCurrentTimeMillis(); // 裝載數據 size = this.loadScheduleData(); if (size > 0) { this.m_lockObject.notifyOtherThread(); } else { //判斷當沒有數據的是否,是否須要退出調度 if (this.isStopSchedule == false && this.scheduleManager.isContinueWhenData()== true ){ if(logger.isTraceEnabled()){ logger.trace("沒有裝載到數據,start sleep"); } this.isSleeping = true; Thread.currentThread().sleep(this.scheduleManager.getTaskTypeInfo().getSleepTimeNoData()); this.isSleeping = false; if(logger.isTraceEnabled()){ logger.trace("Sleep end"); } }else{ //沒有數據,退出調度,喚醒全部沉睡線程 this.m_lockObject.notifyOtherThread(); } } this.m_lockObject.realseThread(); } else {// 將當前線程放置到等待隊列中。直到有線程裝載到了新的任務數據 if(logger.isTraceEnabled()){ logger.trace("不是最後一個線程,sleep"); } this.m_lockObject.waitCurrentThread(); } } } catch (Throwable e) { logger.error(e.getMessage(), e); } }
該方法有兩重while(true)循環。 源碼分析
1.跳出循環的點一個是當接受到中止調度的指令的時候,會跳出整個run方法。 性能
2.若線程沒有加在到執行任務,則會中斷內層while(true)循環。若不是最後一個線程,則線程會處於等待狀態。使用的是Object.wait()方法。等待其它線程喚醒。 fetch
3.如果最後一個線程,則不會進入等待狀態,而是執行方法loadScheduleData加載新的任務。若是加載到任務則喚醒其它線程開始工做。調用Object.notifyAll()方法喚醒其它線程。
ui
若是沒有加載到數據,則本身也會睡眠一個週期,等待數據準備好。
加載任務的兩個方法的源碼分別以下。
public synchronized Object getScheduleTaskId() { if (this.taskList.size() > 0) return this.taskList.remove(0); // 按正序處理 return null; } public synchronized Object[] getScheduleTaskIdMulti() { if (this.taskList.size() == 0){ return null; } int size = taskList.size() > taskTypeInfo.getExecuteNumber() ? taskTypeInfo.getExecuteNumber() : taskList.size(); Object[] result = new Object[size]; for(int i=0;i<size;i++){ result[i] = this.taskList.remove(0); // 按正序處理 } return result; }
Not sleep模式:
當一個線程任務處理完畢,從任務池中取不到任務的時候,當即調用業務接口獲取須要處理的任務,放入任務池中。
構造對象TBScheduleProcessorNotSleep的方法與睡眠處理器實現相似,再也不列出代碼。
初始化和建立線程、啓動線程的代碼也是相似的,咱們就再也不細看,咱們只看不同的地方是run()方法。
/** * 運行函數 */ @SuppressWarnings("unchecked") public void run() { long startTime = 0; long sequence = 0; Object executeTask = null; while (true) { try { if (this.isStopSchedule == true) { // 中止隊列調度 this.threadList.remove(Thread.currentThread());///threadList中的線程是在startThread中添加的,new完線程後啓動以前加入 if(this.threadList.size()==0){ this.scheduleManager.unRegisterScheduleServer(); } return; } // 加載調度任務 if (this.isMutilTask == false) { ///從上次selectTask的結果中取數據,裝任務的taskList是線程安全的,且取的同時刪掉,因而同一個實例內部的線程不會取到相同數據. executeTask = this.getScheduleTaskId(); } else { executeTask = this.getScheduleTaskIdMulti(); } ///若是已加載數據處理完,則再加載.NoSleep類型任務的特色 if (executeTask == null ) { this.loadScheduleData(); continue; } try { // 運行相關的程序 this.runningTaskList.add(executeTask); startTime = ScheduleUtil.getCurrentTimeMillis(); sequence = sequence + 1;///沒用上 if (this.isMutilTask == false) { if (((IScheduleTaskDealSingle<Object>) this.taskDealProcessor).execute(executeTask,scheduleManager.getTaskTypeRunningInfo().getOwnSign()) == true) { addSuccessNum(1, ScheduleUtil.getCurrentTimeMillis() - startTime, "com.taobao.pamirs.schedule.TBScheduleProcessorNotSleep.run"); } else { addFailNum(1, ScheduleUtil.getCurrentTimeMillis() - startTime, "com.taobao.pamirs.schedule.TBScheduleProcessorNotSleep.run"); } } else { if (((IScheduleTaskDealMulti<Object>) this.taskDealProcessor) .execute((Object[]) executeTask,scheduleManager.getTaskTypeRunningInfo().getOwnSign()) == true) { addSuccessNum(((Object[]) executeTask).length, ScheduleUtil .getCurrentTimeMillis() - startTime, "com.taobao.pamirs.schedule.TBScheduleProcessorNotSleep.run"); } else { addFailNum(((Object[]) executeTask).length, ScheduleUtil .getCurrentTimeMillis() - startTime, "com.taobao.pamirs.schedule.TBScheduleProcessorNotSleep.run"); } } } catch (Throwable ex) { if (this.isMutilTask == false) { addFailNum(1, ScheduleUtil.getCurrentTimeMillis() - startTime, "TBScheduleProcessor.run"); } else { addFailNum(((Object[]) executeTask).length, ScheduleUtil .getCurrentTimeMillis() - startTime, "TBScheduleProcessor.run"); } logger.error("Task :" + executeTask + " 處理失敗", ex); } finally { this.runningTaskList.remove(executeTask); } } catch (Throwable e) { throw new RuntimeException(e); //log.error(e.getMessage(), e); } } }
/** * 獲取單個任務,注意lock是必須, * 不然在maybeRepeatTaskList的數據處理上會出現衝突 * @return */ public T getScheduleTaskId() { lockFetchID.lock(); try { T result = null; while (true) { if (this.taskList.size() > 0) { result = this.taskList.remove(0); // 按正序處理 } else { return null; } if (this.isDealing(result) == false) {///檢查是否是在maybeRepeatTaskList裏面的,這句話在sleep方式裏面沒有,所以任務的比較器只有在NotSleep模式下須要用到 return result; } } } finally { lockFetchID.unlock(); } }
@SuppressWarnings("unchecked") protected boolean isDealing(T aTask) { if (this.maybeRepeatTaskList.size() == 0) { return false; } T[] tmpList = (T[]) this.maybeRepeatTaskList.toArray(); for (int i = 0; i < tmpList.length; i++) { if(this.taskComparator.compare(aTask, tmpList[i]) == 0){///在本身定義的任務類中定義getComparator就是幹這個用的 this.maybeRepeatTaskList.remove(tmpList[i]); return true; } } return false; }
判斷任務是否在處理中的方法,是直接與maybeRepeatTaskList集合中的對象進行對比,而且調用taskComparator這個實現的方法進行對比,所以該接口的正確性,直接影響着去重的效果。