1. quartz 是基於數據庫的 for update 行鎖來實現鎖,保證同一個任務在同一時間只會執行一次。 2. 最新版本的 xxl-job 已經摒棄了 quartz。
進入 JobTriggerPoolHelper.trigger 方法,它調用了 JobTriggerPoolHelper.addTrigger 方法,那麼看一下 addTrigger 方法:
/** * add trigger */ public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam) { // choose thread pool //這裏區分了快慢線程池,1分鐘內超過500毫秒的請求大於10次,放入慢線程池處理 //快線程池默認8個核心線程,最大線程200,任務隊列1000 //慢線程池默認0個核心線程,最大線程100,任務隊列2000 ThreadPoolExecutor triggerPool_ = fastTriggerPool; AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId); if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min triggerPool_ = slowTriggerPool; } // trigger triggerPool_.execute(new Runnable() { @Override public void run() { long start = System.currentTimeMillis(); try { //觸發執行任務 XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam); } catch (Exception e) { logger.error(e.getMessage(), e); } finally { // check timeout-count-map long minTim_now = System.currentTimeMillis()/60000; if (minTim != minTim_now) { minTim = minTim_now; jobTimeoutCountMap.clear(); } // incr timeout-count-map long cost = System.currentTimeMillis()-start; if (cost > 500) { //耗時超過500ms就計算爲慢請求,加入慢線程池。 AtomicInteger timeoutCount = jobTimeoutCountMap.put(jobId, new AtomicInteger(1)); if (timeoutCount != null) { timeoutCount.incrementAndGet(); } } } } }); }
進入執行任務的方法:XxlJobTrigger.trigger
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam) { // load data //經過JobId從數據庫中查詢該任務的具體信息 XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId); if (jobInfo == null) { logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId); return; } if (executorParam != null) { jobInfo.setExecutorParam(executorParam); } int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount(); //獲取該類型的執行器信息 XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup()); // sharding param //分片信息 int[] shardingParam = null; if (executorShardingParam!=null){ String[] shardingArr = executorShardingParam.split("/"); if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) { shardingParam = new int[2]; shardingParam[0] = Integer.valueOf(shardingArr[0]); shardingParam[1] = Integer.valueOf(shardingArr[1]); } } //廣播模式,循環執行器配置的服務地址列表 if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null) && group.getRegistryList()!=null && !group.getRegistryList().isEmpty() && shardingParam==null) { for (int i = 0; i < group.getRegistryList().size(); i++) { processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size()); } } else { if (shardingParam == null) { shardingParam = new int[]{0, 1}; } //非廣播模式進入 processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]); } }
進入 processTrigger 方法,組裝任務參數,選擇路由策略和阻塞策略
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){ //阻塞策略 ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy //路由策略,官方一共10中路由策略 ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null; // 一、save log-id 日誌信息 XxlJobLog jobLog = new XxlJobLog(); jobLog.setJobGroup(jobInfo.getJobGroup()); jobLog.setJobId(jobInfo.getId()); jobLog.setTriggerTime(new Date()); XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog); logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId()); // 二、init trigger-param 初始化任務參數 //這裏要增長2個參數; // 一、固定IP, // 二、任務爲空時,默認輪訓隊列次數 TriggerParam triggerParam = new TriggerParam(); triggerParam.setJobId(jobInfo.getId()); triggerParam.setExecutorHandler(jobInfo.getExecutorHandler()); triggerParam.setExecutorParams(jobInfo.getExecutorParam()); triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy()); triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout()); triggerParam.setLogId(jobLog.getId()); triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime()); triggerParam.setGlueType(jobInfo.getGlueType()); triggerParam.setGlueSource(jobInfo.getGlueSource()); triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime()); triggerParam.setBroadcastIndex(index); triggerParam.setBroadcastTotal(total); triggerParam.setAssignAddress(jobInfo.getAssignAddress()); triggerParam.setJobEmptyLoopNum(jobInfo.getJobEmptyLoopNum()); // 三、init address 選擇執行器服務地址 String address = null; ReturnT<String> routeAddre***esult = null; if 
(group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) { if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) { if (index < group.getRegistryList().size()) { address = group.getRegistryList().get(index); } else { address = group.getRegistryList().get(0); } } else { //根據路由策略,選擇合適的執行器服務地址來執行任務 routeAddre***esult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList()); if (routeAddre***esult.getCode() == ReturnT.SUCCESS_CODE) { address = routeAddre***esult.getContent(); } } } else { routeAddre***esult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty")); } // 四、trigger remote executor ReturnT<String> triggerResult = null; if (address != null) { //遠程調用,這個是重點方法 triggerResult = runExecutor(triggerParam, address); } else { triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null); } // 五、collection trigger info StringBuffer triggerMsgSb = new StringBuffer(); triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle()); triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp()); triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":") .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") ); triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList()); triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle()); if (shardingParam != null) { triggerMsgSb.append("("+shardingParam+")"); } triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle()); 
triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout()); triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount); triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>") .append((routeAddre***esult!=null&&routeAddre***esult.getMsg()!=null)?routeAddre***esult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():""); // 六、save log trigger-info jobLog.setExecutorAddress(address); jobLog.setExecutorHandler(jobInfo.getExecutorHandler()); jobLog.setExecutorParam(jobInfo.getExecutorParam()); jobLog.setExecutorShardingParam(shardingParam); jobLog.setExecutorFailRetryCount(finalFailRetryCount); //jobLog.setTriggerTime(); jobLog.setTriggerCode(triggerResult.getCode()); jobLog.setTriggerMsg(triggerMsgSb.toString()); XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog); logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId()); }
因爲在某些場景下,某個任務必須保證只能執行一次,或者寧願不執行也不允許重複執行。比方說發放優惠券,可以少發或者不發,但是不能多發。這種情況下,需要讓任務固定到一個執行器服務 IP 上執行,所以在原來的基礎上增加了一個策略:固定 IP 策略。
組裝好參數、選好執行任務的地址之後,進入 runExecutor 方法
創建好了 RPC 的客戶端對象(創建對象的過程中使用了 NettyHttp 協議和 HESSIAN 序列化),就可以發起 RPC 請求了
public static ExecutorBiz getExecutorBiz(String address) throws Exception { // valid if (address==null || address.trim().length()==0) { return null; } // load-cache 是否在緩存中 address = address.trim(); ExecutorBiz executorBiz = executorBizRepository.get(address); if (executorBiz != null) { return executorBiz; } // set-cache // 建立ExecutorBiz的代理對象,重點在這個裏面。 executorBiz = (ExecutorBiz) new XxlRpcReferenceBean( NetEnum.NETTY_HTTP, //nettyHttp Serializer.SerializeEnum.HESSIAN.getSerializer(),//序列化 CallType.SYNC,//同步 LoadBalance.ROUND, ExecutorBiz.class, null, 5000, address, XxlJobAdminConfig.getAdminConfig().getAccessToken(), null, null).getObject(); executorBizRepository.put(address, executorBiz); //對象放入緩存 return executorBiz; }
然後發起 RPC 請求:executorBiz.run
public ReturnT<String> run(TriggerParam triggerParam) { // load old:jobHandler + jobThread // 經過參數中的JobID, 從本地線程庫裏面獲取線程 ( 第一次進來是沒有線程的,jobThread爲空 , // 本地線程庫,本質上就是一個ConcurrentHashMap<Integer, JobThread> JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId()); IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null; String removeOldReason = null; // valid:jobHandler + jobThread //運行模式,這裏看一下java模式就能夠了 GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType()); if (GlueTypeEnum.BEAN == glueTypeEnum) { // new jobhandler // 經過參數中的handlerName從本地內存中獲取handler實例 // (在執行器啓動的時候,是把全部帶有@JobHandler的實例經過name放入到一個map中的 ) IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler()); // valid old jobThread // 若是修改了任務的handler, name此處會默認把之前老的handler清空,後面會以最新的newJobHandler爲準 if (jobThread!=null && jobHandler != newJobHandler) { // change handler, need kill old thread removeOldReason = "change jobhandler or glue type, and terminate the old job thread."; jobThread = null; jobHandler = null; } // valid handler if (jobHandler == null) { jobHandler = newJobHandler; if (jobHandler == null) { return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found."); } } } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) { // valid old jobThread if (jobThread != null && !(jobThread.getHandler() instanceof GlueJobHandler && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) { // change handler or gluesource updated, need kill old thread removeOldReason = "change job source or glue type, and terminate the old job thread."; jobThread = null; jobHandler = null; } // valid handler if (jobHandler == null) { try { IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource()); jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime()); } catch (Exception e) { 
logger.error(e.getMessage(), e); return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage()); } } } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) { // valid old jobThread if (jobThread != null && !(jobThread.getHandler() instanceof ScriptJobHandler && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) { // change script or gluesource updated, need kill old thread removeOldReason = "change job source or glue type, and terminate the old job thread."; jobThread = null; jobHandler = null; } // valid handler if (jobHandler == null) { jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType())); } } else { return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid."); } // executor block strategy if (jobThread != null) { ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null); if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) { // discard when running if (jobThread.isRunningOrHasQueue()) { return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle()); } } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) { // kill running jobThread if (jobThread.isRunningOrHasQueue()) { removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle(); jobThread = null; } } else { // just queue trigger } } // replace thread (new or exists invalid) // 若是jobThread爲空,那麼這個時候,就要註冊一個線程到本地線程庫裏面去。 // 而後啓動這個線程,線程會輪訓任務隊列開始執行,能夠查看JobThread.run方法 if (jobThread == null) { jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason, triggerParam.getJobEmptyLoopNum()); } // push data to queue // 任務線程已經存在了,將任務參數放入任務隊列,每一個任務線程有一個任務隊列,任務線程去輪詢這個任務 ReturnT<String> pushResult = 
jobThread.pushTriggerQueue(triggerParam); return pushResult; }
至此,調度中心的處理基本完成。然後看執行器的操作流程。
然後執行 XxlJobExecutor.start 方法:
public void start() throws Exception { // init logpath 初始化本地日誌路徑 XxlJobFileAppender.initLogPath(logPath); // init invoker, admin-client // 初始化調度中心的地址列表,建立好adminBiz實例,調度中心客戶端 initAdminBizList(adminAddresses, accessToken); // init JobLogFileCleanThread JobLogFileCleanThread.getInstance().start(logRetentionDays); // init TriggerCallbackThread TriggerCallbackThread.getInstance().start(); // init executor-server port = port>0?port: NetUtil.findAvailablePort(9999); ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp(); //啓動執行器服務,默認開端口9999 initRpcProvider(ip, port, appName, accessToken); }
RPC 服務啓動之後,就可以正常提供執行器服務了。
重點看一下線程的 run 方法:它會循環獲取隊列裏的任務,每次獲取的超時時間是 3 秒;原版默認空輪詢 30 次後,如果仍然沒有任務就停止線程。這裏的 30 次已經做了定製化修改,改爲由參數 jobEmptyLoopNum 控制。
public void run() { // init try { //初始化任務對象 handler.init(); } catch (Throwable e) { logger.error(e.getMessage(), e); } // execute while(!toStop){ running = false; idleTimes++;//累加輪詢次數 TriggerParam triggerParam = null; ReturnT<String> executeResult = null; try { //獲取隊列裏的任務,設置3秒鐘超時 triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS); if (triggerParam!=null) { running = true; idleTimes = 0; triggerLogIdSet.remove(triggerParam.getLogId()); // log filename, like "logPath/yyyy-MM-dd/9999.log" String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId()); XxlJobFileAppender.contextHolder.set(logFileName); ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal())); // execute XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams()); //帶超時的任務執行 if (triggerParam.getExecutorTimeout() > 0) { // limit timeout Thread futureThread = null; try { final TriggerParam triggerParamTmp = triggerParam; FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() { @Override public ReturnT<String> call() throws Exception { //執行任務 return handler.execute(triggerParamTmp.getExecutorParams()); } }); //啓動線程執行任務 futureThread = new Thread(futureTask); futureThread.start(); //獲取執行任務結果 executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS); } catch (TimeoutException e) { XxlJobLogger.log("<br>----------- xxl-job job execute timeout"); XxlJobLogger.log(e); executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout "); } finally { futureThread.interrupt(); } } else { // just execute //僅僅執行任務 executeResult = handler.execute(triggerParam.getExecutorParams()); } if (executeResult == null) { executeResult = IJobHandler.FAIL; } else { executeResult.setMsg( 
(executeResult!=null&&executeResult.getMsg()!=null&&executeResult.getMsg().length()>50000) ?executeResult.getMsg().substring(0, 50000).concat("...") :executeResult.getMsg()); executeResult.setContent(null); // limit obj size } XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult); } else { //超過必定次數,清空線程,並設置JobThread的stop中止標識位,終止輪詢。也就是3*jobEmptyLoopNum秒空輪詢 XxlJobLogger.log("<br>----------- xxl-job loop num diy set Param:" + jobEmptyLoopNum); if (idleTimes > jobEmptyLoopNum) { XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit."); } } } catch (Throwable e) { if (toStop) { XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason); } StringWriter stringWriter = new StringWriter(); e.printStackTrace(new PrintWriter(stringWriter)); String errorMsg = stringWriter.toString(); executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg); XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------"); } finally { if(triggerParam != null) { // callback handler info if (!toStop) { // commonm TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), executeResult)); } else { // is killed ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running,killed]"); TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), stopResult)); } } } } // callback trigger request in queue while(triggerQueue !=null && triggerQueue.size()>0){ TriggerParam triggerParam = triggerQueue.poll(); if (triggerParam!=null) { // is killed ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]"); TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), 
stopResult)); } } // destroy try { handler.destroy(); } catch (Throwable e) { logger.error(e.getMessage(), e); } logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread()); }