算法原理是註冊中心有一個隊列表PAMIRS_S
,它包含以下關鍵信息:QUEUE_ID是隊列標識 CUR_SERVER是當前分配服務器標識,REQ_SERVER是申請分配服務器標識。 java
假若有1,2,3,4,5個隊列,有A,B,C三個服務器依次啓動。則算法的規則是這樣的: mysql
A啓動的時候: 算法
因爲沒有其它的主機,則將全部的隊列分配給A。 sql
QUEUE_ID |
CUR_SERVER |
REQ_SERVER |
1 | A | |
2 | A |
|
3 | A |
|
4 | A | |
5 | A | |
B啓動的時候: 數據庫
QUEUE_ID |
CUR_SERVER |
REQ_SERVER |
1 | A | |
2 | A |
B |
3 | A |
|
4 | A | B |
5 | A | |
C啓動的時候: 服務器
QUEUE_ID |
CUR_SERVER |
REQ_SERVER |
1 | A | |
2 | A |
B |
3 | A |
C |
4 | A | |
5 | A | B |
D啓動的時候: 性能
QUEUE_ID |
CUR_SERVER |
REQ_SERVER |
1 | A | |
2 | A |
B |
3 | A |
C |
4 | A | D |
5 | A | |
上述算法中實現了預分配,那何時實現正式分配呢?當在獲取任務隊列的時候(必須控制在當前服務器中的全部任務都執行完畢的狀況下,不然會重複執行任務的可能性)會先釋放本身已經持有,可是別人要申請的隊列,將這些隊列讓給申請人。 ui
好比當前隊列是A,在執行釋放隊列前的數據狀態是: this
QUEUE_ID |
CUR_SERVER |
REQ_SERVER |
1 | A | |
2 | A |
B |
3 | A |
C |
4 | A | D |
5 | A | |
QUEUE_ID |
CUR_SERVER |
REQ_SERVER |
1 | A | |
2 | B | |
3 | C |
|
4 | D | |
5 | A | |
最開始的代碼是在TBScheduleManager的方法assignScheduleTask方法。 spa
public void assignScheduleTask() throws Exception { int clearServerCount = scheduleCenter .clearExpireScheduleServer(this.taskTypeInfo,this.taskTypeRunningInfo); List<ScheduleServer> serverList = scheduleCenter .selectAllValidScheduleServer(this.getTaskTypeRunningInfo().getTaskType()); int clearTaskQueueInfoCount = scheduleCenter.clearTaskQueueInfo( this.getTaskTypeRunningInfo().getTaskType(), serverList); boolean isNeedReAssign = false; if (clearServerCount > 0 || clearTaskQueueInfoCount > 0) { isNeedReAssign = true; } else { for (ScheduleServer item : serverList) { //注意,比較時間必定要用數據庫時間 if (item.getCenterServerTime().getTime() - item.getRegisterTime().getTime() < taskTypeInfo.getJudgeDeadInterval() * 3 ) { isNeedReAssign = true; break; } } } if (isNeedReAssign == true) { scheduleCenter.assignQueue(this.getTaskTypeRunningInfo().getTaskType(), this.currenScheduleServer.getUuid(), serverList); } if (log.isDebugEnabled()) { //log.debug(message); } }
它會先查詢一下是否須要從新分配隊列,當已經清理過過時的服務器,或者已經清理過非法服務器持有的隊列,或者有新的服務器(註冊時間距離如今時間小於3個時間週期)註冊的時候,則須要從新預分配隊列。比較時間必定要以註冊中心的時間爲準。
須要從新預分配隊列則進入方法scheduleCenter.assignQueue。
private Connection getConnection() throws SQLException{ Connection result = this.dataSource.getConnection(); if(result.getAutoCommit() == true){ result.setAutoCommit(false); } return result; } public void assignQueue(String taskType, String currentUuid, List<ScheduleServer> serverList) throws Exception { Connection conn = null; try{ conn = this.getConnection(); clientInner.assignQueue(conn, taskType,currentUuid,serverList); conn.commit(); }catch(Throwable e){ if(conn != null){ conn.rollback(); } if(e instanceof Exception){ throw (Exception)e; }else{ throw new Exception(e); } }finally{ if(conn!= null){ conn.close(); } } }
這個方法說明鏈接關閉了自動提交,方法內的多個SQL執行是在一個事務裏的。這個很是關鍵。
/** * 從新分配任務處理隊列 * * @param taskType * @param serverList * @throws Exception */ public void assignQueue(Connection conn,String taskType, String currentUuid, List<ScheduleServer> serverList) throws Exception { this.lockTaskTypeRunningInfo(conn,taskType, currentUuid); String sqlQueue = " SELECT TASK_TYPE,QUEUE_ID,CUR_SERVER,REQ_SERVER FROM " + transferTableName(conn, "PAMIRS_SCHEDULE_QUEUE") + " WHERE TASK_TYPE = ? ORDER BY QUEUE_ID"; PreparedStatement stmtQueue = conn.prepareStatement(sqlQueue); stmtQueue.setString(1, taskType); ResultSet setQueue = stmtQueue.executeQuery(); int point = 0; int taskCount = 0; while (setQueue.next()) { PreparedStatement stmtUpdateQueue = null; String sqlModifyQueue = ""; if (setQueue.getString("CUR_SERVER") == null) { sqlModifyQueue = " UPDATE " + transferTableName(conn, "PAMIRS_SCHEDULE_QUEUE") + " SET CUR_SERVER = ?,REQ_SERVER = null,GMT_MODIFIED = " + getDataBaseSysdateString(conn) + " WHERE TASK_TYPE = ? and QUEUE_ID = ? "; stmtUpdateQueue = conn.prepareStatement(sqlModifyQueue); stmtUpdateQueue.setString(1, serverList.get(point) .getUuid()); stmtUpdateQueue.setString(2, taskType); stmtUpdateQueue .setString(3, setQueue.getString("QUEUE_ID")); stmtUpdateQueue.executeUpdate(); stmtUpdateQueue.close(); } else if (!(serverList.get(point).getUuid().equalsIgnoreCase( setQueue.getString("CUR_SERVER")) == true && setQueue .getString("REQ_SERVER") == null)) { sqlModifyQueue = " UPDATE " + transferTableName(conn, "PAMIRS_SCHEDULE_QUEUE") + " SET REQ_SERVER = ? ,GMT_MODIFIED = " + getDataBaseSysdateString(conn) + " WHERE TASK_TYPE = ? and QUEUE_ID = ? "; stmtUpdateQueue = conn.prepareStatement(sqlModifyQueue); stmtUpdateQueue.setString(1, serverList.get(point) .getUuid()); stmtUpdateQueue.setString(2, taskType); stmtUpdateQueue .setString(3, setQueue.getString("QUEUE_ID")); stmtUpdateQueue.executeUpdate(); stmtUpdateQueue.close(); } else { // 不須要修改當前記錄的信息 } taskCount = taskCount + 1; if (point >= serverList.size() - 1) { point = 0; } else { point = point + 1; } } setQueue.close(); stmtQueue.close(); if (taskCount == 0) { throw new Exception("沒有對任務類型配置數據處理隊列,TASK_TYPE = " + taskType); } }
public void lockTaskTypeRunningInfo(Connection conn,String taskType, String lockServerUuid) throws Exception { String sql = " UPDATE " + transferTableName(conn, "PAMIRS_SCHEDULE_TASKTRUN") + " set LAST_ASSIGN_TIME = " + getDataBaseSysdateString(conn) + ",LAST_ASSIGN_UUID = ? , GMT_MODIFIED = " + getDataBaseSysdateString(conn) + " where TASK_TYPE = ? "; PreparedStatement statement = conn.prepareStatement(sql); statement.setString(1, lockServerUuid); statement.setString(2, taskType); statement.executeUpdate(); statement.close(); }
分配隊列以前,會先調用方法lockTaskTypeRunningInfo對這個運行期類型進行加鎖,看它使用的SQL語句能夠看出來,它是使用了數據庫實現的行鎖(或者範圍鎖)來實現加鎖,避免多個進程同時分配隊列時的衝突,其它進程若要更新該行須要等待釋放鎖。這就要求咱們在建表的時候必定要對字段TASK_TYPE創建索引,而且若是是mysql的話,要選擇支持行鎖的表引擎,避免鎖粒度過大致使的系統性能問題。
分配隊列的實現是先查詢出該任務全部的隊列列表,而後循環這個列表,依次給這個隊列列表分配服務器,參數輸入的是有效服務器列表。
這個代碼就實現了上述算法。它依次對隊列列表進行循環,有下面這些狀況:
若是當前隊列未分配服務器(即 CUR_SERVER=null)則將當前服務器分配給該隊列(即賦值給CUR_SERVER字段);
若是當前隊列已經分配服務器(即 CUR_SERVER!=null),而且分配的服務器不是當前服務器,則將當前服務器設置爲待分配服務器(即賦值給REQ_SERVER字段);若是是當前服務器則表示應分配,就沒有必要再放入待分配服務器。
其中服務器的選擇是循環的,由於服務器的數量可能小於隊列數。選擇到最後一個服務器則下一個又回到第一個服務器。
這樣就實現了服務器能夠均勻的分配給多個隊列,當服務器數大於隊列數的時候就有可能會出現有的服務器沒法分配給對應的任務隊列的問題,會報警。
在調度管理器中有一個獲取當前服務器某個任務隊列列表的方法,查看該方法源碼能夠看到檢查處理器中的數據是否已經處理完,若沒有處理完則會循環等待阻塞程序直處處理完成才能繼續獲取任務隊列。它最終調用了私有方法getCurrentScheduleQueueNow。
/** * 從新加載當前服務器的任務隊列 * 一、釋放當前服務器持有,但有其它服務器進行申請的任務隊列 * 二、從新獲取當前服務器的處理隊列 * * 爲了不此操做的過分,阻塞真正的數據處理能力。系統設置一個從新裝載的頻率。例如1分鐘 * * 特別注意: * 此方法的調用必須是在當前全部任務都處理完畢後才能調用,不然是否任務隊列後可能數據被重複處理 */ @SuppressWarnings("static-access") public List<String> getCurrentScheduleQueue() { try{ if (this.isNeedReloadQueue == true) { //特別注意:須要判斷數據隊列是否已經空了,不然可能在隊列切換的時候致使數據重複處理 //主要是在線程不休眠就加載數據的時候必定須要這個判斷 if (this.processor != null) { while (this.processor.isDealFinishAllData() == false) { Thread.currentThread().sleep(50); } } //真正開始處理數據 this.getCurrentScheduleQueueNow(); } this.lastReloadTaskQueueTime = ScheduleUtil.getCurrentTimeMillis(); return this.currentTaskQueue; }catch(Exception e){ throw new RuntimeException(e); } }
getCurrentScheduleQueueNow方法才真正實現了獲取隊列的邏輯,咱們進去看一下。
private List<String> getCurrentScheduleQueueNow() throws Exception { //是否被人申請的隊列 this.scheduleCenter.releaseDealQueue(this.getTaskTypeRunningInfo().getTaskType(), this.currenScheduleServer.getUuid()); //從新查詢當前服務器可以處理的隊列 this.currentTaskQueue = this.scheduleCenter.reloadDealQueue( this.getTaskTypeRunningInfo().getTaskType(), this.currenScheduleServer.getUuid()); //若是超過10個心跳週期尚未獲取到調度隊列,則報警 if(this.currentTaskQueue.size() ==0 && ScheduleUtil.getCurrentTimeMillis() - this.lastReloadTaskQueueTime > this.taskTypeInfo.getHeartBeatRate() * 10){ String message ="調度服務器" + this.currenScheduleServer.getUuid() +"[TASK_TYPE=" + this.getTaskTypeRunningInfo().getTaskType() + "]自啓動以來,超過10個心跳週期,還 沒有獲取到分配的任務隊列"; log.warn(message); if(this.scheduleAlert != null){ this.scheduleAlert.noTaskQueue(this.getTaskTypeRunningInfo().getTaskType(), this.currenScheduleServer.getUuid(),message); } } if(this.currentTaskQueue.size() >0){ //更新時間戳 this.lastReloadTaskQueueTime = ScheduleUtil.getCurrentTimeMillis(); } return this.currentTaskQueue; }
它先調用了scheduleCenter.releaseDealQueue方法釋放本身的隊列,即下列代碼。而後從新加載本身的隊列,當10個週期獲取到的隊列數爲0則會報警。
/** * 釋放本身把持,別人申請的隊列 * * @param taskType * @param uuid * @return * @throws Exception */ public void releaseDealQueue(Connection conn,String taskType, String uuid) throws Exception { String querySql = "select QUEUE_ID from " + transferTableName(conn, "PAMIRS_SCHEDULE_QUEUE") + " WHERE TASK_TYPE = ? and CUR_SERVER = ? AND REQ_SERVER IS NOT NULL "; PreparedStatement stmtQueue = conn.prepareStatement(querySql); stmtQueue.setString(1, taskType); stmtQueue.setString(2, uuid); ResultSet set = stmtQueue.executeQuery(); List<String> queueIds = new ArrayList<String>(); while(set.next()){ queueIds.add(set.getString("QUEUE_ID")); } set.close(); stmtQueue.close(); String sqlQueue = " update " + transferTableName(conn, "PAMIRS_SCHEDULE_QUEUE") + " set CUR_SERVER = REQ_SERVER,REQ_SERVER = NULL, GMT_MODIFIED = " + getDataBaseSysdateString(conn) + " WHERE TASK_TYPE = ? and CUR_SERVER = ? AND QUEUE_ID = ? AND REQ_SERVER IS NOT NULL "; for(String queueId:queueIds){ stmtQueue = conn.prepareStatement(sqlQueue); stmtQueue.setString(1, taskType); stmtQueue.setString(2, uuid); stmtQueue.setString(3, queueId); stmtQueue.executeUpdate(); stmtQueue.close(); conn.commit(); } }該方法的實現是查詢當前任務分給當前服務器的全部隊列列表,而後會依次循環將字段REQ_SERVER的值賦給字段CUR_SERVER,也就是表示將待分配服務器正式設置爲已分配服務器,而且將REQ_SERVER設置爲空,這也就實現了服務器釋放算法。