Distributed Scheduled Tasks, Learning xxl-job (2): Source Code Analysis of the Executor Startup Process
- Preface
- 1. Executor startup
- 1.1 Analyzing the core class XxlJobSpringExecutor
- 1.1.1 initJobHandlerRepository()
- 1.1.2 initJobHandlerMethodRepository()
- 1.1.3 GlueFactory.refreshInstance(1)
- 1.1.4 super.start()
- 1.2 Analyzing the core class XxlJobExecutor
- 1.2.1 XxlJobFileAppender.initLogPath(logPath)
- 1.2.2 initAdminBizList(adminAddresses, accessToken)
- 1.2.3 JobLogFileCleanThread.getInstance().start(logRetentionDays)
- 1.2.4 TriggerCallbackThread.getInstance().start()
- 1.2.5 initEmbedServer(address, ip, port, appname, accessToken)
- 1.3 Analyzing EmbedServer.start(address, port, appname, accessToken)
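Preface
This post continues from the previous one: "Distributed Scheduled Tasks, Learning xxl-job (1): Building a Simple Demo".
As the simple distributed scheduling demo in the previous post showed, there are three main parts:
- Configure and start the scheduling center (xxl-job-admin)
- Configure and start the business system (the executor)
- Configure the executor and its jobs on the scheduling center's web page
This post starts with an in-depth look at the source code behind configuring and starting the executor in the business system.
The xxl.job.version used here is 2.2.1-SNAPSHOT.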
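1. Executor startup
In the business system that runs the scheduled jobs, we:
- add the xxl-job dependency;
- add an executor configuration class, XxlJobConfig.java, which configures the core class XxlJobSpringExecutor;
- add a job handler class containing methods annotated with @XxlJob("xxx").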
1.1 Analyzing the core class XxlJobSpringExecutor
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {

    @Override
    public void afterSingletonsInstantiated() {
        // ... body omitted for now
    }

    @Override
    public void destroy() {
        super.destroy();
    }

    private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
        // ... body omitted for now
    }

    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }
}
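The source shows that this class extends XxlJobExecutor and implements ApplicationContextAware, SmartInitializingSingleton, and DisposableBean.
Because it implements SmartInitializingSingleton, Spring calls afterSingletonsInstantiated() once the singleton beans have been instantiated.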
@Override public void afterSingletonsInstantiated() { // init JobHandler Repository /*initJobHandlerRepository(applicationContext);*/ // init JobHandler Repository (for method) initJobHandlerMethodRepository(applicationContext); // refresh GlueFactory GlueFactory.refreshInstance(1); // super start try { super.start(); } catch (Exception e) { throw new RuntimeException(e); } }
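- initJobHandlerRepository() and initJobHandlerMethodRepository() store the jobs configured in the project in memory, in a ConcurrentMap<String, IJobHandler> whose key is the handler name (for method handlers, the value of @XxlJob) and whose value is the job handler instance.
- GlueFactory.refreshInstance(1) replaces the GlueFactory (the factory for glue jobs) with a SpringGlueFactory, so that glue-mode jobs are loaded through Spring.
- super.start() calls start() on XxlJobExecutor, the core of the executor.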
1.1.1 initJobHandlerRepository()
In older versions this method registered bean classes annotated with @JobHandler. Version 2.2.1-SNAPSHOT no longer supports that approach.
1.1.2 initJobHandlerMethodRepository()
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) { if (applicationContext == null) { return; } // init job handler from method String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true); for (String beanDefinitionName : beanDefinitionNames) { Object bean = applicationContext.getBean(beanDefinitionName); Map<Method, XxlJob> annotatedMethods = null; // referred to : org.springframework.context.event.EventListenerMethodProcessor.processBean try { annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(), new MethodIntrospector.MetadataLookup<XxlJob>() { @Override public XxlJob inspect(Method method) { return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class); } }); } catch (Throwable ex) { logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex); } if (annotatedMethods==null || annotatedMethods.isEmpty()) { continue; } for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) { Method method = methodXxlJobEntry.getKey(); XxlJob xxlJob = methodXxlJobEntry.getValue(); if (xxlJob == null) { continue; } String name = xxlJob.value(); if (name.trim().length() == 0) { throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + method.getName() + "] ."); } if (loadJobHandler(name) != null) { throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts."); } // execute method if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) { throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " + "The correct method format like \" public ReturnT<String> execute(String param) \" ."); } if (!method.getReturnType().isAssignableFrom(ReturnT.class)) { throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + 
bean.getClass() + "#" + method.getName() + "] , " + "The correct method format like \" public ReturnT<String> execute(String param) \" ."); } method.setAccessible(true); // init and destory Method initMethod = null; Method destroyMethod = null; if (xxlJob.init().trim().length() > 0) { try { initMethod = bean.getClass().getDeclaredMethod(xxlJob.init()); initMethod.setAccessible(true); } catch (NoSuchMethodException e) { throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] ."); } } if (xxlJob.destroy().trim().length() > 0) { try { destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy()); destroyMethod.setAccessible(true); } catch (NoSuchMethodException e) { throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] ."); } } // registry jobhandler: save the job handler in the `ConcurrentMap<String, IJobHandler>` registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod)); } } }
Analysis:
- Get all bean objects from the applicationContext.
- Use MethodIntrospector.selectMethods together with the MetadataLookup interface to obtain a Map<Method, XxlJob>. The source of selectMethods, the core method of this utility class, is:
public static <T> Map<Method, T> selectMethods(Class<?> targetType, final MetadataLookup<T> metadataLookup) {
    final Map<Method, T> methodMap = new LinkedHashMap<>();
    Set<Class<?>> handlerTypes = new LinkedHashSet<>();
    Class<?> specificHandlerType = null;

    // check whether the target is a JDK dynamic proxy class
    if (!Proxy.isProxyClass(targetType)) {
        // not a JDK proxy: resolve the user-defined class (unwraps CGLIB-generated subclasses)
        specificHandlerType = ClassUtils.getUserClass(targetType);
        handlerTypes.add(specificHandlerType);
    }
    handlerTypes.addAll(ClassUtils.getAllInterfacesForClassAsSet(targetType));

    // iterate over every class and interface collected above
    for (Class<?> currentHandlerType : handlerTypes) {
        final Class<?> targetClass = (specificHandlerType != null ? specificHandlerType : currentHandlerType);

        ReflectionUtils.doWithMethods(currentHandlerType, method -> {
            // resolve the most specific method on the target class
            Method specificMethod = ClassUtils.getMostSpecificMethod(method, targetClass);
            // look up the metadata associated with the method, usually an annotation
            T result = metadataLookup.inspect(specificMethod);
            if (result != null) {
                Method bridgedMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
                if (bridgedMethod == specificMethod || metadataLookup.inspect(bridgedMethod) == null) {
                    methodMap.put(specificMethod, result);
                }
            }
        }, ReflectionUtils.USER_DECLARED_METHODS);
    }

    return methodMap;
}
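- Iterate over the Map<Method, XxlJob> from step 2; the key is the annotated Method and the value is its @XxlJob annotation.
- Validate the annotation's value (the handler name); throw an exception if it is blank.
- Look up the name in the ConcurrentMap<String, IJobHandler> (the repository that holds every registered handler); if a handler with that name already exists, throw an exception (naming conflict).
- Validate the parameter: it must be a single String param, because 2.2.1-SNAPSHOT requires job methods to have the form "public ReturnT<String> execute(String param)".
- Validate the return type: it must be ReturnT<String>.
- Resolve the init and destroy methods named in the annotation, if any.
- Save the job handler in the ConcurrentMap<String, IJobHandler>.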
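The registration steps above can be sketched with plain JDK reflection. This is only an illustration under stated assumptions: DemoJob, DemoHandlerRegistry and DemoJobs are hypothetical names, not xxl-job classes, and the sketch stores the Method itself rather than a MethodJobHandler.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for @XxlJob; not the real annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoJob {
    String value();
}

// Hypothetical registry that follows the same validation order as the analysis above.
class DemoHandlerRegistry {
    private final ConcurrentMap<String, Method> handlers = new ConcurrentHashMap<>();

    void scan(Object bean) {
        for (Method method : bean.getClass().getDeclaredMethods()) {
            DemoJob job = method.getAnnotation(DemoJob.class);
            if (job == null) {
                continue; // not a job method
            }
            String name = job.value();
            if (name.trim().isEmpty()) {
                throw new IllegalStateException("handler name invalid for " + method);
            }
            if (handlers.containsKey(name)) {
                throw new IllegalStateException("handler[" + name + "] naming conflicts");
            }
            Class<?>[] params = method.getParameterTypes();
            if (!(params.length == 1 && params[0].isAssignableFrom(String.class))) {
                throw new IllegalStateException("handler parameter must be a single String: " + method);
            }
            method.setAccessible(true);
            handlers.put(name, method); // keyed by handler name, as in the repository map
        }
    }

    Method load(String name) {
        return handlers.get(name);
    }
}

// Hypothetical bean with one annotated job method.
class DemoJobs {
    @DemoJob("demoJobHandler")
    public String execute(String param) {
        return "ok:" + param;
    }
}
```

Scanning the same bean twice triggers the naming-conflict branch, which mirrors why two methods cannot share one @XxlJob value.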
1.1.3 GlueFactory.refreshInstance(1)
Replaces the GlueFactory with a SpringGlueFactory, so that glue-mode jobs are loaded through Spring.
1.1.4 super.start()
Calls XxlJobExecutor.start().
1.2 Analyzing the core class XxlJobExecutor
XxlJobExecutor has the following fields:
// ---------------------- param ---------------------- private String adminAddresses; private String accessToken; private String appname; private String address; private String ip; private int port; private String logPath; private int logRetentionDays;
As the previous section showed, XxlJobSpringExecutor ends up calling XxlJobExecutor's start() method. Let's see what that method does:
public void start() throws Exception { // init logpath XxlJobFileAppender.initLogPath(logPath); // init invoker, admin-client initAdminBizList(adminAddresses, accessToken); // init JobLogFileCleanThread JobLogFileCleanThread.getInstance().start(logRetentionDays); // init TriggerCallbackThread TriggerCallbackThread.getInstance().start(); // init executor-server initEmbedServer(address, ip, port, appname, accessToken); }
1.2.1 XxlJobFileAppender.initLogPath(logPath)
logPath is the log path set through xxl.job.executor.logpath in the executor configuration.
public static void initLogPath(String logPath){ // init if (logPath!=null && logPath.trim().length()>0) { logBasePath = logPath; } // mk base dir File logPathDir = new File(logBasePath); if (!logPathDir.exists()) { logPathDir.mkdirs(); } logBasePath = logPathDir.getPath(); // mk glue dir File glueBaseDir = new File(logPathDir, "gluesource"); if (!glueBaseDir.exists()) { glueBaseDir.mkdirs(); } glueSrcPath = glueBaseDir.getPath(); }
- If a log path is configured, logBasePath is set to it.
- If that directory does not exist, it is created.
- A gluesource subdirectory is created under it.
1.2.2 initAdminBizList(adminAddresses, accessToken)
// ---------------------- admin-client (rpc invoker) ---------------------- private static List<AdminBiz> adminBizList; private void initAdminBizList(String adminAddresses, String accessToken) throws Exception { if (adminAddresses!=null && adminAddresses.trim().length()>0) { for (String address: adminAddresses.trim().split(",")) { if (address!=null && address.trim().length()>0) { AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken); if (adminBizList == null) { adminBizList = new ArrayList<AdminBiz>(); } adminBizList.add(adminBiz); } } } } public static List<AdminBiz> getAdminBizList(){ return adminBizList; }
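This method creates one AdminBizClient for each scheduling center root address in adminAddresses (comma-separated), passing accessToken as the token for executor communication. AdminBizClient has three core methods: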
@Override public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) { return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class); } @Override public ReturnT<String> registry(RegistryParam registryParam) { return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class); } @Override public ReturnT<String> registryRemove(RegistryParam registryParam) { return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, timeout, registryParam, String.class); }
They provide the callback, registry, and registryRemove calls to the scheduling center.
1.2.3 JobLogFileCleanThread.getInstance().start(logRetentionDays)
This starts the log cleanup thread, which automatically deletes expired logs (log files older than N days).
public class JobLogFileCleanThread { private static Logger logger = LoggerFactory.getLogger(JobLogFileCleanThread.class); private static JobLogFileCleanThread instance = new JobLogFileCleanThread(); public static JobLogFileCleanThread getInstance(){ return instance; } private Thread localThread; private volatile boolean toStop = false; public void start(final long logRetentionDays){ // limit min value if (logRetentionDays < 3 ) { return; } localThread = new Thread(new Runnable() { @Override public void run() { while (!toStop) { try { // clean log dir, over logRetentionDays File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles(); if (childDirs!=null && childDirs.length>0) { // today Calendar todayCal = Calendar.getInstance(); todayCal.set(Calendar.HOUR_OF_DAY,0); todayCal.set(Calendar.MINUTE,0); todayCal.set(Calendar.SECOND,0); todayCal.set(Calendar.MILLISECOND,0); Date todayDate = todayCal.getTime(); for (File childFile: childDirs) { // valid if (!childFile.isDirectory()) { continue; } if (childFile.getName().indexOf("-") == -1) { continue; } // file create date Date logFileCreateDate = null; try { SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd"); logFileCreateDate = simpleDateFormat.parse(childFile.getName()); } catch (ParseException e) { logger.error(e.getMessage(), e); } if (logFileCreateDate == null) { continue; } if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) { FileUtil.deleteRecursively(childFile); } } } } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } try { TimeUnit.DAYS.sleep(1); } catch (InterruptedException e) { if (!toStop) { logger.error(e.getMessage(), e); } } } logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destory."); } }); localThread.setDaemon(true); localThread.setName("xxl-job, executor JobLogFileCleanThread"); localThread.start(); } public void toStop() { toStop = true; if (localThread == null) { return; 
} // interrupt and wait localThread.interrupt(); try { localThread.join(); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } } }
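Let's focus on the start() method:
- The configured log retention days must be at least 3; otherwise no cleanup runs.
- A daemon thread is created that runs once a day (TimeUnit.DAYS.sleep(1);).
- It lists all date-named directories under the log root.
- For each one, it checks whether the difference between today (at 00:00:00.000) and the "yyyy-MM-dd" date parsed from the directory name has reached the configured retention: (todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000)
- If it has, the directory and everything in it are deleted.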
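The expiry check can be isolated into a small pure function. The sketch below is an assumption-laden rewrite, not the xxl-job code: LogRetention and isExpired are hypothetical names, and it swaps SimpleDateFormat and millisecond arithmetic for java.time, which counts calendar days.

```java
import java.time.LocalDate;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoUnit;

// Hypothetical helper; mirrors the retention rule described above.
class LogRetention {
    // dirName is expected to look like "yyyy-MM-dd"; other names (e.g. "gluesource") are skipped.
    static boolean isExpired(String dirName, LocalDate today, long retentionDays) {
        if (retentionDays < 3) {
            return false; // same lower limit as the cleanup thread: below 3, nothing is cleaned
        }
        LocalDate created;
        try {
            created = LocalDate.parse(dirName); // ISO format, i.e. yyyy-MM-dd
        } catch (DateTimeParseException e) {
            return false; // not a date directory
        }
        return ChronoUnit.DAYS.between(created, today) >= retentionDays;
    }
}
```

With a retention of 30 days, a directory dated exactly 30 days before today is already eligible for deletion, because the comparison is ">=" rather than ">".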
1.2.4 TriggerCallbackThread.getInstance().start()
public void start() { // valid if (XxlJobExecutor.getAdminBizList() == null) { logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null."); return; } // callback triggerCallbackThread = new Thread(new Runnable() { @Override public void run() { // normal callback while(!toStop){ try { HandleCallbackParam callback = getInstance().callBackQueue.take(); if (callback != null) { // callback list param List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>(); int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); callbackParamList.add(callback); // callback, will retry if error if (callbackParamList!=null && callbackParamList.size()>0) { doCallback(callbackParamList); } } } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } } // last callback try { List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>(); int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList); if (callbackParamList!=null && callbackParamList.size()>0) { doCallback(callbackParamList); } } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory."); } }); triggerCallbackThread.setDaemon(true); triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread"); triggerCallbackThread.start(); // retry triggerRetryCallbackThread = new Thread(new Runnable() { @Override public void run() { while(!toStop){ try { retryFailCallbackFile(); } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } try { TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); } catch (InterruptedException e) { if (!toStop) { logger.error(e.getMessage(), e); } } } logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory."); } }); triggerRetryCallbackThread.setDaemon(true); triggerRetryCallbackThread.start(); }
Analysis:
- Check whether a scheduling center is configured; if not, log a warning and return.
- Create a callback daemon thread.
- While the executor is running, take one HandleCallbackParam from the callback queue LinkedBlockingQueue<HandleCallbackParam> callBackQueue with HandleCallbackParam callback = getInstance().callBackQueue.take(); if callback is not null, use drainTo() to collect the rest of the queued parameters in one batch and call doCallback(callbackParamList).
- When the executor stops, use drainTo() once more to collect whatever is left in the queue, and call doCallback(callbackParamList) if the list is not empty.
- The doCallback(callbackParamList) method:
private void doCallback(List<HandleCallbackParam> callbackParamList){
    boolean callbackRet = false;
    // callback, will retry if error
    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
        try {
            ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
            if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
                callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
                callbackRet = true;
                break;
            } else {
                callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);
            }
        } catch (Exception e) {
            callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());
        }
    }
    if (!callbackRet) {
        appendFailCallbackFile(callbackParamList);
    }
}
It loops over every AdminBiz and calls callback(callbackParamList), stopping at the first success; callbackLog() appends the outcome to each job's log file. If no scheduling center accepted the callback, it calls appendFailCallbackFile(callbackParamList):
private void appendFailCallbackFile(List<HandleCallbackParam> callbackParamList){
    // valid
    if (callbackParamList==null || callbackParamList.size()==0) {
        return;
    }

    // append file
    byte[] callbackParamList_bytes = JdkSerializeTool.serialize(callbackParamList);

    File callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis())));
    if (callbackLogFile.exists()) {
        for (int i = 0; i < 100; i++) {
            callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()).concat("-").concat(String.valueOf(i)) ));
            if (!callbackLogFile.exists()) {
                break;
            }
        }
    }
    FileUtil.writeFileContent(callbackLogFile, callbackParamList_bytes);
}
- If the callback parameter list is not empty, serialize it to a byte[] with JdkSerializeTool.
- Under the log root, create the callbacklog directory and a failed-callback record file xxl-job-callback-{x}.log, where {x} is the current timestamp.
- If that file already exists, use xxl-job-callback-{x}-i.log instead, with i running from 0 to 99.
- Write the byte[] to the failed-callback record file.
- Create a callback retry daemon thread that runs every 30 seconds (TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);).
- While the executor is running, the retry thread calls retryFailCallbackFile():
private void retryFailCallbackFile(){
    // valid
    File callbackLogPath = new File(failCallbackFilePath);
    if (!callbackLogPath.exists()) {
        return;
    }
    if (callbackLogPath.isFile()) {
        callbackLogPath.delete();
    }
    if (!(callbackLogPath.isDirectory() && callbackLogPath.list()!=null && callbackLogPath.list().length>0)) {
        return;
    }

    // load and clear file, retry
    for (File callbaclLogFile: callbackLogPath.listFiles()) {
        byte[] callbackParamList_bytes = FileUtil.readFileContent(callbaclLogFile);
        List<HandleCallbackParam> callbackParamList = (List<HandleCallbackParam>) JdkSerializeTool.deserialize(callbackParamList_bytes, List.class);

        callbaclLogFile.delete();
        doCallback(callbackParamList);
    }
}
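Analysis:
- If the failed-callback directory does not exist, return.
- If the directory contains no files, return.
- For each file, use JdkSerializeTool to turn the byte[] read from the file back into a List<HandleCallbackParam>, delete the record file, and call doCallback(callbackParamList) to retry the callback.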
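The take-then-drainTo pattern used by the callback thread can be shown on its own. This is a minimal sketch with a hypothetical CallbackBatcher class; unlike the original, which appends the taken element after draining, it keeps the first element at the head of the batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Hypothetical helper illustrating the batching idea, not part of xxl-job.
class CallbackBatcher {
    // Blocks until one element is available, then grabs everything else already queued.
    static <T> List<T> nextBatch(BlockingQueue<T> queue) {
        List<T> batch = new ArrayList<>();
        try {
            batch.add(queue.take());   // wait for at least one callback
            queue.drainTo(batch);      // non-blocking: collect the rest in one go
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return batch;
    }
}
```

Blocking on take() keeps the thread idle while the queue is empty, and drainTo() means a burst of finished jobs results in one HTTP callback instead of one per job.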
1.2.5 initEmbedServer(address, ip, port, appname, accessToken)
// ---------------------- executor-server (rpc provider) ---------------------- private EmbedServer embedServer = null; private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception { // fill ip port port = port>0?port: NetUtil.findAvailablePort(9999); ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp(); // generate address if (address==null || address.trim().length()==0) { String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null address = "http://{ip_port}/".replace("{ip_port}", ip_port_address); } // start embedServer = new EmbedServer(); embedServer.start(address, port, appname, accessToken); } private void stopEmbedServer() { // stop provider factory try { embedServer.stop(); } catch (Exception e) { logger.error(e.getMessage(), e); } }
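Analysis:
- Initialize the executor's IP and port: if no IP is configured, the current machine's address is used; if no port is configured, NetUtil.findAvailablePort(9999) picks an available port, starting from 9999.
- If no address is configured, build the service address from "http://{ip_port}/".
- Call embedServer.start(address, port, appname, accessToken).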
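The fallback rules above reduce to two small decisions. The sketch below uses a hypothetical ExecutorAddress class and takes the fallback port as a parameter instead of probing the network, so it only illustrates the precedence, not how xxl-job finds a free port.

```java
// Hypothetical helper; shows only the precedence of explicit settings over defaults.
class ExecutorAddress {
    // A configured port wins; otherwise the caller-supplied fallback is used.
    static int resolvePort(int configuredPort, int fallbackPort) {
        return configuredPort > 0 ? configuredPort : fallbackPort;
    }

    // An explicit address wins; otherwise it is built from ip and port.
    static String resolveAddress(String address, String ip, int port) {
        if (address != null && !address.trim().isEmpty()) {
            return address;
        }
        return "http://{ip_port}/".replace("{ip_port}", ip + ":" + port);
    }
}
```

For example, with no address configured, ip 10.0.0.5 and port 9999, the registered address becomes http://10.0.0.5:9999/.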
1.3 Analyzing EmbedServer.start(address, port, appname, accessToken)
public void start(final String address, final int port, final String appname, final String accessToken) { executorBiz = new ExecutorBizImpl(); thread = new Thread(new Runnable() { @Override public void run() { // param EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor( 0, 200, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode()); } }, new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!"); } }); try { // start server ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) throws Exception { channel.pipeline() .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle .addLast(new HttpServerCodec()) .addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool)); } }) .childOption(ChannelOption.SO_KEEPALIVE, true); // bind ChannelFuture future = bootstrap.bind(port).sync(); logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port); // start registry startRegistry(appname, address); // wait util stop future.channel().closeFuture().sync(); } catch (InterruptedException e) { if (e instanceof InterruptedException) { logger.info(">>>>>>>>>>> xxl-job remoting server stop."); } else { logger.error(">>>>>>>>>>> xxl-job remoting server error.", e); } } finally { // stop try { 
workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave thread.start(); }
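The bizThreadPool in the code above combines a bounded queue with a rejection handler that throws. The sketch below reproduces that behavior with much smaller, hypothetical limits (1 thread, queue capacity 1) so the rejection is easy to trigger; BoundedPoolDemo is an illustrative name, not part of xxl-job.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of a bounded pool whose handler throws once it is saturated.
class BoundedPoolDemo {
    // Returns true if the third task is rejected.
    static boolean thirdTaskRejected() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 1, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1),
                (r, executor) -> { throw new RuntimeException("pool is EXHAUSTED!"); });
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            started.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker);     // queued, then picked up by the single worker
            started.await();           // the worker is now busy and the queue is empty
            pool.execute(() -> { });   // fills the queue
            pool.execute(() -> { });   // queue full and no spare thread: handler throws
            return false;
        } catch (RuntimeException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

Throwing from the handler makes saturation visible to the caller instead of silently dropping the request, which is presumably why the embedded server reports exhaustion this way.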
1.3.1 ExecutorBizImpl, the implementation of ExecutorBiz
It defines the heartbeat check, busy check, trigger, kill, and log-query methods for jobs.
1.3.1.1 Heartbeat check
@Override public ReturnT<String> beat() { return ReturnT.SUCCESS; }
1.3.1.2 Busy check
@Override public ReturnT<String> idleBeat(IdleBeatParam idleBeatParam) { // isRunningOrHasQueue boolean isRunningOrHasQueue = false; JobThread jobThread = XxlJobExecutor.loadJobThread(idleBeatParam.getJobId()); if (jobThread != null && jobThread.isRunningOrHasQueue()) { isRunningOrHasQueue = true; } if (isRunningOrHasQueue) { return new ReturnT<String>(ReturnT.FAIL_CODE, "job thread is running or has trigger queue."); } return ReturnT.SUCCESS; }
Analysis:
- Look up the job thread by jobId in the job thread repository ConcurrentMap<Integer, JobThread> jobThreadRepository.
- If the job thread exists and is running or has queued triggers, the job is considered busy.
1.3.1.3 Triggering a job
@Override public ReturnT<String> run(TriggerParam triggerParam) { // load old:jobHandler + jobThread JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId()); IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null; String removeOldReason = null; // valid:jobHandler + jobThread GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType()); if (GlueTypeEnum.BEAN == glueTypeEnum) { // new jobhandler IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler()); // valid old jobThread if (jobThread!=null && jobHandler != newJobHandler) { // change handler, need kill old thread removeOldReason = "change jobhandler or glue type, and terminate the old job thread."; jobThread = null; jobHandler = null; } // valid handler if (jobHandler == null) { jobHandler = newJobHandler; if (jobHandler == null) { return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found."); } } } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) { // valid old jobThread if (jobThread != null && !(jobThread.getHandler() instanceof GlueJobHandler && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) { // change handler or gluesource updated, need kill old thread removeOldReason = "change job source or glue type, and terminate the old job thread."; jobThread = null; jobHandler = null; } // valid handler if (jobHandler == null) { try { IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource()); jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime()); } catch (Exception e) { logger.error(e.getMessage(), e); return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage()); } } } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) { // valid old jobThread if (jobThread != null && !(jobThread.getHandler() instanceof ScriptJobHandler && ((ScriptJobHandler) 
jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) { // change script or gluesource updated, need kill old thread removeOldReason = "change job source or glue type, and terminate the old job thread."; jobThread = null; jobHandler = null; } // valid handler if (jobHandler == null) { jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType())); } } else { return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid."); } // executor block strategy if (jobThread != null) { ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null); if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) { // discard when running if (jobThread.isRunningOrHasQueue()) { return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle()); } } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) { // kill running jobThread if (jobThread.isRunningOrHasQueue()) { removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle(); jobThread = null; } } else { // just queue trigger } } // replace thread (new or exists invalid) if (jobThread == null) { jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason); } // push data to queue ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam); return pushResult; }
Analysis:
- Look up the job thread by jobId in the job thread repository ConcurrentMap<Integer, JobThread> jobThreadRepository.
- Validate the job thread and the job handler according to the glue type. (For a "BEAN" job, the handler is looked up by name in ConcurrentMap<String, IJobHandler> jobHandlerRepository; if a job thread exists but its handler differs from the one just looked up, the handler has changed, so the old job thread must be terminated.)
- If a job thread exists, apply the block strategy. With "Discard Later", a busy thread causes this request to be dropped and a failure to be returned. With "Cover Early", a busy thread causes jobThread to be set to null and the removal reason to be recorded, so the old thread is replaced.
- If the job thread is null, call XxlJobExecutor.registJobThread() to create and store one:
public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
    JobThread newJobThread = new JobThread(jobId, handler);
    newJobThread.start();
    logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});

    JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);	// putIfAbsent | oh my god, map's put method return the old value!!!
    if (oldJobThread != null) {
        oldJobThread.toStop(removeOldReason);
        oldJobThread.interrupt();
    }

    return newJobThread;
}
It creates and starts a new JobThread and puts it into ConcurrentMap<Integer, JobThread> jobThreadRepository; if put() returns an old thread, that thread is told to stop and is interrupted.
- Call pushTriggerQueue() to put the trigger parameter on the job thread's queue:
public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
    // avoid repeat
    if (triggerLogIdSet.contains(triggerParam.getLogId())) {
        logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
        return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
    }

    triggerLogIdSet.add(triggerParam.getLogId());
    triggerQueue.add(triggerParam);
    return ReturnT.SUCCESS;
}
The log id is used to detect a trigger that has already been queued, so the same trigger is not executed twice.
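The duplicate check can be sketched as a set guarding a queue. TriggerQueue below is a hypothetical class that stores bare log ids instead of TriggerParam objects, and it relies on the return value of Set.add rather than a separate contains() call as the original does.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of a trigger queue that refuses duplicate log ids.
class TriggerQueue {
    private final Set<Long> seenLogIds = ConcurrentHashMap.newKeySet();
    private final LinkedBlockingQueue<Long> queue = new LinkedBlockingQueue<>();

    // Returns false when the same logId was pushed before.
    boolean push(long logId) {
        if (!seenLogIds.add(logId)) {   // add() returns false if the id is already present
            return false;
        }
        queue.add(logId);
        return true;
    }

    int size() {
        return queue.size();
    }
}
```

Pushing log id 1 twice queues it only once; the second push is reported as a repeat, just as pushTriggerQueue() returns FAIL_CODE for a repeated trigger.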
1.3.1.4 Killing a job
@Override public ReturnT<String> kill(KillParam killParam) { // kill handlerThread, and create new one JobThread jobThread = XxlJobExecutor.loadJobThread(killParam.getJobId()); if (jobThread != null) { XxlJobExecutor.removeJobThread(killParam.getJobId(), "scheduling center kill job."); return ReturnT.SUCCESS; } return new ReturnT<String>(ReturnT.SUCCESS_CODE, "job thread already killed."); }
Look up the job thread by jobId and call removeJobThread() to stop it.
public static JobThread removeJobThread(int jobId, String removeOldReason){ JobThread oldJobThread = jobThreadRepository.remove(jobId); if (oldJobThread != null) { oldJobThread.toStop(removeOldReason); oldJobThread.interrupt(); return oldJobThread; } return null; }
1.3.1.5 Viewing the execution log
@Override public ReturnT<LogResult> log(LogParam logParam) { // log filename: logPath/yyyy-MM-dd/9999.log String logFileName = XxlJobFileAppender.makeLogFileName(new Date(logParam.getLogDateTim()), logParam.getLogId()); LogResult logResult = XxlJobFileAppender.readLog(logFileName, logParam.getFromLineNum()); return new ReturnT<LogResult>(logResult); }
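1.3.2 Creating a daemon thread for the scheduling center to call
1.3.2.1 Exposing a job scheduling RPC service to the scheduling center
An HTTP service is started with Netty and bound to the configured executor port. EmbedHttpServerHandler handles the scheduling center's requests to run jobs on the executor.
1.3.2.2 Registering the executor's address with the scheduling center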
startRegistry(appname, address)
public void startRegistry(final String appname, final String address) { // start registry ExecutorRegistryThread.getInstance().start(appname, address); }
1.3.2.2.1 Analyzing the ExecutorRegistryThread class
public void start(final String appname, final String address){ // valid if (appname==null || appname.trim().length()==0) { logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null."); return; } if (XxlJobExecutor.getAdminBizList() == null) { logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null."); return; } registryThread = new Thread(new Runnable() { @Override public void run() { // registry while (!toStop) { try { RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address); for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { try { ReturnT<String> registryResult = adminBiz.registry(registryParam); if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) { registryResult = ReturnT.SUCCESS; logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult}); break; } else { logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult}); } } catch (Exception e) { logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e); } } } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } try { if (!toStop) { TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); } } catch (InterruptedException e) { if (!toStop) { logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage()); } } } // registry remove try { RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address); for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) { try { ReturnT<String> registryResult = adminBiz.registryRemove(registryParam); if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) { registryResult = ReturnT.SUCCESS; logger.info(">>>>>>>>>>> xxl-job registry-remove success, 
registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult}); break; } else { logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult}); } } catch (Exception e) { if (!toStop) { logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e); } } } } catch (Exception e) { if (!toStop) { logger.error(e.getMessage(), e); } } logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory."); } }); registryThread.setDaemon(true); registryThread.setName("xxl-job, executor ExecutorRegistryThread"); registryThread.start(); }
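- Validate the executor's appname and the scheduling center addresses (adminAddresses); neither may be empty.
- Create a registry daemon thread, registryThread, that registers every 30 seconds.
- While running, loop over all configured AdminBizClient instances and call registry(), stopping at the first success.
- After the executor goes offline, call registryRemove() to remove the registration.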