前段時間,公司要改造現有的單節點調度爲分佈式任務調度,而後就研究了目前市面上主流的開源分佈式任務調度框架,用起來就一個感受:麻煩!特別是以前在一個類裏寫了好多個調度任務,改造起來更加麻煩。我這人又比較懶,總感受用了別人寫好的工具還要改一大堆,內心就有點不舒服。因而我就想本身寫一個框架,畢竟本身以爲分佈式任務調度在全部分佈式系統中是最簡單的,由於通常公司任務調度自己不可能同時調度海量的任務,很大的併發,改形成分佈式主要仍是爲了分散任務到多個節點,以便同一時間處理更多的任務。後面有一天,我在公司前臺取快遞,看到這樣一個現象:咱們好幾個同事(包括我)在前臺那從頭至尾看快遞是否是本身的,是本身的就取走,不是就忽略,而後我就收到了啓發。這個場景類比到分佈式調度系統中,咱們能夠認爲是快遞公司或者快遞員已經把每一個快遞按照咱們名字電話分好了快遞,咱們只須要取走本身的就好了。可是從另一個角度看,也能夠理解成咱們每一個人都是從頭至尾看了全部快遞,而後按照某種約定的規則,若是是本身的快遞就拿走,不是本身的就忽略繼續看下一個。若是把快遞想象成任務,一堆人去拿一堆快遞也能夠很順利的拿到各自的快遞,那麼一堆節點本身去取任務是否是也能夠很好的處理各自的任務呢?java
傳統的分佈式任務調度都有一個調度中心,這個調度中心也都要部署稱多節點的集羣,避免單點故障,而後還有一堆執行器,執行器負責執行調度中心分發的任務。按照上面的啓發,個人思路是放棄中心式的調度中心直接由各個執行器節點去公共的地方按照約定的規則去取任務,而後執行。設計示意圖以下node
有人可能懷疑那任務db庫不是有單點問題嗎,我想反問下,難道其餘的分佈式任務調度框架沒有這個問題嗎?針對數據庫單點咱們能夠單獨相似業務庫那樣考慮高可用方案,這裏不是這篇文章的討論重點。很明顯咱們重點放在執行節點那裏到底怎麼保證高可用,單個任務不會被多個節點同時執行,單個節點執行到一半忽然失聯了,這個任務怎麼辦等複雜的問題。後續咱們使用未經修飾的代碼的方式一一解決這個問題(未經修飾主要是沒有優化結構流水帳式的代碼風格,主要是不少人包括我本身看別人源碼時老是感受暈頭轉向的,彷彿置身迷宮般,看起來特別費勁,多是我本身境界未到吧)git
既然省略了集中式的調度,那麼既然叫任務調度很明顯必需要有調度的過程,否則多個節點去搶一個任務怎麼避免衝突呢?我這裏解決方式是:首先先明確一個任務的幾種狀態:待執行,執行中,有異常,已完成。每一個節點起一個線程一直去查很快就要開始執行的待執行任務,而後遍歷這些任務,使用樂觀鎖的方式先更新這個任務的版本號(版本號+1)和狀態(變成執行中),若是更新成功就放入節點本身的延時隊列中等待執行。因爲每一個節點的線程都是去數據庫查待執行的任務,很明顯變成執行中的任務下次就不會被其餘節點再查詢到了,至於對於那些在本節點更新狀態以前就查到的待執行任務也會通過樂觀鎖嘗試後更新失敗從而跳過這個任務,這樣就能夠避免一個任務同時被多個節點重複執行。關鍵代碼以下:程序員
package com.rdpaas.task.scheduler; import com.rdpaas.task.common.*; import com.rdpaas.task.config.EasyJobConfig; import com.rdpaas.task.repository.NodeRepository; import com.rdpaas.task.repository.TaskRepository; import com.rdpaas.task.strategy.Strategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.Date; import java.util.List; import java.util.concurrent.*; /** * 任務調度器 * @author rongdi * @date 2019-03-13 21:15 */ @Component public class TaskExecutor { private static final Logger logger = LoggerFactory.getLogger(TaskExecutor.class); @Autowired private TaskRepository taskRepository; @Autowired private NodeRepository nodeRepository; @Autowired private EasyJobConfig config;/** * 建立任務到期延時隊列 */ private DelayQueue<DelayItem<Task>> taskQueue = new DelayQueue<>(); /** * 能夠明確知道最多隻會運行2個線程,直接使用系統自帶工具就能夠了 */ private ExecutorService bossPool = Executors.newFixedThreadPool(2); /** * 聲明工做線程池 */ private ThreadPoolExecutor workerPool; @PostConstruct public void init() { /** * 自定義線程池,初始線程數量corePoolSize,線程池等待隊列大小queueSize,當初始線程都有任務,而且等待隊列滿後 * 線程數量會自動擴充最大線程數maxSize,當新擴充的線程空閒60s後自動回收.自定義線程池是由於Executors那幾個線程工具 * 各有各的弊端,不適合生產使用 */ workerPool = new ThreadPoolExecutor(config.getCorePoolSize(), config.getMaxPoolSize(), 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(config.getQueueSize())); /** * 執行待處理任務加載線程 */ bossPool.execute(new Loader()); /** * 執行任務調度線程 */ bossPool.execute(new Boss()); } class Loader implements Runnable { @Override public void run() { for(;;) { try { /** * 查找還有指定時間(單位秒)開始的主任務列表 */ List<Task> tasks = taskRepository.listPeddingTasks(config.getFetchDuration()); if(tasks == null || tasks.isEmpty()) { continue; } for(Task task:tasks) { task.setStatus(TaskStatus.DOING); task.setNodeId(config.getNodeId()); /** * 使用樂觀鎖嘗試更新狀態,若是更新成功,其餘節點就不會更新成功。若是在查詢待執行任務列表 * 和當前這段時間有節點已經更新了這個任務,version必然和查出來時候的version不同了,這裏更新 * 必然會返回0了 */ int n = taskRepository.updateWithVersion(task); Date nextStartTime = task.getNextStartTime(); if(n == 0 || nextStartTime == null) { continue; } /** * 封裝成延時對象放入延時隊列 */ task = taskRepository.get(task.getId()); DelayItem<Task> delayItem = new DelayItem<Task>(nextStartTime.getTime() - new Date().getTime(), task); taskQueue.offer(delayItem); } Thread.sleep(config.getFetchPeriod()); } catch(Exception e) { logger.error("fetch task list failed,cause by:{}", e); } } } } class Boss implements Runnable { @Override public void run() { for (;;) { try { /** * 時間到了就能夠從延時隊列拿出任務對象,而後交給worker線程池去執行 */ DelayItem<Task> item = taskQueue.take(); if(item != null && item.getItem() != null) { Task task = item.getItem(); workerPool.execute(new Worker(task)); } } catch (Exception e) { logger.error("fetch task failed,cause by:{}", e); } } } } class Worker implements Runnable { private Task task; public Worker(Task task) { this.task = task; } @Override public void run() { logger.info("Begin to execute task:{}",task.getId()); TaskDetail detail = null; try { //開始任務 detail = taskRepository.start(task); if(detail == null) return; //執行任務 task.getInvokor().invoke(); //完成任務 finish(task,detail); logger.info("finished execute task:{}",task.getId()); } catch (Exception e) { logger.error("execute task:{} error,cause by:{}",task.getId(), e); try { taskRepository.fail(task,detail,e.getCause().getMessage()); } catch(Exception e1) { logger.error("fail task:{} error,cause by:{}",task.getId(), e); } } } } /** * 完成子任務,若是父任務失敗了,子任務不會執行 * @param task * @param detail * @throws Exception */ private void finish(Task task,TaskDetail detail) throws Exception { //查看是否有子類任務 List<Task> childTasks = taskRepository.getChilds(task.getId()); if(childTasks == null || childTasks.isEmpty()) { //當沒有子任務時完成父任務 taskRepository.finish(task,detail); return; } else { for (Task childTask : childTasks) { //開始任務 TaskDetail childDetail = null; try { //將子任務狀態改爲執行中 childTask.setStatus(TaskStatus.DOING); childTask.setNodeId(config.getNodeId()); //開始子任務 childDetail = taskRepository.startChild(childTask,detail); //使用樂觀鎖更新下狀態,否則這裏可能和恢復線程產生併發問題 int n = taskRepository.updateWithVersion(childTask); if (n > 0) { //再從數據庫取一下,避免上面update修改後version不一樣步 childTask = taskRepository.get(childTask.getId()); //執行子任務 childTask.getInvokor().invoke(); //完成子任務 finish(childTask, childDetail); } } catch (Exception e) { logger.error("execute child task error,cause by:{}", e); try { taskRepository.fail(childTask, childDetail, e.getCause().getMessage()); } catch (Exception e1) { logger.error("fail child task error,cause by:{}", e); } } } /** * 當有子任務時完成子任務後再完成父任務 */ taskRepository.finish(task,detail); } } }
如上所述,能夠保證一個任務同一個時間只會被一個節點調度執行。這時候若是部署多個節點,正常應該能夠很順利的將任務庫中的任務都執行到,就像一堆人去前臺取快遞同樣,能夠很順利的拿走全部快遞。畢竟對於每一個快遞不是本身的就是其餘人的,本身的快遞也不會是其餘人的。可是這裏的調度和取快遞有一點不同,取快遞的每一個人都知道怎麼去區分到底哪一個快遞是本身的。這裏的調度徹底沒這個概念,徹底是哪一個節點運氣好使用樂觀鎖更新了這個任務狀態就是哪一個節點的。總的來講區別就是須要一個約定的規則,快遞是否是本身的,直接看快遞上的名字和手機號碼就知道了。任務到底該不應本身執行咱們也能夠出一個這種規則,明確哪些任務那些應該是哪些節點能夠執行,從而避免無謂的鎖競爭。這裏能夠借鑑負載均衡的那些策略,目前我想實現以下規則:github
1) id_hash : 按照任務自增id的對節點個數取餘,餘數值和當前節點的實時序號匹配,能夠匹配就能夠拿走執行,不然請自覺忽略掉這個任務算法
2) least_count:最少執行任務的節點優先去取任務spring
3) weight:按照節點權重去取任務數據庫
4) default: 默認先到先得,沒有其它規則springboot
根據上面規則也能夠說是任務的負載均衡策略能夠知道除了默認規則,其他規則都須要知道全局的節點信息,好比節點執行次數,節點序號,節點權重等,因此咱們須要給節點添加一個心跳,隔一個心跳週期上報一下本身的信息到數據庫,心跳核心代碼以下:併發
/** * 建立節點心跳延時隊列 */ private DelayQueue<DelayItem<Node>> heartBeatQueue = new DelayQueue<>(); /** * 能夠明確知道最多隻會運行2個線程,直接使用系統自帶工具 */ private ExecutorService bossPool = Executors.newFixedThreadPool(2);
@PostConstruct public void init() { /** * 若是恢復線程開關是開着,而且心跳開關也是開着 */ if(config.isRecoverEnable() && config.isHeartBeatEnable()) { /** * 初始化一個節點到心跳隊列,延時爲0,用來註冊節點 */ heartBeatQueue.offer(new DelayItem<>(0,new Node(config.getNodeId()))); /** * 執行心跳線程 */ bossPool.execute(new HeartBeat()); /** * 執行異常恢復線程 */ bossPool.execute(new Recover()); } }
class HeartBeat implements Runnable { @Override public void run() { for(;;) { try { /** * 時間到了就能夠從延時隊列拿出節點對象,而後更新時間和序號, * 最後再新建一個超時時間爲心跳時間的節點對象放入延時隊列,造成循環的心跳 */ DelayItem<Node> item = heartBeatQueue.take(); if(item != null && item.getItem() != null) { Node node = item.getItem(); handHeartBeat(node); } heartBeatQueue.offer(new DelayItem<>(config.getHeartBeatSeconds() * 1000,new Node(config.getNodeId()))); } catch (Exception e) { logger.error("task heart beat error,cause by:{} ",e); } } } } /** * 處理節點心跳 * @param node */ private void handHeartBeat(Node node) { if(node == null) { return; } /** * 先看看數據庫是否存在這個節點 * 若是不存在:先查找下一個序號,而後設置到node對象中,最後插入 * 若是存在:直接根據nodeId更新當前節點的序號和時間 */ Node currNode= nodeRepository.getByNodeId(node.getNodeId()); if(currNode == null) { node.setRownum(nodeRepository.getNextRownum()); nodeRepository.insert(node); } else { nodeRepository.updateHeartBeat(node.getNodeId()); } }
數據庫有了節點信息後,咱們就能夠實現各類花式的取任務的策略了,代碼以下:
/** * 抽象的策略接口 * @author rongdi * @date 2019-03-16 12:36 */ public interface Strategy { /** * 默認策略 */ String DEFAULT = "default"; /** * 按任務ID hash取餘再和本身節點序號匹配 */ String ID_HASH = "id_hash"; /** * 最少執行次數 */ String LEAST_COUNT = "least_count"; /** * 按節點權重 */ String WEIGHT = "weight"; public static Strategy choose(String key) { switch(key) { case ID_HASH: return new IdHashStrategy(); case LEAST_COUNT: return new LeastCountStrategy(); case WEIGHT: return new WeightStrategy(); default: return new DefaultStrategy(); } } public boolean accept(List<Node> nodes,Task task,Long myNodeId); }
/** * 按照任務ID hash方式針對有效節點個數取餘,而後餘數+1後和各個節點的順序號匹配, * 這種方式效果其實等同於輪詢,由於任務id是自增的 * @author rongdi * @date 2019-03-16 */ public class IdHashStrategy implements Strategy { /** * 這裏的nodes集合必然不會爲空,外面調度那判斷了,並且是按照nodeId的升序排列的 */ @Override public boolean accept(List<Node> nodes, Task task, Long myNodeId) { int size = nodes.size(); long taskId = task.getId(); /** * 找到本身的節點 */ Node myNode = nodes.stream().filter(node -> node.getNodeId() == myNodeId).findFirst().get(); return myNode == null ? false : (taskId % size) + 1 == myNode.getRownum(); } }
/** * 最少處理任務次數策略,也就是每次任務來了,看看本身是否是處理任務次數最少的,是就能夠消費這個任務 * @author rongdi * @date 2019-03-16 21:56 */ public class LeastCountStrategy implements Strategy { @Override public boolean accept(List<Node> nodes, Task task, Long myNodeId) { /** * 獲取次數最少的那個節點,這裏能夠類比成先按counts升序排列而後取第一個元素 * 而後是本身就返回true */ Optional<Node> min = nodes.stream().min((o1, o2) -> o1.getCounts().compareTo(o2.getCounts())); return min.isPresent()? min.get().getNodeId() == myNodeId : false; } }
/** * 按權重的分配策略,方案以下,假如 * 節點序號 1 ,2 ,3 ,4 * 節點權重 2 ,3 ,3 ,2 * 則取餘後 0,1 | 2,3,4 | 5,6,7 | 8,9 * 序號1能夠消費按照權重的和取餘後小於2的 * 序號2能夠消費按照權重的和取餘後大於等於2小於2+3的 * 序號3能夠消費按照權重的和取餘後大於等於2+3小於2+3+3的 * 序號3能夠消費按照權重的和取餘後大於等於2+3+3小於2+3+3+2的 * 總結:本節點能夠消費的按照權重的和取餘後大於等於前面節點的權重和小於包括本身的權重和的這個範圍 * 不知道有沒有大神有更好的算法思路 * @author rongdi * @date 2019-03-16 23:16 */ public class WeightStrategy implements Strategy { @Override public boolean accept(List<Node> nodes, Task task, Long myNodeId) { Node myNode = nodes.stream().filter(node -> node.getNodeId() == myNodeId).findFirst().get(); if(myNode == null) { return false; } /** * 計算本節點序號前面的節點的權重和 */ int preWeightSum = nodes.stream().filter(node -> node.getRownum() < myNode.getRownum()).collect(Collectors.summingInt(Node::getWeight)); /** * 計算所有權重的和 */ int weightSum = nodes.stream().collect(Collectors.summingInt(Node::getWeight)); /** * 計算對權重和取餘的餘數 */ int remainder = (int)(task.getId() % weightSum); return remainder >= preWeightSum && remainder < preWeightSum + myNode.getWeight(); } }
而後咱們再改造下調度類
/** * 獲取任務的策略 */ private Strategy strategy; @PostConstruct public void init() { /** * 根據配置選擇一個節點獲取任務的策略 */ strategy = Strategy.choose(config.getNodeStrategy()); /** * 自定義線程池,初始線程數量corePoolSize,線程池等待隊列大小queueSize,當初始線程都有任務,而且等待隊列滿後 * 線程數量會自動擴充最大線程數maxSize,當新擴充的線程空閒60s後自動回收.自定義線程池是由於Executors那幾個線程工具 * 各有各的弊端,不適合生產使用 */ workerPool = new ThreadPoolExecutor(config.getCorePoolSize(), config.getMaxPoolSize(), 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(config.getQueueSize())); /** * 執行待處理任務加載線程 */ bossPool.execute(new Loader()); /** * 執行任務調度線程 */ bossPool.execute(new Boss()); } class Loader implements Runnable { @Override public void run() { for(;;) { try { /** * 先獲取可用的節點列表 */ List<Node> nodes = nodeRepository.getEnableNodes(config.getHeartBeatSeconds() * 2); if(nodes == null || nodes.isEmpty()) { continue; } /** * 查找還有指定時間(單位秒)開始的主任務列表 */ List<Task> tasks = taskRepository.listPeddingTasks(config.getFetchDuration()); if(tasks == null || tasks.isEmpty()) { continue; } for(Task task:tasks) { boolean accept = strategy.accept(nodes, task, config.getNodeId()); /** * 不應本身拿就不要搶 */ if(!accept) { continue; } task.setStatus(TaskStatus.DOING); task.setNodeId(config.getNodeId()); /** * 使用樂觀鎖嘗試更新狀態,若是更新成功,其餘節點就不會更新成功。若是在查詢待執行任務列表 * 和當前這段時間有節點已經更新了這個任務,version必然和查出來時候的version不同了,這裏更新 * 必然會返回0了 */ int n = taskRepository.updateWithVersion(task); Date nextStartTime = task.getNextStartTime(); if(n == 0 || nextStartTime == null) { continue; } /** * 封裝成延時對象放入延時隊列 */ task = taskRepository.get(task.getId()); DelayItem<Task> delayItem = new DelayItem<Task>(nextStartTime.getTime() - new Date().getTime(), task); taskQueue.offer(delayItem); } Thread.sleep(config.getFetchPeriod()); } catch(Exception e) { logger.error("fetch task list failed,cause by:{}", e); } } } }
如上能夠經過各類花式的負載策略來平衡各個節點獲取的任務,同時也能夠顯著下降各個節點對同一個任務的競爭。可是還有個問題,假如某個節點拿到了任務更新成了執行中,執行到一半,沒執行完也沒發生異常,忽然這個節點因爲各類緣由掛了,那麼這時候這個任務永遠沒有機會再執行了。這就是傳說中的佔着茅坑不拉屎。解決這個問題能夠用最終一致系統常見的方法,異常恢復線程。在這種場景下只須要檢測一下指定心跳超時時間(好比默認3個心跳週期)下沒有更新心跳時間的節點所屬的未完成任務,將這些任務狀態從新恢復成待執行,而且下次執行時間改爲當前就能夠了。核心代碼以下:
class Recover implements Runnable { @Override public void run() { for (;;) { try { /** * 查找須要恢復的任務,這裏界定須要恢復的任務是任務還沒完成,而且所屬執行節點超過3個 * 心跳週期沒有更新心跳時間。因爲這些任務因爲當時執行節點沒有來得及執行完就掛了,因此 * 只須要把狀態再改回待執行,而且下次執行時間改爲當前時間,讓任務再次被調度一次 */ List<Task> tasks = taskRepository.listRecoverTasks(config.getHeartBeatSeconds() * 3); if(tasks == null || tasks.isEmpty()) { return; } /** * 先獲取可用的節點列表 */ List<Node> nodes = nodeRepository.getEnableNodes(config.getHeartBeatSeconds() * 2); if(nodes == null || nodes.isEmpty()) { return; } long maxNodeId = nodes.get(nodes.size() - 1).getNodeId(); for (Task task : tasks) { /** * 每一個節點有一個恢復線程,爲了不沒必要要的競爭,從可用節點找到一個最靠近任務所屬節點的節點 */ long currNodeId = chooseNodeId(nodes,maxNodeId,task.getNodeId()); long myNodeId = config.getNodeId(); /** * 若是不應當前節點處理直接跳過 */ if(currNodeId != myNodeId) { continue; } /** * 直接將任務狀態改爲待執行,而且節點改爲當前節點 */ task.setStatus(TaskStatus.PENDING); task.setNextStartTime(new Date()); task.setNodeId(config.getNodeId()); taskRepository.updateWithVersion(task); } Thread.sleep(config.getRecoverSeconds() * 1000); } catch (Exception e) { logger.error("Get next task failed,cause by:{}", e); } } } } /** * 選擇下一個節點 * @param nodes * @param maxNodeId * @param nodeId * @return */ private long chooseNodeId(List<Node> nodes,long maxNodeId,long nodeId) { if(nodes.size() == 0 || nodeId >= maxNodeId) { return nodes.get(0).getNodeId(); } return nodes.stream().filter(node -> node.getNodeId() > nodeId).findFirst().get().getNodeId(); }
如上爲了不每一個節點的異常恢復線程對同一個任務作無謂的競爭,每一個異常任務只能被任務所屬節點ID的下一個正常節點去恢復。這樣處理後就能確保就算出現了上面那種任務沒執行完節點掛了的狀況,一段時間後也能夠自動恢復。總的來講上面那些不考慮優化應該能夠作爲一個還不錯的任務調度框架了。若是大家覺得這樣就完了,我只能說抱歉了,還有,哈哈!前面提到我是嫌棄其它任務調度用起來麻煩,特別是習慣用spring的註解寫調度的,那些極可能一個類裏寫了n個帶有@Scheduled註解的調度方法,這樣改造起來更加麻煩,我是但願作到以下方式就能夠直接整合到分佈式任務調度裏:
/** * 測試調度功能 * @author rongdi * @date 2019-03-17 16:54 */ @Component public class SchedulerTest { @Scheduled(cron = "0/10 * * * * ?") public void test1() throws InterruptedException { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Thread.sleep(2000); System.out.println("當前時間1:"+sdf.format(new Date())); } @Scheduled(cron = "0/20 * * * * ?",parent = "test1") public void test2() throws InterruptedException { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Thread.sleep(2000); System.out.println("當前時間2:"+sdf.format(new Date())); } @Scheduled(cron = "0/10 * * * * ?",parent = "test2") public void test3() throws InterruptedException { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Thread.sleep(2000); System.out.println("當前時間3:"+sdf.format(new Date())); } @Scheduled(cron = "0/10 * * * * ?",parent = "test3") public void test4() throws InterruptedException { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Thread.sleep(2000); System.out.println("當前時間4:"+sdf.format(new Date())); } }
爲了達到上述目標,咱們還須要在spring啓動後加載自定義的註解(名稱和spring的同樣),代碼以下
/** * spring容器啓動完後,加載自定義註解 * @author rongdi * @date 2019-03-15 21:07 */ @Component public class ContextRefreshedListener implements ApplicationListener<ContextRefreshedEvent> { @Autowired private TaskExecutor taskExecutor; /** * 用來保存方法名/任務名和任務插入後數據庫的ID的映射,用來處理子任務新增用 */ private Map<String,Long> taskIdMap = new HashMap<>(); @Override public void onApplicationEvent(ContextRefreshedEvent event) { /** * 判斷根容器爲Spring容器,防止出現調用兩次的狀況(mvc加載也會觸發一次) */ if(event.getApplicationContext().getParent()==null){ /** * 判斷調度開關是否打開 * 若是打開了:加載調度註解並將調度添加到調度管理中 */ ApplicationContext context = event.getApplicationContext(); Map<String,Object> beans = context.getBeansWithAnnotation(org.springframework.scheduling.annotation.EnableScheduling.class); if(beans == null) { return; } /** * 用來存放被調度註解修飾的方法名和Method的映射 */ Map<String,Method> methodMap = new HashMap<>(); /** * 查找全部直接或者間接被Component註解修飾的類,由於無論Service,Controller等都包含了Component,也就是 * 只要是被歸入了spring容器管理的類必然直接或者間接的被Component修飾 */ Map<String,Object> allBeans = context.getBeansWithAnnotation(org.springframework.stereotype.Component.class); Set<Map.Entry<String,Object>> entrys = allBeans.entrySet(); /** * 遍歷bean和裏面的method找到被Scheduled註解修飾的方法,而後將任務放入任務調度裏 */ for(Map.Entry entry:entrys){ Object obj = entry.getValue(); Class clazz = obj.getClass(); Method[] methods = clazz.getMethods(); for(Method m:methods) { if(m.isAnnotationPresent(Scheduled.class)) { methodMap.put(clazz.getName() + Delimiters.DOT + m.getName(),m); } } } /** * 處理Sheduled註解 */ handleSheduledAnn(methodMap); /** * 因爲taskIdMap只是啓動spring完成後使用一次,這裏能夠直接清空 */ taskIdMap.clear(); } } /** * 循環處理方法map中的全部Method * @param methodMap */ private void handleSheduledAnn(Map<String,Method> methodMap) { if(methodMap == null || methodMap.isEmpty()) { return; } Set<Map.Entry<String,Method>> entrys = methodMap.entrySet(); /** * 遍歷bean和裏面的method找到被Scheduled註解修飾的方法,而後將任務放入任務調度裏 */ for(Map.Entry<String,Method> entry:entrys){ Method m = entry.getValue(); try { handleSheduledAnn(methodMap,m); } catch (Exception e) { e.printStackTrace(); continue; } } } /** * 遞歸添加父子任務 * @param methodMap * @param m * @throws Exception */ private void handleSheduledAnn(Map<String,Method> methodMap,Method m) throws Exception { Class<?> clazz = m.getDeclaringClass(); String name = m.getName(); Scheduled sAnn = m.getAnnotation(Scheduled.class); String cron = sAnn.cron(); String parent = sAnn.parent(); /** * 若是parent爲空,說明該方法表明的任務是根任務,則添加到任務調度器中,而且保存在全局map中 * 若是parent不爲空,則表示是子任務,子任務須要知道父任務的id * 先根據parent裏面表明的方法全名或者方法名(父任務方法和子任務方法在同一個類直接能夠用方法名, * 否則要帶上類的全名)從taskIdMap獲取父任務ID * 若是找不到父任務ID,先根據父方法全名在methodMap找到父任務的method對象,調用本方法遞歸下去 * 若是找到父任務ID,則添加子任務 */ if(StringUtils.isEmpty(parent)) { if(!taskIdMap.containsKey(clazz.getName() + Delimiters.DOT + name)) { Long taskId = taskExecutor.addTask(name, cron, new Invocation(clazz, name, new Class[]{}, new Object[]{})); taskIdMap.put(clazz.getName() + Delimiters.DOT + name, taskId); } } else { String parentMethodName = parent.lastIndexOf(Delimiters.DOT) == -1 ? clazz.getName() + Delimiters.DOT + parent : parent; Long parentTaskId = taskIdMap.get(parentMethodName); if(parentTaskId == null) { Method parentMethod = methodMap.get(parentMethodName); handleSheduledAnn(methodMap,parentMethod); /** * 遞歸回來必定要更新一下這個父任務ID */ parentTaskId = taskIdMap.get(parentMethodName); } if(parentTaskId != null && !taskIdMap.containsKey(clazz.getName() + Delimiters.DOT + name)) { Long taskId = taskExecutor.addChildTask(parentTaskId, name, cron, new Invocation(clazz, name, new Class[]{}, new Object[]{})); taskIdMap.put(clazz.getName() + Delimiters.DOT + name, taskId); } } } }
上述代碼就完成了spring初始化完成後加載了本身的自定義任務調度的註解,而且也受spring的調度開關@EnableScheduling的控制,實現無縫整合到spring或者springboot中去,達到了我這種的懶人的要求。
好了其實寫這個框架差很少就用了5天業餘時間,估計會有一些隱藏的坑,不過明顯的坑我本身都解決了,開源出來的目的既是爲了拋磚引玉,也爲了廣大屌絲程序員提供一種新的思路,但願對你們有所幫助,同時也但願你們多幫忙找找bug,一塊兒來完善這個東西,大神們請忽略。文筆很差,主要是很久沒寫做文了,請你們多多擔待。詳細的流水帳式的源碼加上長篇大論式的漢語註釋盡情查看:https://github.com/rongdi/easy-job