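Distributed Scheduled Tasks, xxl-job Study Notes (2): Source Code Analysis of the Executor Startup Process

Preface

Continued from the previous post: Distributed Scheduled Tasks, xxl-job Study Notes (1): Building a Simple Demo

From the simple distributed task-scheduling demo built in the previous post, we know that a setup has three main parts:

  1. Configure and start the scheduling center (xxl-job-admin)
  2. Configure and start the business system (the executor)
  3. Configure the executor and its jobs on the scheduling center's web page

This post digs into the source code behind configuring and starting the executor in the business system.
The xxl.job.version used here is 2.2.1-SNAPSHOT.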

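1. Starting the executor

In the business scheduled-task system, we:

  1. Add the xxl-job dependency
  2. Add the executor configuration class XxlJobConfig.java, which configures the core class XxlJobSpringExecutor
  3. Add a jobhandler class containing methods annotated with @XxlJob("xxx")

1.1 Analyzing the core class XxlJobSpringExecutor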

public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
 
 	@Override
    public void afterSingletonsInstantiated() {
		// ... method body omitted for now
    }

	@Override
    public void destroy() {
        super.destroy();
    }

	 private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
		// ... method body omitted for now
	}

	private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }
}

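The source shows that this class extends XxlJobExecutor and implements ApplicationContextAware, SmartInitializingSingleton, and DisposableBean.

Because it implements SmartInitializingSingleton, Spring calls its afterSingletonsInstantiated() method once all non-lazy singleton beans have been instantiated: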

@Override
    public void afterSingletonsInstantiated() {

        // init JobHandler Repository
        /*initJobHandlerRepository(applicationContext);*/

        // init JobHandler Repository (for method)
        initJobHandlerMethodRepository(applicationContext);

        // refresh GlueFactory
        GlueFactory.refreshInstance(1);

        // super start
        try {
            super.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
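  1. initJobHandlerRepository() and initJobHandlerMethodRepository() keep the jobs configured in the project in memory, in a ConcurrentMap<String, IJobHandler>. The key is the handler name (for method handlers, the value of the @XxlJob annotation) and the value is the job handler instance;
  2. Refreshes GlueFactory (the glue execution factory) into a SpringGlueFactory, so that GLUE-mode jobs use Spring to load the instances they need;
  3. Calls start() on XxlJobExecutor, the core of the executor.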

1.1.1 initJobHandlerRepository()

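In older versions this method registered bean classes annotated with @JobHandler. Version 2.2.1-SNAPSHOT no longer supports that approach, which is why the call is commented out in afterSingletonsInstantiated().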

1.1.2 initJobHandlerMethodRepository()

private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
    if (applicationContext == null) {
        return;
    }
    // init job handler from method
    String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
    for (String beanDefinitionName : beanDefinitionNames) {
        Object bean = applicationContext.getBean(beanDefinitionName);

        Map<Method, XxlJob> annotatedMethods = null;   
        // referred to : org.springframework.context.event.EventListenerMethodProcessor.processBean
        try {
            annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                    new MethodIntrospector.MetadataLookup<XxlJob>() {
                        @Override
                        public XxlJob inspect(Method method) {
                            return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                        }
                    });
        } catch (Throwable ex) {
            logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
        }
        if (annotatedMethods==null || annotatedMethods.isEmpty()) {
            continue;
        }

        for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
            Method method = methodXxlJobEntry.getKey();
            XxlJob xxlJob = methodXxlJobEntry.getValue();
            if (xxlJob == null) {
                continue;
            }

            String name = xxlJob.value();
            if (name.trim().length() == 0) {
                throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
            }
            if (loadJobHandler(name) != null) {
                throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
            }

            // execute method
            if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
                throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
                        "The correct method format like \" public ReturnT<String> execute(String param) \" .");
            }
            if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
                throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
                        "The correct method format like \" public ReturnT<String> execute(String param) \" .");
            }
            method.setAccessible(true);

            // init and destory
            Method initMethod = null;
            Method destroyMethod = null;

            if (xxlJob.init().trim().length() > 0) {
                try {
                    initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
                    initMethod.setAccessible(true);
                } catch (NoSuchMethodException e) {
                    throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
                }
            }
            if (xxlJob.destroy().trim().length() > 0) {
                try {
                    destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
                    destroyMethod.setAccessible(true);
                } catch (NoSuchMethodException e) {
                    throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
                }
            }

            // registry jobhandler: store the current job handler in the ConcurrentMap<String, IJobHandler>
            registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));
        }
    }

}

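Analysis:

  1. Get the names of all beans from applicationContext and fetch each bean;
  2. Use MethodIntrospector.selectMethods together with the MetadataLookup interface to obtain a Map<Method, XxlJob> (the source of selectMethods, the core method of this utility class, follows);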
public static <T> Map<Method, T> selectMethods(Class<?> targetType, final MetadataLookup<T> metadataLookup) {
	final Map<Method, T> methodMap = new LinkedHashMap<>();
	Set<Class<?>> handlerTypes = new LinkedHashSet<>();
	Class<?> specificHandlerType = null;
	// only add the type itself when it is not a JDK dynamic proxy class
	if (!Proxy.isProxyClass(targetType)) {
		// resolve the user-defined class (unwraps a CGLIB-generated subclass)
		specificHandlerType = ClassUtils.getUserClass(targetType);
		handlerTypes.add(specificHandlerType);
	}
	handlerTypes.addAll(ClassUtils.getAllInterfacesForClassAsSet(targetType));
	// iterate over every collected class and interface
	for (Class<?> currentHandlerType : handlerTypes) {
		final Class<?> targetClass = (specificHandlerType != null ? specificHandlerType : currentHandlerType);
		
		ReflectionUtils.doWithMethods(currentHandlerType, method -> {
			// resolve the most specific version of the method on the target class
			Method specificMethod = ClassUtils.getMostSpecificMethod(method, targetClass);
			// look up the metadata attached to the method, usually an annotation
			T result = metadataLookup.inspect(specificMethod);
			if (result != null) {
				Method bridgedMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
				if (bridgedMethod == specificMethod || metadataLookup.inspect(bridgedMethod) == null) {
					methodMap.put(specificMethod, result);
				}
			}
		}, ReflectionUtils.USER_DECLARED_METHODS);
	}

	return methodMap;
}
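  3. Iterate over the Map<Method, XxlJob> from step 2; each key is an annotated Method and each value is its @XxlJob annotation;
  4. Validate the annotation's value (the handler name); if it is blank, throw an exception;
  5. Look the name up in the in-memory ConcurrentMap<String, IJobHandler> (the repository of every registered job handler); if an entry already exists, throw an exception (naming conflict);
  6. Validate the parameters: the method must take exactly one parameter that can accept a String, because 2.2.1-SNAPSHOT requires job methods to follow the format "public ReturnT<String> execute(String param)";
  7. Validate the return type, which must be ReturnT<String>;
  8. Resolve the init() and destroy() methods named in the annotation, if any;
  9. Store the job as a MethodJobHandler in the ConcurrentMap<String, IJobHandler>.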
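
The registration steps above can be sketched with plain JDK reflection. This is a minimal illustration, not the xxl-job implementation: @DemoJob, DemoJobs, and JobRegistrySketch are hypothetical names, the map stores the Method directly instead of wrapping it in a MethodJobHandler, and Spring's MethodIntrospector is replaced by getDeclaredMethods().

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for @XxlJob; not the real annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoJob {
    String value();
}

// Hypothetical bean with one annotated job method and one plain method.
class DemoJobs {
    @DemoJob("helloJob")
    public String hello(String param) {
        return "hello " + param;
    }

    public String notAJob(String param) {
        return param;
    }
}

final class JobRegistrySketch {
    // handler name -> job method, playing the role of the handler repository
    static final ConcurrentMap<String, Method> REPOSITORY = new ConcurrentHashMap<>();

    // Scans one bean, validates each annotated method, and registers it by name.
    static void register(Object bean) {
        for (Method method : bean.getClass().getDeclaredMethods()) {
            DemoJob job = method.getAnnotation(DemoJob.class);
            if (job == null) {
                continue; // not a job method
            }
            String name = job.value().trim();
            if (name.isEmpty()) {
                throw new IllegalStateException("job name invalid for " + method);
            }
            Class<?>[] params = method.getParameterTypes();
            if (!(params.length == 1 && params[0].isAssignableFrom(String.class))) {
                throw new IllegalStateException("job method must accept one String: " + method);
            }
            if (REPOSITORY.putIfAbsent(name, method) != null) {
                throw new IllegalStateException("job handler [" + name + "] naming conflict");
            }
            method.setAccessible(true);
        }
    }

    // Invokes a registered job method on the given bean.
    static Object invoke(String name, Object bean, String param) {
        try {
            return REPOSITORY.get(name).invoke(bean, param);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Registering the same bean twice fails with a naming conflict, which mirrors the duplicate-name check in step 5.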

1.1.3 GlueFactory.refreshInstance(1)

Refreshes GlueFactory into a SpringGlueFactory, so that GLUE-mode jobs use Spring to load the instances they need.

1.1.4 super.start()

Calls XxlJobExecutor.start().

1.2 Analyzing the core class XxlJobExecutor

XxlJobExecutor has the following fields:

// ---------------------- param ----------------------
    private String adminAddresses;
    private String accessToken;
    private String appname;
    private String address;
    private String ip;
    private int port;
    private String logPath;
    private int logRetentionDays;

As described above, XxlJobSpringExecutor ends up calling XxlJobExecutor's start() method. Let's see what that method does:

public void start() throws Exception {

    // init logpath
    XxlJobFileAppender.initLogPath(logPath);

    // init invoker, admin-client
    initAdminBizList(adminAddresses, accessToken);


    // init JobLogFileCleanThread
    JobLogFileCleanThread.getInstance().start(logRetentionDays);

    // init TriggerCallbackThread
    TriggerCallbackThread.getInstance().start();

    // init executor-server
    initEmbedServer(address, ip, port, appname, accessToken);
}

1.2.1 XxlJobFileAppender.initLogPath(logPath)

logPath is the log path set by xxl.job.executor.logpath in the executor configuration.

public static void initLogPath(String logPath){
	// init
	if (logPath!=null && logPath.trim().length()>0) {
		logBasePath = logPath;
	}
	// mk base dir
	File logPathDir = new File(logBasePath);
	if (!logPathDir.exists()) {
		logPathDir.mkdirs();
	}
	logBasePath = logPathDir.getPath();

	// mk glue dir
	File glueBaseDir = new File(logPathDir, "gluesource");
	if (!glueBaseDir.exists()) {
		glueBaseDir.mkdirs();
	}
	glueSrcPath = glueBaseDir.getPath();
}
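  1. If a log path is configured, logBasePath takes that value; otherwise its default is kept;
  2. Check whether the log directory exists and create it if it does not;
  3. Create the gluesource subdirectory under it.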
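
A minimal sketch of the same three steps using java.nio.file, for readers who prefer it over java.io.File. LogPathSketch and its defaultBase parameter are hypothetical; the real method keeps the default in a static field rather than taking it as an argument.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

final class LogPathSketch {
    // Returns {logBase, glueSrc} after making sure both directories exist.
    static Path[] initLogPath(String configured, Path defaultBase) {
        // step 1: a non-blank configured path wins over the default
        Path base = (configured != null && !configured.trim().isEmpty())
                ? Paths.get(configured)
                : defaultBase;
        try {
            // step 2: create the base directory (and parents) if missing
            Files.createDirectories(base);
            // step 3: create the gluesource subdirectory
            Path glue = base.resolve("gluesource");
            Files.createDirectories(glue);
            return new Path[] {base, glue};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```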

1.2.2 initAdminBizList(adminAddresses, accessToken)

// ---------------------- admin-client (rpc invoker) ----------------------
    private static List<AdminBiz> adminBizList;
    private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
        if (adminAddresses!=null && adminAddresses.trim().length()>0) {
            for (String address: adminAddresses.trim().split(",")) {
                if (address!=null && address.trim().length()>0) {

                    AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);

                    if (adminBizList == null) {
                        adminBizList = new ArrayList<AdminBiz>();
                    }
                    adminBizList.add(adminBiz);
                }
            }
        }
    }
    public static List<AdminBiz> getAdminBizList(){
        return adminBizList;
    }

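This method splits adminAddresses (the comma-separated root addresses of the deployed scheduling centers) and creates one AdminBizClient per address, passing along accessToken (the executor communication token). AdminBizClient has three core methods: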

@Override
    public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
        return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
    }

    @Override
    public ReturnT<String> registry(RegistryParam registryParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
    }

    @Override
    public ReturnT<String> registryRemove(RegistryParam registryParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, timeout, registryParam, String.class);
    }

They give the executor a way to send callback, registry, and registryRemove requests to the scheduling center.
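
The address handling can be sketched as follows. AdminAddressSketch is a hypothetical helper, not part of xxl-job. Two assumptions are baked in: each address is normalized to end with "/", so that appending "api/callback" yields a valid URL, and an empty list is returned when nothing is configured, whereas the original leaves adminBizList as null in that case.

```java
import java.util.ArrayList;
import java.util.List;

final class AdminAddressSketch {
    // Splits a comma-separated address list, skipping blanks.
    static List<String> parse(String adminAddresses) {
        List<String> result = new ArrayList<>();
        if (adminAddresses == null || adminAddresses.trim().isEmpty()) {
            return result;
        }
        for (String address : adminAddresses.trim().split(",")) {
            String url = address.trim();
            if (url.isEmpty()) {
                continue;
            }
            // assumption: make sure the root address ends with a slash
            if (!url.endsWith("/")) {
                url = url + "/";
            }
            result.add(url);
        }
        return result;
    }

    // Builds the endpoint for one of the three calls, e.g. "callback" or "registry".
    static String endpoint(String addressUrl, String api) {
        return addressUrl + "api/" + api;
    }
}
```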

1.2.3 JobLogFileCleanThread.getInstance().start(logRetentionDays)

This starts the log-cleanup thread, which automatically removes expired logs (log files older than N days).

public class JobLogFileCleanThread {
    private static Logger logger = LoggerFactory.getLogger(JobLogFileCleanThread.class);

    private static JobLogFileCleanThread instance = new JobLogFileCleanThread();
    public static JobLogFileCleanThread getInstance(){
        return instance;
    }

    private Thread localThread;
    private volatile boolean toStop = false;
    public void start(final long logRetentionDays){

        // limit min value
        if (logRetentionDays < 3 ) {
            return;
        }

        localThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!toStop) {
                    try {
                        // clean log dir, over logRetentionDays
                        File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles();
                        if (childDirs!=null && childDirs.length>0) {

                            // today
                            Calendar todayCal = Calendar.getInstance();
                            todayCal.set(Calendar.HOUR_OF_DAY,0);
                            todayCal.set(Calendar.MINUTE,0);
                            todayCal.set(Calendar.SECOND,0);
                            todayCal.set(Calendar.MILLISECOND,0);

                            Date todayDate = todayCal.getTime();

                            for (File childFile: childDirs) {

                                // valid
                                if (!childFile.isDirectory()) {
                                    continue;
                                }
                                if (childFile.getName().indexOf("-") == -1) {
                                    continue;
                                }

                                // file create date
                                Date logFileCreateDate = null;
                                try {
                                    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
                                    logFileCreateDate = simpleDateFormat.parse(childFile.getName());
                                } catch (ParseException e) {
                                    logger.error(e.getMessage(), e);
                                }
                                if (logFileCreateDate == null) {
                                    continue;
                                }

                                if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) {
                                    FileUtil.deleteRecursively(childFile);
                                }

                            }
                        }

                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }

                    }

                    try {
                        TimeUnit.DAYS.sleep(1);
                    } catch (InterruptedException e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destory.");

            }
        });
        localThread.setDaemon(true);
        localThread.setName("xxl-job, executor JobLogFileCleanThread");
        localThread.start();
    }

    public void toStop() {
        toStop = true;

        if (localThread == null) {
            return;
        }

        // interrupt and wait
        localThread.interrupt();
        try {
            localThread.join();
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
        }
    }

}

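Let's focus on the start() method:

  1. The configured log retention must be at least 3 days (logRetentionDays >= 3); otherwise start() returns immediately and nothing is ever cleaned;
  2. Create a daemon thread that runs once a day (TimeUnit.DAYS.sleep(1));
  3. List all date-named directories under the log root;
  4. For each one, check whether the difference between today (at 00:00:00.000) and the date parsed from the "yyyy-MM-dd" directory name is at least the configured retention:
(todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000)
  5. If it is, delete that date directory and everything in it.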
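
The expiry rule can be isolated into a small pure function. The sketch below swaps in java.time for Calendar and SimpleDateFormat, so parsing is stricter than in the original (LocalDate.parse accepts only the exact yyyy-MM-dd form). LogRetentionSketch is a hypothetical name.

```java
import java.time.LocalDate;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoUnit;

final class LogRetentionSketch {
    // Decides whether a log directory named after its date should be deleted.
    static boolean isExpired(String dirName, LocalDate today, long retentionDays) {
        if (retentionDays < 3) {
            return false; // below the minimum, cleanup is disabled
        }
        if (!dirName.contains("-")) {
            return false; // not a date directory, e.g. "gluesource"
        }
        LocalDate created;
        try {
            created = LocalDate.parse(dirName); // ISO format, i.e. yyyy-MM-dd
        } catch (DateTimeParseException e) {
            return false; // skip names that are not dates
        }
        // whole days between the directory date and today, compared with >=
        return ChronoUnit.DAYS.between(created, today) >= retentionDays;
    }
}
```

With a retention of 3 days, a directory dated exactly 3 days before today is already deleted, because the comparison is >= rather than >.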

1.2.4 TriggerCallbackThread.getInstance().start()

public void start() {

    // valid
    if (XxlJobExecutor.getAdminBizList() == null) {
        logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
        return;
    }

    // callback
    triggerCallbackThread = new Thread(new Runnable() {

        @Override
        public void run() {

            // normal callback
            while(!toStop){
                try {
                    HandleCallbackParam callback = getInstance().callBackQueue.take();
                    if (callback != null) {

                        // callback list param
                        List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                        int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                        callbackParamList.add(callback);

                        // callback, will retry if error
                        if (callbackParamList!=null && callbackParamList.size()>0) {
                            doCallback(callbackParamList);
                        }
                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }

            // last callback
            try {
                List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                if (callbackParamList!=null && callbackParamList.size()>0) {
                    doCallback(callbackParamList);
                }
            } catch (Exception e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");

        }
    });
    triggerCallbackThread.setDaemon(true);
    triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread");
    triggerCallbackThread.start();


    // retry
    triggerRetryCallbackThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while(!toStop){
                try {
                    retryFailCallbackFile();
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }

                }
                try {
                    TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                } catch (InterruptedException e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory.");
        }
    });
    triggerRetryCallbackThread.setDaemon(true);
    triggerRetryCallbackThread.start();

}

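Analysis:

  1. Check whether a scheduling center is configured; if not, log a warning and return without starting either thread;
  2. Create the callback daemon thread;
  3. While the executor is running, take one callback parameter object from the job-result callback queue LinkedBlockingQueue<HandleCallbackParam> callBackQueue:
    HandleCallbackParam callback = getInstance().callBackQueue.take();
    If callback is not null, use drainTo() to pull every other pending callback parameter into the same batch, then call doCallback(callbackParamList);
  4. When the executor stops, use drainTo() to collect whatever is left in the callback queue and, if the batch is not empty, call doCallback(callbackParamList) one last time;
  5. Now the doCallback(callbackParamList) method: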
    private void doCallback(List<HandleCallbackParam> callbackParamList){
            boolean callbackRet = false;
            // callback, will retry if error
            for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                try {
                    ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
                    if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
                        callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
                        callbackRet = true;
                        break;
                    } else {
                        callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);
                    }
                } catch (Exception e) {
                    callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());
                }
            }
            if (!callbackRet) {
                appendFailCallbackFile(callbackParamList);
            }
        }
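    It loops over the AdminBiz clients and calls callback(callbackParamList) on each until one succeeds, using callbackLog() to record the outcome in the job's execution log file. If no scheduling center accepts the callback, whether because of a failure result or an exception, appendFailCallbackFile(callbackParamList) is called.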
    private void appendFailCallbackFile(List<HandleCallbackParam> callbackParamList){
        // valid
        if (callbackParamList==null || callbackParamList.size()==0) {
            return;
        }
    
        // append file
        byte[] callbackParamList_bytes = JdkSerializeTool.serialize(callbackParamList);
    
        File callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis())));
        if (callbackLogFile.exists()) {
            for (int i = 0; i < 100; i++) {
                callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()).concat("-").concat(String.valueOf(i)) ));
                if (!callbackLogFile.exists()) {
                    break;
                }
            }
        }
        FileUtil.writeFileContent(callbackLogFile, callbackParamList_bytes);
    }
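    • If the callback parameter list is not empty, serialize it to a byte[] with JdkSerializeTool;
    • Build the failure-record file xxl-job-callback-{x}.log in the callbacklog directory under the log root, where {x} is the current timestamp;
    • If that file already exists, fall back to xxl-job-callback-{x}-i.log (i from 0 to 99);
    • Write the byte[] to the failure-record file.
  6. Create a callback-retry daemon thread that runs every 30 seconds:
    TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
  7. While the executor is still running, the retry thread calls retryFailCallbackFile().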
    private void retryFailCallbackFile(){
    
        // valid
        File callbackLogPath = new File(failCallbackFilePath);
        if (!callbackLogPath.exists()) {
            return;
        }
        if (callbackLogPath.isFile()) {
            callbackLogPath.delete();
        }
        if (!(callbackLogPath.isDirectory() && callbackLogPath.list()!=null && callbackLogPath.list().length>0)) {
            return;
        }
    
        // load and clear file, retry
        for (File callbaclLogFile: callbackLogPath.listFiles()) {
            byte[] callbackParamList_bytes = FileUtil.readFileContent(callbaclLogFile);
            List<HandleCallbackParam> callbackParamList = (List<HandleCallbackParam>) JdkSerializeTool.deserialize(callbackParamList_bytes, List.class);
    
            callbaclLogFile.delete();
            doCallback(callbackParamList);
        }
    
    }
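    Analysis:
    • If the callback-failure directory does not exist, return; if the path turns out to be a regular file, delete it;
    • If the directory contains no files, return;
    • For each file, read the byte[] and deserialize it with JdkSerializeTool back into a List<HandleCallbackParam>, delete the file, and call doCallback(callbackParamList) to retry the callback.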
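
The two ideas in this section, batching with take() plus drainTo() and falling back to a retry store when no scheduling center accepts the batch, can be sketched without any xxl-job classes. CallbackBatchSketch is hypothetical: each admin client is modeled as a Predicate that returns true on success, and an in-memory list stands in for the failure-record files.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.function.Predicate;

final class CallbackBatchSketch {
    // Blocks for the first item, then drains everything else already queued.
    static <T> List<T> takeBatch(BlockingQueue<T> queue) {
        try {
            T first = queue.take();
            List<T> batch = new ArrayList<>();
            queue.drainTo(batch);
            batch.add(first); // as in the original, the taken item is appended last
            return batch;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for callbacks", e);
        }
    }

    // Tries each admin in turn; the first success wins, otherwise the batch is kept for retry.
    static <T> boolean deliver(List<T> batch,
                               List<Predicate<List<T>>> admins,
                               List<List<T>> failStore) {
        for (Predicate<List<T>> admin : admins) {
            try {
                if (admin.test(batch)) {
                    return true; // stop at the first admin that accepts the batch
                }
            } catch (RuntimeException e) {
                // this admin is unreachable, try the next one
            }
        }
        failStore.add(batch); // stand-in for appendFailCallbackFile
        return false;
    }
}
```

A retry loop would later remove each stored batch and pass it to deliver() again, which is the role retryFailCallbackFile() plays with files on disk.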

1.2.5 initEmbedServer(address, ip, port, appname, accessToken)

// ---------------------- executor-server (rpc provider) ----------------------
    private EmbedServer embedServer = null;

    private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {

        // fill ip port
        port = port>0?port: NetUtil.findAvailablePort(9999);
        ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();

        // generate address
        if (address==null || address.trim().length()==0) {
            String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is null
            address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
        }

        // start
        embedServer = new EmbedServer();
        embedServer.start(address, port, appname, accessToken);
    }

    private void stopEmbedServer() {
        // stop provider factory
        try {
            embedServer.stop();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

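Analysis:

  1. Resolve the executor IP and port: if no IP is configured, use the local address from IpUtil.getIp(); if no port is configured (port <= 0), call NetUtil.findAvailablePort(9999) to find a free port, starting from 9999;
  2. If no address is configured, build the service address from the template "http://{ip_port}/";
  3. Call embedServer.start(address, port, appname, accessToken).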
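
A minimal sketch of the resolution logic. EmbedAddressSketch is hypothetical, and findAvailablePort here simply probes upward from the start port with a ServerSocket, which is an assumption about what NetUtil.findAvailablePort does rather than a copy of it. The ip:port join is likewise assumed to match IpUtil.getIpPort.

```java
import java.io.IOException;
import java.net.ServerSocket;

final class EmbedAddressSketch {
    // Keeps a configured port; otherwise looks for a free one starting at 9999.
    static int choosePort(int configuredPort) {
        return configuredPort > 0 ? configuredPort : findAvailablePort(9999);
    }

    // Probes ports upward from 'start' until one can be bound (assumed behavior).
    static int findAvailablePort(int start) {
        for (int port = start; port <= 65535; port++) {
            try (ServerSocket socket = new ServerSocket(port)) {
                return port; // bind succeeded, the socket is closed again on exit
            } catch (IOException e) {
                // port is in use, try the next one
            }
        }
        throw new IllegalStateException("no available port from " + start);
    }

    // An explicit address wins; otherwise fill the http://{ip_port}/ template.
    static String resolveAddress(String address, String ip, int port) {
        if (address != null && !address.trim().isEmpty()) {
            return address;
        }
        return "http://{ip_port}/".replace("{ip_port}", ip + ":" + port);
    }
}
```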

1.3 Analyzing EmbedServer.start(address, port, appname, accessToken)

public void start(final String address, final int port, final String appname, final String accessToken) {
    executorBiz = new ExecutorBizImpl();
    thread = new Thread(new Runnable() {

        @Override
        public void run() {

            // param
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                    0,
                    200,
                    60L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(2000),
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
                        }
                    },
                    new RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                            throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                        }
                    });


            try {
                // start server
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel channel) throws Exception {
                                channel.pipeline()
                                        .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                        .addLast(new HttpServerCodec())
                                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                                        .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                            }
                        })
                        .childOption(ChannelOption.SO_KEEPALIVE, true);

                // bind
                ChannelFuture future = bootstrap.bind(port).sync();

                logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

                // start registry
                startRegistry(appname, address);

                // wait util stop
                future.channel().closeFuture().sync();

            } catch (InterruptedException e) {
                if (e instanceof InterruptedException) {
                    logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
                } else {
                    logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
                }
            } finally {
                // stop
                try {
                    workerGroup.shutdownGracefully();
                    bossGroup.shutdownGracefully();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }

        }

    });
    thread.setDaemon(true);	// daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
    thread.start();
}
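
The bizThreadPool in the listing above combines a bounded queue with a rejection handler that throws once both the queue and the thread limit are exhausted. The scaled-down sketch below (1 thread and 1 queue slot instead of 200 and 2000) shows that behavior; BizPoolSketch is a hypothetical name. It also relies on a documented ThreadPoolExecutor property: with corePoolSize 0, tasks are queued first and additional threads are only started once the queue is full.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BizPoolSketch {
    // Submits three blocking tasks to a 1-thread, 1-slot pool; returns how many were rejected.
    static int demoRejections() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 1, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1),
                r -> new Thread(r, "sketch-bizThreadPool-" + r.hashCode()),
                (r, executor) -> {
                    throw new RuntimeException("sketch bizThreadPool is EXHAUSTED!");
                });
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            started.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        try {
            pool.execute(blocking);     // queued, then picked up by the single worker
            started.await();            // wait until the worker is busy with it
            pool.execute(blocking);     // occupies the only queue slot
            try {
                pool.execute(blocking); // no free thread and no free slot
            } catch (RuntimeException e) {
                rejected++;             // the handler's exception reaches the caller
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            release.countDown();        // let the blocked tasks finish
            pool.shutdown();
        }
        return rejected;
    }
}
```

Because the handler throws on the submitting thread, whoever hands work to the pool has to be prepared to catch that exception.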

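1.3.1 ExecutorBizImpl, the implementation of the ExecutorBiz scheduling interface

It defines the methods for heartbeat detection, busy detection, triggering a job, killing a job, and reading the execution log.

1.3.1.1 Heartbeat detection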

@Override
    public ReturnT<String> beat() {
        return ReturnT.SUCCESS;
    }

1.3.1.2 Busy detection

@Override
    public ReturnT<String> idleBeat(IdleBeatParam idleBeatParam) {

        // isRunningOrHasQueue
        boolean isRunningOrHasQueue = false;
        JobThread jobThread = XxlJobExecutor.loadJobThread(idleBeatParam.getJobId());
        if (jobThread != null && jobThread.isRunningOrHasQueue()) {
            isRunningOrHasQueue = true;
        }

        if (isRunningOrHasQueue) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "job thread is running or has trigger queue.");
        }
        return ReturnT.SUCCESS;
    }

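Analysis:

  1. Look up the job thread for jobId in the job-thread repository ConcurrentMap<Integer, JobThread> jobThreadRepository;
  2. If the job thread exists and is either running or has triggers waiting in its queue, the job is considered busy and a failure result is returned; otherwise the result is SUCCESS.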
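
The busy check reduces to a small predicate. In the sketch below, DemoJobThread is a hypothetical, trimmed-down stand-in for JobThread, and isRunningOrHasQueue() is assumed to mean "currently executing, or the trigger queue is not empty", as the method name suggests.

```java
import java.util.concurrent.LinkedBlockingQueue;

final class IdleBeatSketch {
    // Hypothetical stand-in for JobThread with only the state the check needs.
    static final class DemoJobThread {
        volatile boolean running = false;
        final LinkedBlockingQueue<String> triggerQueue = new LinkedBlockingQueue<>();

        boolean isRunningOrHasQueue() {
            return running || !triggerQueue.isEmpty();
        }
    }

    // A job with no thread yet, or with an idle thread, can accept a new trigger.
    static boolean isIdle(DemoJobThread jobThread) {
        return jobThread == null || !jobThread.isRunningOrHasQueue();
    }
}
```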

1.3.1.3 Triggering a job

@Override
public ReturnT<String> run(TriggerParam triggerParam) {
    // load old:jobHandler + jobThread
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    String removeOldReason = null;

    // valid:jobHandler + jobThread
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) {

        // new jobhandler
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

        // valid old jobThread
        if (jobThread!=null && jobHandler != newJobHandler) {
            // change handler, need kill old thread
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        if (jobHandler == null) {
            jobHandler = newJobHandler;
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }

    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {

        // valid old jobThread
        if (jobThread != null &&
                !(jobThread.getHandler() instanceof GlueJobHandler
                    && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
            // change handler or gluesource updated, need kill old thread
            removeOldReason = "change job source or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        if (jobHandler == null) {
            try {
                IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
            }
        }
    } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {

        // valid old jobThread
        if (jobThread != null &&
                !(jobThread.getHandler() instanceof ScriptJobHandler
                        && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
            // change script or gluesource updated, need kill old thread
            removeOldReason = "change job source or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        if (jobHandler == null) {
            jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
        }
    } else {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
    }

    // executor block strategy
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // discard when running
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            // kill running jobThread
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                jobThread = null;
            }
        } else {
            // just queue trigger
        }
    }

    // replace thread (new or exists invalid)
    if (jobThread == null) {
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }

    // push data to queue
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}

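Analysis:

  1. Look up the job thread for the given jobId in the job-thread repository, ConcurrentMap<Integer, JobThread> jobThreadRepository.
  2. Validate the job thread and the job handler according to the glue type. For a "BEAN" job, the handler is fetched from ConcurrentMap<String, IJobHandler> jobHandlerRepository by its handler id; if a job thread already exists and the handler it holds differs from the one just fetched, the old thread must be terminated and replaced. For GLUE and script jobs, the old thread is likewise discarded when the handler type or the glueUpdatetime no longer matches.
  3. If the job thread is not null, apply the job's blocking strategy. With DISCARD_LATER, a request that arrives while the thread is running or has queued triggers is dropped and a failure is returned. With COVER_EARLY, in the same situation the job thread reference is set to null and the removal reason is set to "block strategy effect:" plus the strategy title, so that a new thread replaces the old one. Any other strategy simply queues the trigger.
  4. If the job thread is null, call XxlJobExecutor.registJobThread() to create and store one: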
    public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
        JobThread newJobThread = new JobThread(jobId, handler);
        newJobThread.start();
        logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});
    
        JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);	// putIfAbsent | oh my god, map's put method return the old value!!!
        if (oldJobThread != null) {
            oldJobThread.toStop(removeOldReason);
            oldJobThread.interrupt();
        }
    
        return newJobThread;
    }
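    This creates and starts a new JobThread, then puts it into ConcurrentMap<Integer, JobThread> jobThreadRepository. Because Map.put() returns the previous value, a non-null result means an old thread was already registered for this jobId; that thread is told to stop via toStop(removeOldReason) and is then interrupted.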
  5. Call pushTriggerQueue() to put the trigger parameters into the job thread's execution queue:
    public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
        // avoid repeat
        if (triggerLogIdSet.contains(triggerParam.getLogId())) {
            logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
            return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
        }

        triggerLogIdSet.add(triggerParam.getLogId());
        triggerQueue.add(triggerParam);
        return ReturnT.SUCCESS;
    }
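    The log id is used to detect duplicate triggers: if triggerLogIdSet already contains it, the trigger is rejected; otherwise the id is recorded and the parameters are added to the queue.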
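The blocking-strategy decision in step 3 can be isolated into a small pure function, which makes the three outcomes easier to see. This is a minimal sketch, not the xxl-job code: BlockStrategy is a hypothetical stand-in for ExecutorBlockStrategyEnum (SERIAL_EXECUTION here represents the default "just queue trigger" branch), and Decision and BlockStrategyDemo are names invented for illustration.

```java
/** Hypothetical stand-in for ExecutorBlockStrategyEnum. */
enum BlockStrategy { SERIAL_EXECUTION, DISCARD_LATER, COVER_EARLY }

/** Hypothetical outcome type; xxl-job itself returns a ReturnT or replaces the thread inline. */
enum Decision { ENQUEUE, REJECT, REPLACE_THREAD }

final class BlockStrategyDemo {

    /**
     * Mirrors the branch structure of the excerpt above: the strategy only
     * matters when the existing job thread is running or has queued triggers.
     */
    static Decision decide(BlockStrategy strategy, boolean runningOrHasQueue) {
        if (strategy == BlockStrategy.DISCARD_LATER && runningOrHasQueue) {
            return Decision.REJECT;          // drop this trigger, report failure
        }
        if (strategy == BlockStrategy.COVER_EARLY && runningOrHasQueue) {
            return Decision.REPLACE_THREAD;  // old thread is stopped, a new one takes over
        }
        return Decision.ENQUEUE;             // idle thread, or serial strategy: just queue
    }

    public static void main(String[] args) {
        for (BlockStrategy s : BlockStrategy.values()) {
            System.out.println(s + ": busy -> " + decide(s, true) + ", idle -> " + decide(s, false));
        }
    }
}
```

Note that an idle thread is always reused regardless of strategy; only a busy thread triggers rejection or replacement.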

1.3.1.4 Killing a job

@Override
public ReturnT<String> kill(KillParam killParam) {
    // kill handlerThread, and create new one
    JobThread jobThread = XxlJobExecutor.loadJobThread(killParam.getJobId());
    if (jobThread != null) {
        XxlJobExecutor.removeJobThread(killParam.getJobId(), "scheduling center kill job.");
        return ReturnT.SUCCESS;
    }

    return new ReturnT<String>(ReturnT.SUCCESS_CODE, "job thread already killed.");
}

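kill() looks up the job thread by jobId and, if one exists, calls removeJobThread() to stop it. If no thread is found, it still returns a success code, with the message "job thread already killed."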

public static JobThread removeJobThread(int jobId, String removeOldReason){
    JobThread oldJobThread = jobThreadRepository.remove(jobId);
    if (oldJobThread != null) {
        oldJobThread.toStop(removeOldReason);
        oldJobThread.interrupt();

        return oldJobThread;
    }
    return null;
}
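
The toStop() plus interrupt() pair is a common cooperative-stop pattern: the flag tells the loop to exit, and the interrupt wakes the thread if it is blocked waiting for work. The following is a minimal sketch under stated assumptions, not the JobThread implementation; StoppableWorker, submit(), stopAndWait() and the 3-second poll timeout are all chosen for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical worker thread illustrating the stop-flag plus interrupt pattern. */
final class StoppableWorker extends Thread {
    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private volatile boolean toStop = false;
    private volatile String stopReason;

    void submit(String task) {
        queue.add(task);
    }

    /** Only sets the flag; the caller still interrupts to wake a blocked poll. */
    void toStop(String reason) {
        this.stopReason = reason;
        this.toStop = true;
    }

    String getStopReason() {
        return stopReason;
    }

    @Override
    public void run() {
        while (!toStop) {
            try {
                String task = queue.poll(3, TimeUnit.SECONDS); // blocks until work or timeout
                if (task != null) {
                    System.out.println("running " + task);
                }
            } catch (InterruptedException e) {
                // interrupt() woke us up; the loop condition re-checks toStop
            }
        }
        System.out.println("worker stopped: " + stopReason);
    }

    /** Stops the worker the way removeJobThread() does, then waits for it to exit. */
    static boolean stopAndWait(StoppableWorker worker, String reason, long millis) {
        worker.toStop(reason);
        worker.interrupt();
        try {
            worker.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }
}
```

Without the interrupt, the worker in this sketch could take up to the full poll timeout to notice the flag.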

1.3.1.5 Viewing the execution log

@Override
public ReturnT<LogResult> log(LogParam logParam) {
    // log filename: logPath/yyyy-MM-dd/9999.log
    String logFileName = XxlJobFileAppender.makeLogFileName(new Date(logParam.getLogDateTim()), logParam.getLogId());

    LogResult logResult = XxlJobFileAppender.readLog(logFileName, logParam.getFromLineNum());
    return new ReturnT<LogResult>(logResult);
}
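
The comment in log() indicates a file layout of logPath/yyyy-MM-dd/<logId>.log, and readLog() is called with a starting line number. The sketch below shows one way such a lookup and incremental read could work. It is an assumption-laden illustration, not the XxlJobFileAppender code: LogReadSketch, logFile(), readFrom() and demo() are hypothetical names, and line numbers are assumed to be 1-based.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

/** Hypothetical helper: one log file per trigger, grouped into per-day directories. */
final class LogReadSketch {

    /** Builds logBase/yyyy-MM-dd/<logId>.log, following the comment in log(). */
    static Path logFile(Path logBase, Date triggerDate, long logId) {
        String day = new SimpleDateFormat("yyyy-MM-dd").format(triggerDate);
        return logBase.resolve(day).resolve(logId + ".log");
    }

    /** Returns all lines from fromLineNum (assumed 1-based) to the end of the file. */
    static String readFrom(Path file, int fromLineNum) {
        try {
            List<String> lines = Files.readAllLines(file, StandardCharsets.UTF_8);
            int start = Math.max(fromLineNum, 1) - 1;
            if (start >= lines.size()) {
                return ""; // nothing new since the last read
            }
            return String.join("\n", lines.subList(start, lines.size()));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Writes a three-line sample log into a temp directory and reads it from line 2. */
    static String demo() {
        try {
            Path base = Files.createTempDirectory("joblog-sketch");
            Path file = logFile(base, new Date(), 9999L);
            Files.createDirectories(file.getParent());
            Files.write(file, Arrays.asList("line1", "line2", "line3"), StandardCharsets.UTF_8);
            return readFrom(file, 2);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Passing a starting line lets a caller poll repeatedly and fetch only the lines appended since its previous request.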

1.3.2 Creating a daemon thread for the scheduling center to call

1.3.2.1 Exposing a job-scheduling RPC service to the scheduling center

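The executor starts an HTTP server with netty_http and binds it to the executor port you configured. EmbedHttpServerHandler handles the requests through which the scheduling center triggers jobs on the executor.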
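
Conceptually, a handler of this kind maps each incoming request to one of the executor operations analyzed above (run, kill, log). The sketch below shows only that routing idea, with no Netty dependency. It is not the EmbedHttpServerHandler code: the class name, the assumption that URI paths mirror the method names, the POST-only check and all response strings are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical request router: URI path -> executor operation. */
final class ExecutorDispatchSketch {
    private final Map<String, Function<String, String>> routes = new LinkedHashMap<>();

    ExecutorDispatchSketch() {
        // Assumed paths; each lambda stands in for a call such as run(TriggerParam).
        routes.put("/run", body -> "run accepted: " + body);
        routes.put("/kill", body -> "kill accepted: " + body);
        routes.put("/log", body -> "log requested: " + body);
    }

    /** Returns the operation's response, or an error message for unsupported requests. */
    String dispatch(String httpMethod, String uri, String body) {
        if (!"POST".equalsIgnoreCase(httpMethod)) {
            return "error: only POST is supported";
        }
        Function<String, String> operation = routes.get(uri);
        if (operation == null) {
            return "error: no mapping for " + uri;
        }
        return operation.apply(body);
    }

    public static void main(String[] args) {
        ExecutorDispatchSketch sketch = new ExecutorDispatchSketch();
        System.out.println(sketch.dispatch("POST", "/kill", "{\"jobId\":1}"));
        System.out.println(sketch.dispatch("GET", "/run", ""));
    }
}
```

In a real server the request body would be deserialized into the matching parameter object (for example KillParam) before the operation is invoked.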

1.3.2.2 Registering the current executor's address with the scheduling center

startRegistry(appname, address)

public void startRegistry(final String appname, final String address) {
    // start registry
    ExecutorRegistryThread.getInstance().start(appname, address);
}

1.3.2.2.1 Analyzing the ExecutorRegistryThread class

public void start(final String appname, final String address){

    // valid
    if (appname==null || appname.trim().length()==0) {
        logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null.");
        return;
    }
    if (XxlJobExecutor.getAdminBizList() == null) {
        logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
        return;
    }

    registryThread = new Thread(new Runnable() {
        @Override
        public void run() {

            // registry
            while (!toStop) {
                try {
                    RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                        try {
                            ReturnT<String> registryResult = adminBiz.registry(registryParam);
                            if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                registryResult = ReturnT.SUCCESS;
                                logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                break;
                            } else {
                                logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            }
                        } catch (Exception e) {
                            logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                        }

                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }

                }

                try {
                    if (!toStop) {
                        TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                    }
                } catch (InterruptedException e) {
                    if (!toStop) {
                        logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
                    }
                }
            }

            // registry remove
            try {
                RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                    try {
                        ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
                        if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                            registryResult = ReturnT.SUCCESS;
                            logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            break;
                        } else {
                            logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
                        }

                    }

                }
            } catch (Exception e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");

        }
    });
    registryThread.setDaemon(true);
    registryThread.setName("xxl-job, executor ExecutorRegistryThread");
    registryThread.start();
}
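
  1. Validate the executor's appname and the scheduling center addresses (adminAddresses); neither may be empty. If either check fails, a warning is logged and no thread is started.
  2. Create a daemon registry thread, registryThread, which re-registers every RegistryConfig.BEAT_TIMEOUT seconds (once every 30 seconds).
  3. While running, iterate over all AdminBizClient instances configured for the executor and call registry() on each, stopping at the first one that succeeds.
  4. Once the service goes offline (toStop is set), call registryRemove() to remove the registration.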
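
The four steps above can be condensed into a small heartbeat loop. This is a minimal sketch of the pattern, not the ExecutorRegistryThread code: Registry is a hypothetical stand-in for AdminBiz, HeartbeatRegistrar and demo() are invented for illustration, and the beat interval is a parameter in milliseconds so the demo can run quickly.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

/** Hypothetical stand-in for AdminBiz: each call returns true on success. */
interface Registry {
    boolean register(String appname, String address);
    boolean remove(String appname, String address);
}

final class HeartbeatRegistrar {
    private volatile boolean toStop = false;
    private Thread thread;

    void start(String appname, String address, List<Registry> registries, long beatMillis) {
        thread = new Thread(() -> {
            while (!toStop) {
                firstSuccess(registries, r -> r.register(appname, address));
                try {
                    if (!toStop) TimeUnit.MILLISECONDS.sleep(beatMillis);
                } catch (InterruptedException e) {
                    // woken by stop(); the loop condition re-checks toStop
                }
            }
            // going offline: remove the registration once
            firstSuccess(registries, r -> r.remove(appname, address));
        });
        thread.setDaemon(true); // must not keep the JVM alive
        thread.setName("heartbeat-registrar");
        thread.start();
    }

    void stop() {
        toStop = true;
        thread.interrupt(); // cut the sleep short
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Tries each registry in order and stops at the first successful call. */
    private static void firstSuccess(List<Registry> registries, Predicate<Registry> call) {
        for (Registry r : registries) {
            try {
                if (call.test(r)) return;
            } catch (RuntimeException e) {
                // this registry failed; try the next one
            }
        }
    }

    /** Runs one registrar against a counting fake; returns {registerCalls, removeCalls}. */
    static int[] demo() {
        AtomicInteger registers = new AtomicInteger();
        AtomicInteger removes = new AtomicInteger();
        CountDownLatch firstBeat = new CountDownLatch(1);
        Registry fake = new Registry() {
            public boolean register(String a, String b) { registers.incrementAndGet(); firstBeat.countDown(); return true; }
            public boolean remove(String a, String b) { removes.incrementAndGet(); return true; }
        };
        HeartbeatRegistrar registrar = new HeartbeatRegistrar();
        registrar.start("demo-app", "http://127.0.0.1:9999/", Arrays.asList(fake), 10);
        try {
            firstBeat.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        registrar.stop();
        return new int[] { registers.get(), removes.get() };
    }
}
```

Because registration is repeated on every beat, the scheduling center can treat an executor whose heartbeats stop arriving as offline even if the explicit removal call never reaches it.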