正如第一篇文章所看到的,在咱們啓動一個執行器以後,咱們會在一個延遲時間以後在調度中心看到這個註冊上來的執行器,那在XXL-JOB框架中是如何實現的呢?咱們先來看執行器這邊。java
咱們在執行器端配置了個Bean,以下。web
@Bean(initMethod = "start", destroyMethod = "destroy")
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppName(appName);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
複製代碼
顯而易見,咱們直奔XxlJobSpringExecutor類的start方法。spring
@Override
public void start() throws Exception {
// init JobHandler Repository
initJobHandlerRepository(applicationContext);
// refresh GlueFactory
GlueFactory.refreshInstance(1);
// super start
super.start();
}
複製代碼
第一步初始化JobHandler的數據,代碼很是簡單,從spring的上下文中取出JobHandler接口的所有實現類,而後調用registJobHandler方法,傳入JobHandler註解配置的value值和bean實例。api
private void initJobHandlerRepository(ApplicationContext applicationContext){
if (applicationContext == null) {
return;
}
// init job handler action
Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class);
if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
for (Object serviceBean : serviceBeanMap.values()) {
if (serviceBean instanceof IJobHandler){
String name = serviceBean.getClass().getAnnotation(JobHandler.class).value();
IJobHandler handler = (IJobHandler) serviceBean;
if (loadJobHandler(name) != null) {
throw new RuntimeException("xxl-job jobhandler naming conflicts.");
}
registJobHandler(name, handler);
}
}
}
}
複製代碼
registJobHandler方法爲XxlJobSpringExecutor的父類XxlJobExecutor實現,代碼以下多線程
// ---------------------- job handler repository ----------------------
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
return jobHandlerRepository.put(name, jobHandler);
}
public static IJobHandler loadJobHandler(String name){
return jobHandlerRepository.get(name);
}
複製代碼
嗯,所謂的初始化JobHandler就是以JobHandler註解配置的value值爲key,bean實例爲value放入XxlJobExecutor的靜態集合類jobHandlerRepository中。app
第二步就是初始化執行glue腳本的工廠類,不細說。框架
第三步是才重點,咱們來看父類的start方法到底幹了什麼。ide
public void start() throws Exception {
// 日誌相關
XxlJobFileAppender.initLogPath(logPath);
// 建立調度中心接口代理類
initAdminBizList(adminAddresses, accessToken);
// 日誌相關
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// 結果回調
TriggerCallbackThread.getInstance().start();
// 啓動執行器服務
port = port>0?port: NetUtil.findAvailablePort(9999);
ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
initRpcProvider(ip, port, appName, accessToken);
}
複製代碼
咱們來看建立調度中心接口代理類和啓動執行器服務的代碼實現。學習
// ---------------------- admin-client (rpc invoker) ----------------------
private static List<AdminBiz> adminBizList;
private static Serializer serializer;
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception{
serializer = Serializer.SerializeEnum.HESSIAN.getSerializer();
if (adminAddresses!=null && adminAddresses.trim().length()>0) {
for (String address: adminAddresses.trim().split(",")) {
if (address!=null && address.trim().length()>0) {
//AdminBiz.MAPPING等於"/api"
String addressUrl = address.concat(AdminBiz.MAPPING);
AdminBiz adminBiz = (AdminBiz) new XxlRpcReferenceBean(
NetEnum.NETTY_HTTP,
serializer,
CallType.SYNC,
LoadBalance.ROUND,
AdminBiz.class,
null,
3000,
addressUrl,
accessToken,
null,
null
).getObject();
if (adminBizList == null) {
adminBizList = new ArrayList<AdminBiz>();
}
adminBizList.add(adminBiz);
}
}
}
}
複製代碼
利用XxlRpcReferenceBean建立了AdminBiz接口的多個代理類(調度中心支持集羣部署,多地址),放入靜態集合adminBizList中,XxlRpcReferenceBean這個類是他兄弟XXL-RPC框架的,熟悉RPC知識的同窗應該明白,這裏返回的是他的動態代理類,而後對方法調用進行攔截處理,組裝參數發起遠程調用。來看AdminBiz接口提供了哪些方法。this
/**
* @author xuxueli 2017-07-27 21:52:49
*/
public interface AdminBiz {
public static final String MAPPING = "/api";
/**
* 任務執行結果回調
*
* @param callbackParamList
* @return
*/
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList);
/**
* 註冊
*
* @param registryParam
* @return
*/
public ReturnT<String> registry(RegistryParam registryParam);
/**
* 註銷
*
* @param registryParam
* @return
*/
public ReturnT<String> registryRemove(RegistryParam registryParam);
}
複製代碼
好,如今咱們利用XxlJobExecutor中的adminBizList,就能夠完成執行器的任務結果回調,註冊和註銷操做了,那咱們是在哪裏調用的呢?咱們去啓動執行器服務這一步一探究竟。
private void initRpcProvider(String ip, int port, String appName, String accessToken){
// init, provider factory
String address = IpUtil.getIpPort(ip, port);
Map<String, String> serviceRegistryParam = new HashMap<String, String>();
serviceRegistryParam.put("appName", appName);
serviceRegistryParam.put("address", address);
xxlRpcProviderFactory = new XxlRpcProviderFactory();
xxlRpcProviderFactory.initConfig(NetEnum.NETTY_HTTP, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, serviceRegistryParam);
// add services
// 向Provider新增一個服務
xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());
// start
// Provider 啓動
xxlRpcProviderFactory.start();
}
複製代碼
若是咱們把任務調度這個動做當作RPC來講,那執行器至關於服務的提供端(完成任務的執行),調度中心至關於服務的消費端(負責任務的調用),因此咱們這裏建立了一個RpcProviderFactory,而後直接看他啓動時幹了什麼。
public void start() throws Exception {
// start server
serviceAddress = IpUtil.getIpPort(this.ip, port);
server = netType.serverClass.newInstance();
server.setStartedCallback(new BaseCallback() { // serviceRegistry started
@Override
public void run() throws Exception {
// 註冊
if (serviceRegistryClass != null) {
serviceRegistry = serviceRegistryClass.newInstance();
serviceRegistry.start(serviceRegistryParam);
if (serviceData.size() > 0) {
serviceRegistry.registry(serviceData.keySet(), serviceAddress);
}
}
}
});
server.setStopedCallback(new BaseCallback() { // serviceRegistry stoped
@Override
public void run() {
// 註銷
if (serviceRegistry != null) {
if (serviceData.size() > 0) {
serviceRegistry.remove(serviceData.keySet(), serviceAddress);
}
serviceRegistry.stop();
serviceRegistry = null;
}
}
});
server.start(this);
}
複製代碼
調用serviceRegistry的start方法完成註冊,他的實現類是誰?回到上一步看initConfig方法的傳參,是ExecutorServiceRegistry,它的start方法以下。
public void start(Map<String, String> param) {
//利用線程池完成註冊操做
ExecutorRegistryThread.getInstance().start(param.get("appName"),param.get("address"));
}
複製代碼
跟進去發現向線程池registryThread提交了個任務,操做以下
public void run() {
while (!toStop) {
try {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
ReturnT<String> registryResult = adminBiz.registry(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
//相關日誌打印;
break;
} else {
//相關日誌打印;
}
} catch (Exception e) {
//相關日誌打印;
}
}
} catch (Exception e) {
if (!toStop) {
//相關日誌打印;
}
}
try {
if (!toStop) {
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
}
} catch (InterruptedException e) {
if (!toStop) {
//相關日誌打印;
}
}
}
// registry remove
try {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
//相關日誌打印;
break;
} else {
//相關日誌打印;
}
} catch (Exception e) {
if (!toStop) {
//相關日誌打印;
}
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
//相關日誌打印;
}
複製代碼
其中toStop爲volatile修飾的變量,可保證多線程執行下的可見性,此處起到跳轉註銷操做的做用。
接下來咱們進入循環,遍歷XxlJobExecutor的adminBizList,取出每個AdminBiz代理類調用registry方法進行多調度中心的註冊,時序圖以下:
總體時序圖以下:
通過咱們上文的分析,調度中心接受註冊信息的關鍵在於兩個,第一, AdminBiz 實現類,第二,服務暴露。
實現類AdminBizImpl在com.xxl.job.admin.service.impl包下,註冊和註銷代碼以下:
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
int ret = xxlJobRegistryDao.registryUpdate(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
if (ret < 1) {
xxlJobRegistryDao.registrySave(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
}
return ReturnT.SUCCESS;
}
@Override
public ReturnT<String> registryRemove(RegistryParam registryParam) {
xxlJobRegistryDao.registryDelete(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
return ReturnT.SUCCESS;
}
複製代碼
代碼很是簡單,咱們如今重點去看他是怎麼暴露出去的。
和執行器的思路同樣,咱們直接看調度中心的conf包下有啥玩意。
XxlJobAdminConfig:將一些配置值、dao、service利用InitializingBean的機制轉存給靜態變量中,以後的操做很是方便,值得學習。
XxlJobScheduler:重點,調度中心業務邏輯啓動的入口。
@Override
public void afterPropertiesSet() throws Exception {
// 國際化相關
initI18n();
// 執行器在線狀態監聽
JobRegistryMonitorHelper.getInstance().start();
// 失敗任務重試
JobFailMonitorHelper.getInstance().start();
// 暴露調度中心服務,點進去
initRpcProvider();
// 任務調度
JobScheduleHelper.getInstance().start();
logger.info(">>>>>>>>> init xxl-job admin success.");
}
複製代碼
private void initRpcProvider(){
XxlRpcProviderFactory xxlRpcProviderFactory = new XxlRpcProviderFactory();
xxlRpcProviderFactory.initConfig(
NetEnum.NETTY_HTTP,
Serializer.SerializeEnum.HESSIAN.getSerializer(),
null,
0,
XxlJobAdminConfig.getAdminConfig().getAccessToken(),
null,
null);
// 將AdminBizImpl添加至xxlRpcProviderFactory中
xxlRpcProviderFactory.addService(AdminBiz.class.getName(), null, XxlJobAdminConfig.getAdminConfig().getAdminBiz());
// servlet handler
servletServerHandler = new ServletServerHandler(xxlRpcProviderFactory);
}
private void stopRpcProvider() throws Exception {
XxlRpcInvokerFactory.getInstance().stop();
}
public static void invokeAdminService(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
servletServerHandler.handle(null, request, response);
}
複製代碼
initRpcProvider方法的代碼很熟悉,和執行器端的服務暴露同樣,可是調度中心AdminBiz並無用 xxlRpcProviderFactory.start()暴露,而是直接建立了一個ServletServerHandler,那他是怎麼被調用的?看invokeAdminService是啥時候被調用的不就好了!利用IDEA的快捷鍵,找到以下代碼:
@Controller
public class JobApiController implements InitializingBean {
@Override
public void afterPropertiesSet() throws Exception {
}
//AdminBiz.MAPPING等於"/api"
@RequestMapping(AdminBiz.MAPPING)
@PermissionLimit(limit=false)
public void api(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
XxlJobScheduler.invokeAdminService(request, response);
}
}
複製代碼
至此,調度中心的接收註冊和註銷的代碼實現以及分析完了,時序圖以下:
本篇分析了調度中心和執行器之間的註冊心跳是如何實現的,以後的兩篇是這個調度框架比較重要的業務邏輯,咱們來看看任務的調度和分發是如何實現的。
喜歡的能夠關注個人公衆號「江飛傑」第一時間閱讀(會更新的比較快),裏面也有本身的一些和技術無關的讀書筆記與生活隨感,歡迎你們來關注。