taobao-pamirs-schedule-2.0源碼分析——任務處理器源碼分析

TBScheduleProcessorSleep分析

基本介紹

sleep模式: java

當某一個線程任務處理完畢,從任務池中取不到任務的時候,檢查其它線程是否處於活動狀態。若是是,則本身休眠; 若是其它線程都已經由於沒有任務進入休眠,當前線程是最後一個活動線程的時候,就調用業務接口,獲取須要處理的任務,放入任務池中, 同時喚醒其它休眠線程開始工做。 數據庫


流程圖


源碼分析

調度管理的resume方法是程序入口,咱們來看看。 安全


/**
	 * 處在了可執行的時間區間,恢復運行
	 * @throws Exception 
	 */
	public void resume(String message) throws Exception{
		if (this.isPauseSchedule == true) {
			if(log.isDebugEnabled()){
				log.debug("恢復調度:" + this.currenScheduleServer.getUuid());
			}
			this.isPauseSchedule = false;
			this.pauseMessage = message;
			if (this.queueDealTask != null) {
				if (this.taskTypeInfo.getProcessorType() != null &&
					this.taskTypeInfo.getProcessorType().equalsIgnoreCase("NOTSLEEP")==true){
					this.taskTypeInfo.setProcessorType("NOTSLEEP");
					this.processor = new TBScheduleProcessorNotSleep(this,
							queueDealTask,this.statisticsInfo);
				}else{
					this.processor = new TBScheduleProcessorSleep(this,
							queueDealTask,this.statisticsInfo);
					this.taskTypeInfo.setProcessorType("SLEEP");
				}
			}
			rewriteScheduleInfo();
		}
	}



從代碼能夠看出當調度管理器恢復調度的時候,會根據任務類型中配置的處理器類型來構造對應的處理器對象,即睡眠和不睡眠兩種處理器對象實現類。當屬性this.processor以前是有值的,則以前的處理器對象會成爲無引用的對象,當該processor中的線程已經執行完畢後,則原來舊的processor就能夠被銷燬。若管理器頻繁的中止和恢復調度,則可能出現代碼過大的問題。


咱們進入類TBScheduleProcessorSleep的構造函數看看,它作了些什麼。 多線程


public TBScheduleProcessorSleep(TBScheduleManager aManager,
			IScheduleTaskDeal<T> aQueueTask,	StatisticsInfo aStatisticsInfo) throws Exception {
		this.scheduleManager = aManager;
		this.statisticsInfo = aStatisticsInfo;
		this.taskTypeInfo = this.scheduleManager.getTaskTypeInfo();
		this.taskDealProcessor = aQueueTask;
		if (this.taskDealProcessor instanceof IScheduleTaskDealSingle<?>) {
			if (taskTypeInfo.getExecuteNumber() > 1) {
				taskTypeInfo.setExecuteNumber(1);
			}
			isMutilTask = false;
		} else {
			isMutilTask = true;
		}
		if (taskTypeInfo.getFetchDataNumber() < taskTypeInfo.getThreadNumber() * 10) {
			logger.warn("參數設置不合理,系統性能不佳。【每次從數據庫獲取的數量fetchnum】 >= 【線程數量threadnum】 *【最少循環次數10】 ");
		}
		for (int i = 0; i < taskTypeInfo.getThreadNumber(); i++) {
			this.startThread(i);
		}
	}



構造函數主要是使用參數中的值對屬性中的一些值進行初始化。檢查屬性配置值中的不合理配置會報警。


最後根據任務類中的ThreadNumber值來啓動對應數目的線程數。 併發


private void startThread(int index) {
		Thread thread = new Thread(this);
		threadList.add(thread);
		String threadName = this.scheduleManager.getTaskTypeRunningInfo().getTaskType()+"-" 
				+ this.scheduleManager.getCurrentSerialNumber() + "-exe"
				+ index;
		thread.setName(threadName);
		thread.start();
	}

建立一個線程,而且啓動該線程,傳遞的Runnable對象就是本對象,則咱們看看本對象中的run()方法就是線程要運行的方法。 函數



@SuppressWarnings({ "rawtypes", "unchecked", "static-access" })
	public void run(){
	      try {
	        long startTime =0;
	        while(true){
	          this.m_lockObject.addThread();
	          Object executeTask;
	          while (true) {
	            if(this.isStopSchedule == true){//中止隊列調度
	              this.m_lockObject.realseThread();
	              this.m_lockObject.notifyOtherThread();//通知全部的休眠線程
				  this.threadList.remove(Thread.currentThread());
				  if(this.threadList.size()==0){
						this.scheduleManager.unRegisterScheduleServer();
				  }
				  return;
	            }
	            
	            //加載調度任務
	            if(this.isMutilTask == false){
	              executeTask = this.getScheduleTaskId();
	            }else{
	              executeTask = this.getScheduleTaskIdMulti();
	            }
	            
	            if(executeTask == null){
	              break;
	            }
	            
	            try {//運行相關的程序
	              startTime =ScheduleUtil.getCurrentTimeMillis();
	              if (this.isMutilTask == false) {
						if (((IScheduleTaskDealSingle) this.taskDealProcessor).execute(executeTask,scheduleManager.getTaskTypeRunningInfo().getOwnSign()) == true) {
							addSuccessNum(1, ScheduleUtil.getCurrentTimeMillis()
									- startTime,
									"com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run");
						} else {
							addFailNum(1, ScheduleUtil.getCurrentTimeMillis()
									- startTime,
									"com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run");
						}
					} else {
						if (((IScheduleTaskDealMulti) this.taskDealProcessor)
								.execute((Object[]) executeTask,scheduleManager.getTaskTypeRunningInfo().getOwnSign()) == true) {
							addSuccessNum(((Object[]) executeTask).length, ScheduleUtil
									.getCurrentTimeMillis()
									- startTime,
									"com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run");
						} else {
							addFailNum(((Object[]) executeTask).length, ScheduleUtil
									.getCurrentTimeMillis()
									- startTime,
									"com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run");
						}
					} 
	            }catch (Throwable ex) {
					if (this.isMutilTask == false) {
						addFailNum(1, ScheduleUtil.getCurrentTimeMillis() - startTime,
								"TBScheduleProcessor.run");
					} else {
						addFailNum(((Object[]) executeTask).length, ScheduleUtil
								.getCurrentTimeMillis()
								- startTime,
								"TBScheduleProcessor.run");
					}
					logger.warn("Task :" + executeTask + " 處理失敗", ex);				
	            }
	          }
	          //當前隊列中全部的任務都已經完成了。
	            if(logger.isTraceEnabled()){
				   logger.trace(Thread.currentThread().getName() +":當前運行線程數量:" +this.m_lockObject.count());
			    }
				if (this.m_lockObject.realseThreadButNotLast() == false) {
					int size = 0;
					Thread.currentThread().sleep(100);
					startTime = ScheduleUtil.getCurrentTimeMillis();
					// 裝載數據
					size = this.loadScheduleData();
					if (size > 0) {
						this.m_lockObject.notifyOtherThread();
					} else {
						//判斷當沒有數據的是否,是否須要退出調度
						if (this.isStopSchedule == false && this.scheduleManager.isContinueWhenData()== true ){						 
							if(logger.isTraceEnabled()){
								   logger.trace("沒有裝載到數據,start sleep");
							}
							this.isSleeping = true;
						    Thread.currentThread().sleep(this.scheduleManager.getTaskTypeInfo().getSleepTimeNoData());
						    this.isSleeping = false;
						    
						    if(logger.isTraceEnabled()){
								   logger.trace("Sleep end");
							}
						}else{
							//沒有數據,退出調度,喚醒全部沉睡線程
							this.m_lockObject.notifyOtherThread();
						}
					}
					this.m_lockObject.realseThread();
				} else {// 將當前線程放置到等待隊列中。直到有線程裝載到了新的任務數據
					if(logger.isTraceEnabled()){
						   logger.trace("不是最後一個線程,sleep");
					}
					this.m_lockObject.waitCurrentThread();
				}
	        }
	      }
	      catch (Throwable e) {
	    	  logger.error(e.getMessage(), e);
	      }
	    }



該方法詳細的流程描述參考流程圖,咱們分析代碼中比較關鍵的實現點。


該方法有兩重while(true)循環。 源碼分析

1.跳出循環的點一個是當接受到中止調度的指令的時候,會跳出整個run方法。 性能

2.若線程沒有加在到執行任務,則會中斷內層while(true)循環。若不是最後一個線程,則線程會處於等待狀態。使用的是Object.wait()方法。等待其它線程喚醒。 fetch

3.如果最後一個線程,則不會進入等待狀態,而是執行方法loadScheduleData加載新的任務。若是加載到任務則喚醒其它線程開始工做。調用Object.notifyAll()方法喚醒其它線程。
ui

若是沒有加載到數據,則本身也會睡眠一個週期,等待數據準備好。

加載任務的兩個方法的源碼分別以下。


public synchronized Object getScheduleTaskId() {
		     if (this.taskList.size() > 0)
		    	 return this.taskList.remove(0);  // 按正序處理
		     return null;
		   }

		   public synchronized Object[] getScheduleTaskIdMulti() {
		       if (this.taskList.size() == 0){
		         return null;
		       }
		       int size = taskList.size() > taskTypeInfo.getExecuteNumber() ? taskTypeInfo.getExecuteNumber()
						: taskList.size();
		       Object[] result = new Object[size];
		       for(int i=0;i<size;i++){
		      	 result[i] = this.taskList.remove(0);  // 按正序處理
		       }
		       return result;
		   }



能夠看到每一個方法都添加了關鍵字 synchronized,它的意義是控制多線程併發的不安全問題。



TBScheduleProcessorNotSleep分析

基本介紹

Not sleep模式:

當一個線程任務處理完畢,從任務池中取不到任務的時候,當即調用業務接口獲取須要處理的任務,放入任務池中。

流程圖

源碼分析

構造對象TBScheduleProcessorNotSleep的方法與睡眠處理器實現相似,再也不列出代碼。

初始化和建立線程、啓動線程的代碼也是相似的,咱們就再也不細看,咱們只看不同的地方是run()方法。


/**
	 * 運行函數
	 */
	@SuppressWarnings("unchecked")
	public void run() {
		long startTime = 0;
		long sequence = 0;
		Object executeTask = null;	
		while (true) {
			try {
				if (this.isStopSchedule == true) { // 中止隊列調度
					this.threadList.remove(Thread.currentThread());///threadList中的線程是在startThread中添加的,new完線程後啓動以前加入
					if(this.threadList.size()==0){
						this.scheduleManager.unRegisterScheduleServer();
					}
					return;
				}
				// 加載調度任務
				if (this.isMutilTask == false) {
					///從上次selectTask的結果中取數據,裝任務的taskList是線程安全的,且取的同時刪掉,因而同一個實例內部的線程不會取到相同數據.
					executeTask = this.getScheduleTaskId();
				} else {
					executeTask = this.getScheduleTaskIdMulti();
				}
				///若是已加載數據處理完,則再加載.NoSleep類型任務的特色
				if (executeTask == null ) {
					this.loadScheduleData();
					continue;
				}
				
				try { // 運行相關的程序
					this.runningTaskList.add(executeTask);
					startTime = ScheduleUtil.getCurrentTimeMillis();
					sequence = sequence + 1;///沒用上
					if (this.isMutilTask == false) {
						if (((IScheduleTaskDealSingle<Object>) this.taskDealProcessor).execute(executeTask,scheduleManager.getTaskTypeRunningInfo().getOwnSign()) == true) {
							addSuccessNum(1, ScheduleUtil.getCurrentTimeMillis()
									- startTime,
									"com.taobao.pamirs.schedule.TBScheduleProcessorNotSleep.run");
						} else {
							addFailNum(1, ScheduleUtil.getCurrentTimeMillis()
									- startTime,
									"com.taobao.pamirs.schedule.TBScheduleProcessorNotSleep.run");
						}
					} else {
						if (((IScheduleTaskDealMulti<Object>) this.taskDealProcessor)
								.execute((Object[]) executeTask,scheduleManager.getTaskTypeRunningInfo().getOwnSign()) == true) {
							addSuccessNum(((Object[]) executeTask).length, ScheduleUtil
									.getCurrentTimeMillis()
									- startTime,
									"com.taobao.pamirs.schedule.TBScheduleProcessorNotSleep.run");
						} else {
							addFailNum(((Object[]) executeTask).length, ScheduleUtil
									.getCurrentTimeMillis()
									- startTime,
									"com.taobao.pamirs.schedule.TBScheduleProcessorNotSleep.run");
						}
					}
				} catch (Throwable ex) {
					if (this.isMutilTask == false) {
						addFailNum(1, ScheduleUtil.getCurrentTimeMillis() - startTime,
								"TBScheduleProcessor.run");
					} else {
						addFailNum(((Object[]) executeTask).length, ScheduleUtil
								.getCurrentTimeMillis()
								- startTime,
								"TBScheduleProcessor.run");
					}
					logger.error("Task :" + executeTask + " 處理失敗", ex);
				} finally {
					this.runningTaskList.remove(executeTask);
				}
			} catch (Throwable e) {
				throw new RuntimeException(e);
				//log.error(e.getMessage(), e);
			}
		}
	}



該部分代碼的區別是當從內存中未加載到執行任務的時候,會直接調用 loadScheduleData()方法去獲取新的任務,這個時候就有可能新讀取到的任務與其它線程正在運行的任務存在重複,所以爲了解決該問題,則須要在內存中進行排重,避免執行相同任務。


/**
	 * 獲取單個任務,注意lock是必須,
	 * 不然在maybeRepeatTaskList的數據處理上會出現衝突
	 * @return
	 */
	public T getScheduleTaskId() {
		lockFetchID.lock();
		try {
			T result = null;
			while (true) {
				if (this.taskList.size() > 0) {
					result = this.taskList.remove(0); // 按正序處理
				} else {
					return null;
				}
				if (this.isDealing(result) == false) {///檢查是否是在maybeRepeatTaskList裏面的,這句話在sleep方式裏面沒有,所以任務的比較器只有在NotSleep模式下須要用到
					return result;
				}
			}
		} finally {
			lockFetchID.unlock();
		}
	}



能夠看到在返回任務的時候,會調用 this.isDealing(result)判斷該任務是否正在執行,若在執行,則會循環獲取下一個任務,直到獲取到一個不重複的任務爲止。
另外咱們看到爲了不線程訪問共享數據的不安全問題,使用了
lockFetchID .lock() lockFetchID .unlock()的加、解鎖來控制多線程同時運行。使用lockFetchID鎖定的是這個對象,在這裏定義多個對象就能下降鎖粒度,提高併發性能,而相對於Sleep模式中經過synchronized來控制併發(鎖定的是整個處理器對象,粒度更大)。


@SuppressWarnings("unchecked")
	protected boolean isDealing(T aTask) {
		if (this.maybeRepeatTaskList.size() == 0) {
			return false;
		}
		T[] tmpList = (T[]) this.maybeRepeatTaskList.toArray();
		for (int i = 0; i < tmpList.length; i++) {
			if(this.taskComparator.compare(aTask, tmpList[i]) == 0){///在本身定義的任務類中定義getComparator就是幹這個用的
				this.maybeRepeatTaskList.remove(tmpList[i]);
				return true;
			}
		}
		return false;
	}




判斷任務是否在處理中的方法,是直接與maybeRepeatTaskList集合中的對象進行對比,而且調用taskComparator這個實現的方法進行對比,所以該接口的正確性,直接影響着去重的效果。

兩種模式區別

  • Sleep模式在實現邏輯上相對簡單清晰,但存在一個大任務處理時間長,致使其它線程不工做的狀況。
  • 在NotSleep模式下,減小了線程休眠的時間,避免大任務阻塞的狀況,但爲了不數據被重複處理,增長了CPU在數據比較上的開銷。 同時要求業務接口實現對象的比較接口。
  • 若是對任務處理不容許停頓的狀況下建議用NotSleep模式,其它狀況建議用sleep模式。
相關文章
相關標籤/搜索