For update帶來的思考

For update or not

起源

​ 之因此想寫這個專題,是由於最近在作一個搶佔任務的實現。假設數據庫不少個任務,在搶佔發生以前任務的狀態都是FREE。如今假設同時有一堆搶佔線程開始工做,搶佔線程會查找數據庫中狀態爲FREE的任務,而且將其狀態置爲BUSY,而後開始執行對應任務。執行完成以後,再將任務狀態置爲FINISH。任何任務都是不能被重複執行的,即必須保證全部任務都只能被一個線程執行。php

​ 筆者和人民羣衆同樣,第一個想到的就是利用數據庫的for update實現悲觀鎖。這樣確定可以保證數據的強一致性,可是這樣會大大影響效率,加劇數據庫的負擔。想到以前看過的一篇文章http://www.javashuo.com/article/p-brkblvjr-co.html,文章裏面有提到數據庫引擎自己對更新的記錄會行級上鎖。這個行級鎖的粒度很是細,上鎖的時間窗口也最少,只有在更新記錄的那一刻,纔會對記錄上鎖。同時筆者也想到在前一家公司工做的時候,當時有幸進入到了核心支付組,負責過一段時間的帳務系統。當時使用的是mysql的InnoDB引擎。記得當時的代碼在往帳戶裏面加錢的時候是沒有加任何鎖的,只有在從帳戶扣錢的時候才用for update。因此這個問題應該有更加完美的答案......html


探索之路

for update的實現這裏就再也不作過多嘗試了。這裏筆者直接探索在沒有for update的時候高併發狀況下是否會有問題。具體嘗試的過程以下:java

造測試數據mysql

​ 首先創建一個任務表,爲了簡單模擬,咱們這裏就只添加必要的字段。建表語句以下:sql

create table task(
       ID NUMBER(10) NOT NULL, 
    TASK_RUN_STATUS NUMBER(4) NOT NULL
);
comment on table task is '互斥任務表';
comment on column task.ID is '主鍵ID.';
comment on column task.TASK_RUN_STATUS is '任務運行狀態(1.初始待運行 2.運行中 3.運行完成).';
alter table task add constraint TASK_PK primary key (ID) using index;

​ 爲了方便測試,這裏咱們加入三條任務記錄,插入任務記錄的語句以下:數據庫

insert into task(id, task_run_status) values(0, 1);
insert into task(id, task_run_status) values(1, 1);
insert into task(id, task_run_status) values(2, 1);

模擬併發搶佔多線程

public class MultiThreadUpdate {
    public static void main(String[] args) throws Exception {
        Class.forName("oracle.jdbc.OracleDriver");
        ExecutorService executorService = Executors.newFixedThreadPool(30);
        List<Future<Void>> futures = new ArrayList<Future<Void>>();
        
        // 每一個ID開20個線程去併發更新數據
        for (int i=0; i<20; i++) {
            for (int j=0; j<3; j++) {
                final int id = j;
                futures.add(executorService.submit(new Callable<Void>() {
                    public Void call() throws Exception {
                        Connection con = DriverManager.getConnection("jdbc:oracle:thin:@localhost:1521:orcl", "czbank", "123456");
                        // con.setAutoCommit(false);        // 不自動提交事務
                        PreparedStatement pstm = con.prepareStatement("update task set TASK_RUN_STATUS = ? where id = ? and TASK_RUN_STATUS = ?");
                        pstm.setInt(1, 2);
                        pstm.setInt(2, id);
                        pstm.setInt(3, 1);
                        int upRec = pstm.executeUpdate();
                        // 打印更新的記錄條數
                        System.out.println("Thread:" + Thread.currentThread().getName() + " updated(id=" + id + "):" + upRec + " records...");
                        // Thread.sleep(1000);      // 在事務提交以前,其線程都會阻塞直到對特定記錄的更新提交
                        // con.commit();
                        con.close();
                        pstm.close();
                        return null;
                    }
                }));
            }
        }
        executorService.shutdown();
    }
}

​ 最終程序的輸出結果以下:併發

Thread:pool-1-thread-9 updated(id=2):0 records...
Thread:pool-1-thread-15 updated(id=2):0 records...
Thread:pool-1-thread-22 updated(id=0):0 records...
Thread:pool-1-thread-28 updated(id=0):0 records...
Thread:pool-1-thread-14 updated(id=1):0 records...
Thread:pool-1-thread-17 updated(id=1):0 records...
Thread:pool-1-thread-26 updated(id=1):0 records...
Thread:pool-1-thread-30 updated(id=2):0 records...
Thread:pool-1-thread-29 updated(id=1):0 records...
Thread:pool-1-thread-27 updated(id=2):0 records...
Thread:pool-1-thread-5 updated(id=1):0 records...
Thread:pool-1-thread-23 updated(id=1):0 records...
Thread:pool-1-thread-21 updated(id=2):1 records...
Thread:pool-1-thread-1 updated(id=0):1 records...
Thread:pool-1-thread-6 updated(id=2):0 records...
Thread:pool-1-thread-8 updated(id=1):1 records...
Thread:pool-1-thread-10 updated(id=0):0 records...
Thread:pool-1-thread-13 updated(id=0):0 records...
Thread:pool-1-thread-4 updated(id=0):0 records...
Thread:pool-1-thread-19 updated(id=0):0 records...
Thread:pool-1-thread-16 updated(id=0):0 records...
Thread:pool-1-thread-2 updated(id=1):0 records...
Thread:pool-1-thread-11 updated(id=1):0 records...
Thread:pool-1-thread-7 updated(id=0):0 records...
Thread:pool-1-thread-25 updated(id=0):0 records...
Thread:pool-1-thread-3 updated(id=2):0 records...
Thread:pool-1-thread-18 updated(id=2):0 records...
Thread:pool-1-thread-12 updated(id=2):0 records...
Thread:pool-1-thread-20 updated(id=1):0 records...
Thread:pool-1-thread-24 updated(id=2):0 records...
Thread:pool-1-thread-15 updated(id=2):0 records...
Thread:pool-1-thread-9 updated(id=0):0 records...
Thread:pool-1-thread-22 updated(id=1):0 records...
Thread:pool-1-thread-30 updated(id=0):0 records...
Thread:pool-1-thread-5 updated(id=1):0 records...
Thread:pool-1-thread-17 updated(id=2):0 records...
Thread:pool-1-thread-26 updated(id=0):0 records...
Thread:pool-1-thread-29 updated(id=1):0 records...
Thread:pool-1-thread-27 updated(id=2):0 records...
Thread:pool-1-thread-28 updated(id=0):0 records...
Thread:pool-1-thread-21 updated(id=1):0 records...
Thread:pool-1-thread-1 updated(id=2):0 records...
Thread:pool-1-thread-14 updated(id=0):0 records...
Thread:pool-1-thread-2 updated(id=1):0 records...
Thread:pool-1-thread-16 updated(id=0):0 records...
Thread:pool-1-thread-4 updated(id=2):0 records...
Thread:pool-1-thread-13 updated(id=1):0 records...
Thread:pool-1-thread-19 updated(id=2):0 records...
Thread:pool-1-thread-6 updated(id=0):0 records...
Thread:pool-1-thread-8 updated(id=1):0 records...
Thread:pool-1-thread-10 updated(id=2):0 records...
Thread:pool-1-thread-23 updated(id=0):0 records...
Thread:pool-1-thread-11 updated(id=1):0 records...
Thread:pool-1-thread-7 updated(id=2):0 records...
Thread:pool-1-thread-25 updated(id=0):0 records...
Thread:pool-1-thread-3 updated(id=1):0 records...
Thread:pool-1-thread-18 updated(id=2):0 records...
Thread:pool-1-thread-12 updated(id=0):0 records...
Thread:pool-1-thread-20 updated(id=1):0 records...
Thread:pool-1-thread-24 updated(id=2):0 records...

​ 能夠看到,即便在沒有顯示使用事務的狀況下,多線程併發執行也可以保證某一條數據的更新只被執行一次。oracle


最終任務設計

​ 經過上面的測試例子,已經驗證了個人猜測。接下來就是如何設計搶佔任務的執行步驟了。廢話很少說,直接上基本代碼:高併發

public void runMutexTasks(MutexTaskDto runCond) throws Exception {
    // STEP1: 先去查找待執行的互斥任務
    runCond.setTaskRunStatus(Enums.MutexTaskRunStatus.WAIT_RUN.getKey());   // 待運行
    runCond.setPhysicsFlag(Enums.TaskStatus.NORMAL.getKey());               // 正常狀態(未廢棄)
    PageInfo<MutexTaskDto> runnableTasks = MutexTaskService.pagingQueryGroupByTaskId(0, 0, runCond);
    if (CollectionUtils.isEmpty(runnableTasks.getRows())) {
        LOGGER.debug("根據條件未找到待執行的互斥任務,跳過執行......");
        return;
    }
    
    // STEP2: 分別嘗試執行
    List<MutexTaskDto> runTasks = null;
    Collections.shuffle(runnableTasks.getRows());   // 打亂順序
    for (MutexTaskDto oneTask : runnableTasks.getRows()) {
        runTasks = mutexTaskService.selectRunnableTaskByTaskId(oneTask.getTaskId());
        if (CollectionUtils.isEmpty(runTasks)) {
            LOGGER.info("互斥任務ID【{}】已不是待運行狀態,跳過任務執行......", oneTask.getTaskId());
            continue;
        }
        
        // STEP3: 運行任務
        MutexTaskDto updateCond = new MutexTaskDto();
        updateCond.setTaskRunStatus(Enums.MutexTaskRunStatus.RUN_SUCCESS.getKey());
        updateCond.setTaskPreStatus(Enums.MutexTaskRunStatus.RUNNING.getKey());
        updateCond.setTaskId(oneTask.getTaskId());
        try {
            runTasks(runTasks);
        } catch(Exception  e) {
            updateCond.setRunRemark(getErrorMsg(e));
            updateCond.setTaskRunStatus(Enums.MutexTaskRunStatus.RUN_FAILED.getKey());
            mutexTaskService.updateByTaskId(updateCond);
            // 這裏只打印失敗結果,具體失敗信息須要上層調用方法日誌打印出來
            LOGGER.error("互斥任務ID【{}】執行失敗!", oneTask.getTaskId());
            throw e;
        }
        mutexTaskService.updateByTaskId(updateCond);
        LOGGER.info("互斥任務ID【{}】執行成功......", oneTask.getTaskId());
        Thread.sleep(1000); // 搶到了一個節點執行權限,此處暫停1s,給其餘機器機會
    }
}

// 其中mutexTaskService的selectRunnableTaskByTaskId方法以下:
// 不使用事務,利用數據庫引擎自身的行級鎖

public List<MutexTaskDto> selectRunnableTaskByTaskId(String taskId) {
    // STEP1: 先用查詢數據(一個taskID可能對應多條記錄,對應不一樣的參數)
    List<MutexTaskModle> mutexTaskModles = this.mutexTaskDao
            .selectByTaskId(taskId);
    if (CollectionUtils.isEmpty(mutexTaskModles)) {
        return Collections.emptyList();
    }
    
    // STEP2: 更新數據(使用數據庫引擎自身所帶的行級鎖)
    MutexTaskModle updateInfo = new MutexTaskModle();
    updateInfo.setTaskRunStatus(2);
    updateInfo.setTaskPreStatus(1);
    updateInfo.setTaskId(taskId);
    int updateCount = cleaningMutexTaskDao.updateByTaskId(updateInfo);
    if (updateCount <= 0) {
        LOGGER.info("找到待執行的互斥任務,可是更新任務爲執行中失敗......");
        return Collections.emptyList();
    }
    
    // STEP3: 前面兩項都校驗過,則確認當前任務列表是能夠執行的
    List<MutexTaskDto> mutexTasks = BeanConvertUtils.convertList(mutexTaskModles,
            MutexTaskDto.class);
    return mutexTasks;
}

​ 關鍵點就在於第58行的cleaningMutexTaskDao.updateByTaskId(updateInfo);。該語句對應的SQL大體爲:

update TASK set task_status = ? where task_id = ? and task_tatus = ?

​ 其中task_id爲表的主鍵,且啓用了惟一索引。


總結

​ 這個問題剛開始筆者想到的解決方案就是使用for update。但心裏總以爲這不是最佳方案,想起之前作過的項目還有看過的文章,卻也老是不太肯定。最終仍是本身動手寫了個測試用例"釋懷"了心裏的疑惑。最終也順利地想出了這個"完美"的實現。不得不認可:實踐是檢驗真理的惟一標準!工做到如今,愈來愈以爲你們以爲最好的實現不必定就是最好的,你們認爲的最高效的方法不必定就是最高效的。不少事情沒有絕對,就像寫代碼同樣,沒有絕對的好代碼。

​ 固然這不是鼓勵你們隨便寫代碼,筆者想說的是:作軟件就像作學問。不能純粹地拿別人的結論奉爲聖經。遇到問題要多思考,纔會有本身的沉澱。思考以後要多行動,纔不會僅僅停留在思想的巨人,行動的矮子。固然,行動以後也要多多整理出來,就像筆者這樣,奉獻社會,方便你我他......(一臉無語)😂

​---

參考連接

http://www.javashuo.com/article/p-brkblvjr-co.html

http://www.javashuo.com/article/p-csfyvmfa-bz.html

相關文章
相關標籤/搜索