源碼分析ElasticJob任務錯過機制(misfire)與冪等性

任務在調度執行中,因爲某種緣由未執行完畢,下一次調度任務觸發後,在同一個Job實例中,會出現兩個線程處理同一個分片上的數據,這樣就會形成兩個線程可能處理相同的數據,所以Elastic-Job引入冪等機制來解決上述問題。再重申一次ElastciJob的分佈式是數據的分佈式,一個任務在多個Job實例上運行,每一個Job實例處理該Job的部分數據(數據分片)。
本文重點分析ElasticJob是如何作到以下兩點的。java

  1. ElasticJob如何確保在同一個Job實例中多個線程不會處理相同的數據數據庫

  2. ElasticJob如何確保數據不會被多個Job實例處理微信

爲了解決上述這種狀況,ElasticJob引入任務錯過補償執行(misfire)與冪等機制。app

ElasticJob冪等原理

場景:例如任務調度週期爲每5s執行一次,正常每次調度任務處理須要耗時2s,若是在某一段時間因爲數據庫壓力變大,致使本來只須要2s就能處理完成的任務,如今須要16s才能運行,在一批數據處理未完成的狀況下,每5s又會觸發一次調度,若是不加以控制的話,在同一個實例上根據分片條件去查詢數據庫,查詢到的數據有可能相同(部分相同),這樣同一條任務數據將被屢次處理,若是業務方法未實現冪等,則會引起很是嚴重的問題,那ElasticJob是否能夠避免這個問題呢?分佈式

答案是確定。elasticJob提供了一個配置參數:monitorExecution=true,開啓冪等性。post

一個任務觸發後,將執行任務處理邏輯,其入口:ui

1AbstractElasticJobExecutor#misfireIfRunning
2if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {  // @1
3       if (shardingContexts.isAllowSendJobEvent()) {  // @2
4             jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
5                    "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName, 
6                    shardingContexts.getShardingItemParameters().keySet()));
7       }
8      return;
9}

代碼@1:在一個調度任務觸發後若是上一次任務還未執行,則須要設置該分片狀態爲mirefire,表示錯失了一次任務執行。
代碼@2:若是該分片被設置爲mirefire並開啓了事件跟蹤,將事件跟蹤保存在數據庫中。lua

接下來詳細分析JobFacade.misfireIfRu-nning的實現邏輯:spa

 1/**
2     * 若是當前分片項仍在運行則設置任務被錯過執行的標記.
3     * 
4     * @param items 須要設置錯過執行的任務分片項
5     * @return 是否錯過本次執行
6     */

7    public boolean misfireIfHasRunningItems(final Collection<Integer> items) {
8        if (!hasRunningItems(items)) {
9            return false;
10        }
11        setMisfire(items);
12        return true;
13    }

若是存在未完成的分片,則調用setMis-fire(items)方法,在開啓monitorExecut-ion(true)的狀況下,在分片任務開始時會建立{namespace}/jobname/sharding/{item}/running節點,在任務結束後會刪除該目錄,因此在判斷是否有分片正在運行時,只需判斷是否存在上述節點便可。若是存在,調用setMisfire方法。
.net

PS:ElasticJob只有在monitorExecuti-on=true的狀況下,纔會建立{namespa-ce}/jobname/sharding/{item}/running,m-isfire機制才能生效。

 1ExecutionService#setMisfire
2/**
3     * 設置任務被錯過執行的標記.
4     *
5     * @param items 須要設置錯過執行的任務分片項
6     */

7    public void setMisfire(final Collection<Integer> items{
8        for (int each : items) {
9            jobNodeStorage.createJobNodeIfNeeded(ShardingNode.getMisfireNode(each));
10        }
11    }

其實現方式爲分配給該實例下的全部分片建立持久節點{namespace}/jobname/shading/{item}/misfire節點,注意,只要分配給該實例的任何一分片未執行完畢,則在該實例下的全部分片都增長m-isfire節點,而後忽略本次任務觸發,等待任務結束後再執行。

1AbstractElasticJobExecutor#execute
2execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
3     while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
4         jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
5        execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
6}

在任務執行完成後檢查是否存在{name-space}/jobname/sharding/{item}/misfire節點,若是存在,則首先清除misfie相關的文件,而後執行任務。

冪等實現方案總結:
在下一個調度週期到達以後,只要發現這個分片的任何一個分片正在執行,則爲該實例分片的全部分片都設置爲mis-fire,等任務執行完畢後,再統一執行下一次任務調度。

ElasticJob數據分片

ElasticJob基於數據分片,不一樣分片根據分片參數(人爲配置),從數據庫中查詢各自數據(任務數據分片),若是當節點宕機,數據會從新分片,若是任務未執行完成,而後執行分片動做,數據是否會被不一樣的任務同時處理呢?

答案是不會,由於當節點宕機後是否須要從新分片事件監聽器會監聽到Job實例表明的節點刪除,設置從新分片,在任務被調度執行具體處理邏輯以前,須要從新分片,從新分片的前提又是要全部的分片的任務所有執行完畢,這也依賴是否開啓冪等控制(monitorExecution)。

若是開啓,ElasticJob能感知正在執行處理的分片,從新分片須要等待當前全部任務所有運行完畢後纔會觸發,故不會存在不一樣節點處理相同數據的問題。

問答:

一、若是一個任務JOB的調度頻率爲每10s一次,在某個時間,該job執行耗時用了33s(平時只需執行5s),按照正常調度,應該後續會觸發3次調度,那該job後執行完,會連續執行3次調度嗎?

答案:在33s此次任務執行完成後,若是後面的任務執行在10s內執行完畢的話,只會觸發一次,不會補償3次,由於Ela-sticJob記錄任務錯失執行,只是建立了misfire節點,並不會記錄錯失的次數。


更多文章請關注微信公衆號:


本文分享自微信公衆號 - 中間件興趣圈(dingwpmz_zjj)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索