之因此想寫這個專題,是由於最近在作一個搶佔任務的實現。假設數據庫不少個任務,在搶佔發生以前任務的狀態都是FREE。如今假設同時有一堆搶佔線程開始工做,搶佔線程會查找數據庫中狀態爲FREE的任務,而且將其狀態置爲BUSY,而後開始執行對應任務。執行完成以後,再將任務狀態置爲FINISH。任何任務都是不能被重複執行的,即必須保證全部任務都只能被一個線程執行。php
筆者和人民羣衆同樣,第一個想到的就是利用數據庫的for update
實現悲觀鎖。這樣確定可以保證數據的強一致性,可是這樣會大大影響效率,加劇數據庫的負擔。想到以前看過的一篇文章http://www.javashuo.com/article/p-brkblvjr-co.html,文章裏面有提到數據庫引擎自己對更新的記錄會行級上鎖。這個行級鎖的粒度很是細,上鎖的時間窗口也最少,只有在更新記錄的那一刻,纔會對記錄上鎖。同時筆者也想到在前一家公司工做的時候,當時有幸進入到了核心支付組,負責過一段時間的帳務系統。當時使用的是mysql的InnoDB引擎。記得當時的代碼在往帳戶裏面加錢的時候是沒有加任何鎖的,只有在從帳戶扣錢的時候才用for update
。因此這個問題應該有更加完美的答案......html
for update
的實現這裏就再也不作過多嘗試了。這裏筆者直接探索在沒有for update
的時候高併發狀況下是否會有問題。具體嘗試的過程以下:java
造測試數據mysql
首先創建一個任務表,爲了簡單模擬,咱們這裏就只添加必要的字段。建表語句以下:sql
create table task( ID NUMBER(10) NOT NULL, TASK_RUN_STATUS NUMBER(4) NOT NULL ); comment on table task is '互斥任務表'; comment on column task.ID is '主鍵ID.'; comment on column task.TASK_RUN_STATUS is '任務運行狀態(1.初始待運行 2.運行中 3.運行完成).'; alter table task add constraint TASK_PK primary key (ID) using index;
爲了方便測試,這裏咱們加入三條任務記錄,插入任務記錄的語句以下:數據庫
insert into task(id, task_run_status) values(0, 1); insert into task(id, task_run_status) values(1, 1); insert into task(id, task_run_status) values(2, 1);
模擬併發搶佔多線程
public class MultiThreadUpdate { public static void main(String[] args) throws Exception { Class.forName("oracle.jdbc.OracleDriver"); ExecutorService executorService = Executors.newFixedThreadPool(30); List<Future<Void>> futures = new ArrayList<Future<Void>>(); // 每一個ID開20個線程去併發更新數據 for (int i=0; i<20; i++) { for (int j=0; j<3; j++) { final int id = j; futures.add(executorService.submit(new Callable<Void>() { public Void call() throws Exception { Connection con = DriverManager.getConnection("jdbc:oracle:thin:@localhost:1521:orcl", "czbank", "123456"); // con.setAutoCommit(false); // 不自動提交事務 PreparedStatement pstm = con.prepareStatement("update task set TASK_RUN_STATUS = ? where id = ? and TASK_RUN_STATUS = ?"); pstm.setInt(1, 2); pstm.setInt(2, id); pstm.setInt(3, 1); int upRec = pstm.executeUpdate(); // 打印更新的記錄條數 System.out.println("Thread:" + Thread.currentThread().getName() + " updated(id=" + id + "):" + upRec + " records..."); // Thread.sleep(1000); // 在事務提交以前,其線程都會阻塞直到對特定記錄的更新提交 // con.commit(); con.close(); pstm.close(); return null; } })); } } executorService.shutdown(); } }
最終程序的輸出結果以下:併發
Thread:pool-1-thread-9 updated(id=2):0 records... Thread:pool-1-thread-15 updated(id=2):0 records... Thread:pool-1-thread-22 updated(id=0):0 records... Thread:pool-1-thread-28 updated(id=0):0 records... Thread:pool-1-thread-14 updated(id=1):0 records... Thread:pool-1-thread-17 updated(id=1):0 records... Thread:pool-1-thread-26 updated(id=1):0 records... Thread:pool-1-thread-30 updated(id=2):0 records... Thread:pool-1-thread-29 updated(id=1):0 records... Thread:pool-1-thread-27 updated(id=2):0 records... Thread:pool-1-thread-5 updated(id=1):0 records... Thread:pool-1-thread-23 updated(id=1):0 records... Thread:pool-1-thread-21 updated(id=2):1 records... Thread:pool-1-thread-1 updated(id=0):1 records... Thread:pool-1-thread-6 updated(id=2):0 records... Thread:pool-1-thread-8 updated(id=1):1 records... Thread:pool-1-thread-10 updated(id=0):0 records... Thread:pool-1-thread-13 updated(id=0):0 records... Thread:pool-1-thread-4 updated(id=0):0 records... Thread:pool-1-thread-19 updated(id=0):0 records... Thread:pool-1-thread-16 updated(id=0):0 records... Thread:pool-1-thread-2 updated(id=1):0 records... Thread:pool-1-thread-11 updated(id=1):0 records... Thread:pool-1-thread-7 updated(id=0):0 records... Thread:pool-1-thread-25 updated(id=0):0 records... Thread:pool-1-thread-3 updated(id=2):0 records... Thread:pool-1-thread-18 updated(id=2):0 records... Thread:pool-1-thread-12 updated(id=2):0 records... Thread:pool-1-thread-20 updated(id=1):0 records... Thread:pool-1-thread-24 updated(id=2):0 records... Thread:pool-1-thread-15 updated(id=2):0 records... Thread:pool-1-thread-9 updated(id=0):0 records... Thread:pool-1-thread-22 updated(id=1):0 records... Thread:pool-1-thread-30 updated(id=0):0 records... Thread:pool-1-thread-5 updated(id=1):0 records... Thread:pool-1-thread-17 updated(id=2):0 records... Thread:pool-1-thread-26 updated(id=0):0 records... Thread:pool-1-thread-29 updated(id=1):0 records... Thread:pool-1-thread-27 updated(id=2):0 records... Thread:pool-1-thread-28 updated(id=0):0 records... Thread:pool-1-thread-21 updated(id=1):0 records... Thread:pool-1-thread-1 updated(id=2):0 records... Thread:pool-1-thread-14 updated(id=0):0 records... Thread:pool-1-thread-2 updated(id=1):0 records... Thread:pool-1-thread-16 updated(id=0):0 records... Thread:pool-1-thread-4 updated(id=2):0 records... Thread:pool-1-thread-13 updated(id=1):0 records... Thread:pool-1-thread-19 updated(id=2):0 records... Thread:pool-1-thread-6 updated(id=0):0 records... Thread:pool-1-thread-8 updated(id=1):0 records... Thread:pool-1-thread-10 updated(id=2):0 records... Thread:pool-1-thread-23 updated(id=0):0 records... Thread:pool-1-thread-11 updated(id=1):0 records... Thread:pool-1-thread-7 updated(id=2):0 records... Thread:pool-1-thread-25 updated(id=0):0 records... Thread:pool-1-thread-3 updated(id=1):0 records... Thread:pool-1-thread-18 updated(id=2):0 records... Thread:pool-1-thread-12 updated(id=0):0 records... Thread:pool-1-thread-20 updated(id=1):0 records... Thread:pool-1-thread-24 updated(id=2):0 records...
能夠看到,即便在沒有顯示使用事務的狀況下,多線程併發執行也可以保證某一條數據的更新只被執行一次。oracle
經過上面的測試例子,已經驗證了個人猜測。接下來就是如何設計搶佔任務的執行步驟了。廢話很少說,直接上基本代碼:高併發
public void runMutexTasks(MutexTaskDto runCond) throws Exception { // STEP1: 先去查找待執行的互斥任務 runCond.setTaskRunStatus(Enums.MutexTaskRunStatus.WAIT_RUN.getKey()); // 待運行 runCond.setPhysicsFlag(Enums.TaskStatus.NORMAL.getKey()); // 正常狀態(未廢棄) PageInfo<MutexTaskDto> runnableTasks = MutexTaskService.pagingQueryGroupByTaskId(0, 0, runCond); if (CollectionUtils.isEmpty(runnableTasks.getRows())) { LOGGER.debug("根據條件未找到待執行的互斥任務,跳過執行......"); return; } // STEP2: 分別嘗試執行 List<MutexTaskDto> runTasks = null; Collections.shuffle(runnableTasks.getRows()); // 打亂順序 for (MutexTaskDto oneTask : runnableTasks.getRows()) { runTasks = mutexTaskService.selectRunnableTaskByTaskId(oneTask.getTaskId()); if (CollectionUtils.isEmpty(runTasks)) { LOGGER.info("互斥任務ID【{}】已不是待運行狀態,跳過任務執行......", oneTask.getTaskId()); continue; } // STEP3: 運行任務 MutexTaskDto updateCond = new MutexTaskDto(); updateCond.setTaskRunStatus(Enums.MutexTaskRunStatus.RUN_SUCCESS.getKey()); updateCond.setTaskPreStatus(Enums.MutexTaskRunStatus.RUNNING.getKey()); updateCond.setTaskId(oneTask.getTaskId()); try { runTasks(runTasks); } catch(Exception e) { updateCond.setRunRemark(getErrorMsg(e)); updateCond.setTaskRunStatus(Enums.MutexTaskRunStatus.RUN_FAILED.getKey()); mutexTaskService.updateByTaskId(updateCond); // 這裏只打印失敗結果,具體失敗信息須要上層調用方法日誌打印出來 LOGGER.error("互斥任務ID【{}】執行失敗!", oneTask.getTaskId()); throw e; } mutexTaskService.updateByTaskId(updateCond); LOGGER.info("互斥任務ID【{}】執行成功......", oneTask.getTaskId()); Thread.sleep(1000); // 搶到了一個節點執行權限,此處暫停1s,給其餘機器機會 } } // 其中mutexTaskService的selectRunnableTaskByTaskId方法以下: // 不使用事務,利用數據庫引擎自身的行級鎖 public List<MutexTaskDto> selectRunnableTaskByTaskId(String taskId) { // STEP1: 先用查詢數據(一個taskID可能對應多條記錄,對應不一樣的參數) List<MutexTaskModle> mutexTaskModles = this.mutexTaskDao .selectByTaskId(taskId); if (CollectionUtils.isEmpty(mutexTaskModles)) { return Collections.emptyList(); } // STEP2: 更新數據(使用數據庫引擎自身所帶的行級鎖) MutexTaskModle updateInfo = new MutexTaskModle(); updateInfo.setTaskRunStatus(2); updateInfo.setTaskPreStatus(1); updateInfo.setTaskId(taskId); int updateCount = cleaningMutexTaskDao.updateByTaskId(updateInfo); if (updateCount <= 0) { LOGGER.info("找到待執行的互斥任務,可是更新任務爲執行中失敗......"); return Collections.emptyList(); } // STEP3: 前面兩項都校驗過,則確認當前任務列表是能夠執行的 List<MutexTaskDto> mutexTasks = BeanConvertUtils.convertList(mutexTaskModles, MutexTaskDto.class); return mutexTasks; }
關鍵點就在於第58
行的cleaningMutexTaskDao.updateByTaskId(updateInfo);
。該語句對應的SQL大體爲:
update TASK set task_status = ? where task_id = ? and task_tatus = ?
其中task_id爲表的主鍵,且啓用了惟一索引。
這個問題剛開始筆者想到的解決方案就是使用for update
。但心裏總以爲這不是最佳方案,想起之前作過的項目還有看過的文章,卻也老是不太肯定。最終仍是本身動手寫了個測試用例"釋懷"了心裏的疑惑。最終也順利地想出了這個"完美"的實現。不得不認可:實踐是檢驗真理的惟一標準!工做到如今,愈來愈以爲你們以爲最好的實現不必定就是最好的,你們認爲的最高效的方法不必定就是最高效的。不少事情沒有絕對,就像寫代碼同樣,沒有絕對的好代碼。
固然這不是鼓勵你們隨便寫代碼,筆者想說的是:作軟件就像作學問。不能純粹地拿別人的結論奉爲聖經。遇到問題要多思考,纔會有本身的沉澱。思考以後要多行動,纔不會僅僅停留在思想的巨人,行動的矮子。固然,行動以後也要多多整理出來,就像筆者這樣,奉獻社會,方便你我他......(一臉無語)😂
---