Author: 小傅哥
Blog: https://bugstack.cn
Accumulate, share, and grow, so that both you and others gain something! 😄
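import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@SpringBootApplication
@EnableScheduling
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    // Fires every 3 seconds
    @Scheduled(cron = "0/3 * * * * *")
    public void demoTask() {
        //...
    }
}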
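The code above should look familiar: it is Spring Boot's Schedule timed task, simple and easy to use. Whenever we need logic to run on a timer or repeat at specified moments, we almost always reach for Schedule.
However, when the tasks are larger in scale, for example scheduled T+1 settlement batches, status changes before a flash sale, or refreshing data to pre-warm a cache, they share the same characteristics: heavy workloads, strict timing requirements, and high availability demands. In these cases plain Schedule alone does not give enough control.
So our product requirement emerges: a distributed DcsSchedule task.
Hmm? Someone has been holding back, wanting to say that Quartz would do. Yes, it would, but that is not the focus of this article. Don't you want to see how a self-developed open-source middleware is born and how it gets pushed to the Maven Central repository? Take the screenshots below; pretty sweet, right?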
[Screenshot: home page monitoring]
[Screenshot: task list]
😀 All right, next let's look at how to use this middleware and how it was developed!
No. | Version | Release date | Notes |
---|---|---|---|
1 | 1.0.0-RELEASE | 2019-12-07 | Basic functionality: task onboarding, distributed start/stop |
2 | | 2019-12-07 | Uploaded test version |
Configuration center: zookeeper 3.4.14 {have a zookeeper service ready; for debugging on Windows you can download it from here: https://www-eu.apache.org/dis...}
Edit conf/zoo.cfg and change the configuration as follows:
dataDir=D:\\Program Files\\apache-zookeeper-3.4.14\\data
dataLogDir=D:\\Program Files\\apache-zookeeper-3.4.14\\logs
Package and deploy the control platform
<dependency>
    <groupId>org.itstack.middleware</groupId>
    <artifactId>schedule-spring-boot-starter</artifactId>
    <version>1.0.0-RELEASE</version>
</dependency>
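import org.itstack.middleware.schedule.annotation.EnableDcsScheduling;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableDcsScheduling
public class HelloWorldApplication {

    public static void main(String[] args) {
        SpringApplication.run(HelloWorldApplication.class, args);
    }
}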
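import org.itstack.middleware.schedule.annotation.DcsScheduled;
import org.springframework.stereotype.Component;

@Component("demoTaskThree")
public class DemoTaskThree {

    // 09:00 and 13:00 every day; not started automatically
    @DcsScheduled(cron = "0 0 9,13 * * *", desc = "03 scheduled task execution test: taskMethod01", autoStartup = false)
    public void taskMethod01() {
        System.out.println("03 scheduled task execution test: taskMethod01");
    }

    // Every 30 minutes during hours 8 to 10; not started automatically
    @DcsScheduled(cron = "0 0/30 8-10 * * *", desc = "03 scheduled task execution test: taskMethod02", autoStartup = false)
    public void taskMethod02() {
        System.out.println("03 scheduled task execution test: taskMethod02");
    }
}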
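This is my first time building a middleware on top of Spring Boot, since I have only been working with Spring Boot for about a month. Spring Boot has been around for quite a while, but our projects don't use the Spring Boot stack, so I had never touched it. It was only last month, when I started looking into domain-driven design, that I got to know it. And it really is good! So I set about consolidating my skills, learning the ideas, and applying them to our projects.
Given my product requirement, the model for a distributed task middleware like this already existed in my head. What remained was to explore, during development, the knowledge and tools I would need; briefly, these include: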
schedule-spring-boot-starter
└── src
    ├── main
    │   ├── java
    │   │   └── org.itstack.middleware.schedule
    │   │       ├── annotation
    │   │       │   ├── DcsScheduled.java
    │   │       │   └── EnableDcsScheduling.java
    │   │       ├── annotation
    │   │       │   └── InstructStatus.java
    │   │       ├── config
    │   │       │   ├── DcsSchedulingConfiguration.java
    │   │       │   ├── StarterAutoConfig.java
    │   │       │   └── StarterServiceProperties.java
    │   │       ├── domain
    │   │       │   ├── DataCollect.java
    │   │       │   ├── DcsScheduleInfo.java
    │   │       │   ├── DcsServerNode.java
    │   │       │   ├── ExecOrder.java
    │   │       │   └── Instruct.java
    │   │       ├── export
    │   │       │   └── DcsScheduleResource.java
    │   │       ├── service
    │   │       │   ├── HeartbeatService.java
    │   │       │   └── ZkCuratorServer.java
    │   │       ├── task
    │   │       │   ├── TaskScheduler.java
    │   │       │   ├── ScheduledTask.java
    │   │       │   ├── SchedulingConfig.java
    │   │       │   └── SchedulingRunnable.java
    │   │       ├── util
    │   │       │   └── StrUtil.java
    │   │       └── DoJoinPoint.java
    │   └── resources
    │       └── META-INF
    │           └── spring.factories
    └── test
        └── java
            └── org.itstack.demo.test
                └── ApiTest.java
annotation/EnableDcsScheduling.java & custom annotation
This annotation carries a whole pile of @ annotations; all of this configuration exists to bootstrap our middleware:
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({DcsSchedulingConfiguration.class})
@ImportAutoConfiguration({SchedulingConfig.class, CronTaskRegister.class, DoJoinPoint.class})
@ComponentScan("org.itstack.middleware.*")
public @interface EnableDcsScheduling {
}
config/DcsSchedulingConfiguration.java & initialize configuration/services, start tasks, mount nodes
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    // Skip beans whose target class has already been processed
    Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
    if (this.nonAnnotatedClasses.contains(targetClass)) return bean;
    Method[] methods = ReflectionUtils.getAllDeclaredMethods(bean.getClass());
    if (methods == null) return bean;
    for (Method method : methods) {
        // Only methods carrying @DcsScheduled are collected
        DcsScheduled dcsScheduled = AnnotationUtils.findAnnotation(method, DcsScheduled.class);
        if (null == dcsScheduled || 0 == method.getDeclaredAnnotations().length) continue;
        List<ExecOrder> execOrderList = Constants.execOrderMap.computeIfAbsent(beanName, k -> new ArrayList<>());
        // Record everything needed to schedule this method later
        ExecOrder execOrder = new ExecOrder();
        execOrder.setBean(bean);
        execOrder.setBeanName(beanName);
        execOrder.setMethodName(method.getName());
        execOrder.setDesc(dcsScheduled.desc());
        execOrder.setCron(dcsScheduled.cron());
        execOrder.setAutoStartup(dcsScheduled.autoStartup());
        execOrderList.add(execOrder);
        this.nonAnnotatedClasses.add(targetClass);
    }
    return bean;
}
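The collection step above can be tried out without Spring. The following is only a minimal sketch using plain JDK reflection: `DemoScheduled`, `DemoTask`, and `REGISTRY` are hypothetical stand-ins for `@DcsScheduled`, a task bean, and `Constants.execOrderMap`, not the starter's own classes.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class AnnotationScanSketch {

    // Hypothetical stand-in for @DcsScheduled; not the starter's real annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface DemoScheduled {
        String cron();
        boolean autoStartup() default true;
    }

    // Hypothetical bean with one annotated method and one plain method.
    static class DemoTask {
        @DemoScheduled(cron = "0/3 * * * * *", autoStartup = false)
        public void run() { }

        public void helper() { }
    }

    // beanName -> "methodName|cron|autoStartup", playing the role of execOrderMap.
    static final Map<String, List<String>> REGISTRY = new ConcurrentHashMap<>();

    // Collects every method of the bean that carries the annotation.
    static void scan(String beanName, Object bean) {
        for (Method method : bean.getClass().getDeclaredMethods()) {
            DemoScheduled meta = method.getAnnotation(DemoScheduled.class);
            if (meta == null) continue; // methods without the annotation are ignored
            REGISTRY.computeIfAbsent(beanName, k -> new ArrayList<>())
                    .add(method.getName() + "|" + meta.cron() + "|" + meta.autoStartup());
        }
    }

    public static void main(String[] args) {
        scan("demoTask", new DemoTask());
        System.out.println(REGISTRY);
    }
}
```

Unlike the middleware, this sketch does not handle AOP proxies or inherited methods; Spring's `AnnotationUtils` and `ReflectionUtils` take care of those cases in the real code.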
private void init_server(ApplicationContext applicationContext) {
    try {
        // Obtain the zk connection
        CuratorFramework client = ZkCuratorServer.getClient(Constants.Global.zkAddress);
        // Assemble node paths
        path_root_server = StrUtil.joinStr(path_root, LINE, "server", LINE, schedulerServerId);
        path_root_server_ip = StrUtil.joinStr(path_root_server, LINE, "ip", LINE, Constants.Global.ip);
        // Create the node & recursively delete stale content under this service's IP
        ZkCuratorServer.deletingChildrenIfNeeded(client, path_root_server_ip);
        ZkCuratorServer.createNode(client, path_root_server_ip);
        ZkCuratorServer.setData(client, path_root_server, schedulerServerName);
        // Add the exec node & listen on it
        ZkCuratorServer.createNodeSimple(client, Constants.Global.path_root_exec);
        ZkCuratorServer.addTreeCacheListener(applicationContext, client, Constants.Global.path_root_exec);
    } catch (Exception e) {
        logger.error("itstack middleware schedule init server error!", e);
        throw new RuntimeException(e);
    }
}
private void init_task(ApplicationContext applicationContext) {
    CronTaskRegister cronTaskRegistrar = applicationContext.getBean("itstack-middlware-schedule-cronTaskRegister", CronTaskRegister.class);
    Set<String> beanNames = Constants.execOrderMap.keySet();
    for (String beanName : beanNames) {
        List<ExecOrder> execOrderList = Constants.execOrderMap.get(beanName);
        for (ExecOrder execOrder : execOrderList) {
            if (!execOrder.getAutoStartup()) continue;
            SchedulingRunnable task = new SchedulingRunnable(execOrder.getBean(), execOrder.getBeanName(), execOrder.getMethodName());
            cronTaskRegistrar.addCronTask(task, execOrder.getCron());
        }
    }
}
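`SchedulingRunnable` is built from a bean, a bean name, and a method name, which suggests that it invokes the method reflectively. Its source is not shown here, so the following is only a sketch of that idea: `ReflectiveTask` and `DemoBean` are hypothetical, and the `beanName_methodName` id format is inferred from the key that the listener passes to `removeCronTask`.

```java
import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicInteger;

public class ReflectiveTaskSketch {

    // Hypothetical counterpart to SchedulingRunnable: calls bean.methodName() reflectively.
    static class ReflectiveTask implements Runnable {
        private final Object bean;
        private final String beanName;
        private final String methodName;

        ReflectiveTask(Object bean, String beanName, String methodName) {
            this.bean = bean;
            this.beanName = beanName;
            this.methodName = methodName;
        }

        // Assumed id format: beanName + "_" + methodName.
        String taskId() {
            return beanName + "_" + methodName;
        }

        @Override
        public void run() {
            try {
                // Look up the no-argument method by name and invoke it on the bean
                Method method = bean.getClass().getDeclaredMethod(methodName);
                method.setAccessible(true);
                method.invoke(bean);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Failed to run " + taskId(), e);
            }
        }
    }

    // Hypothetical bean that simply counts its invocations.
    static class DemoBean {
        final AtomicInteger calls = new AtomicInteger();

        public void taskMethod01() {
            calls.incrementAndGet();
        }
    }

    public static void main(String[] args) {
        DemoBean bean = new DemoBean();
        ReflectiveTask task = new ReflectiveTask(bean, "demoTaskThree", "taskMethod01");
        task.run();
        System.out.println(task.taskId() + " ran " + bean.calls.get() + " time(s)");
    }
}
```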
private void init_node() throws Exception {
    Set<String> beanNames = Constants.execOrderMap.keySet();
    for (String beanName : beanNames) {
        List<ExecOrder> execOrderList = Constants.execOrderMap.get(beanName);
        for (ExecOrder execOrder : execOrderList) {
            String path_root_server_ip_clazz = StrUtil.joinStr(path_root_server_ip, LINE, "clazz", LINE, execOrder.getBeanName());
            String path_root_server_ip_clazz_method = StrUtil.joinStr(path_root_server_ip_clazz, LINE, "method", LINE, execOrder.getMethodName());
            String path_root_server_ip_clazz_method_status = StrUtil.joinStr(path_root_server_ip_clazz, LINE, "method", LINE, execOrder.getMethodName(), "/status");
            // Add nodes
            ZkCuratorServer.createNodeSimple(client, path_root_server_ip_clazz);
            ZkCuratorServer.createNodeSimple(client, path_root_server_ip_clazz_method);
            ZkCuratorServer.createNodeSimple(client, path_root_server_ip_clazz_method_status);
            // Add node data [temporary]
            ZkCuratorServer.appendPersistentData(client, path_root_server_ip_clazz_method + "/value", JSON.toJSONString(execOrder));
            // Add node data [permanent]
            ZkCuratorServer.setData(client, path_root_server_ip_clazz_method_status, execOrder.getAutoStartup() ? "1" : "0");
        }
    }
}
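To make the node layout easier to picture, here is a small sketch that assembles the same path segments with plain string concatenation. It assumes that `LINE` is `"/"` and that `StrUtil.joinStr` simply concatenates its arguments; the root path, server id, and IP are made-up values for illustration only.

```java
public class ZkPathSketch {

    // Assumed separator; the real LINE constant is not shown in the article.
    static final String LINE = "/";

    // Hypothetical equivalent of StrUtil.joinStr: plain concatenation of all parts.
    static String joinStr(String... parts) {
        StringBuilder sb = new StringBuilder();
        for (String part : parts) {
            sb.append(part);
        }
        return sb.toString();
    }

    // Builds the per-method node path in the same order as init_server and init_node.
    static String methodPath(String root, String serverId, String ip, String beanName, String methodName) {
        String server = joinStr(root, LINE, "server", LINE, serverId);
        String serverIp = joinStr(server, LINE, "ip", LINE, ip);
        String clazz = joinStr(serverIp, LINE, "clazz", LINE, beanName);
        return joinStr(clazz, LINE, "method", LINE, methodName);
    }

    public static void main(String[] args) {
        // All argument values below are illustrative, not the middleware's defaults.
        String method = methodPath("/demo-root", "demo-server", "127.0.0.1", "demoTaskThree", "taskMethod01");
        System.out.println(method);             // node for the method itself
        System.out.println(method + "/value");  // serialized ExecOrder
        System.out.println(method + "/status"); // "1" when started, "0" when stopped
    }
}
```

Under these assumptions each scheduled method gets its own subtree keyed by server, IP, bean name, and method name, which is what lets the control platform address a single task on a single machine.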
service/ZkCuratorServer.java & zk service
public static void addTreeCacheListener(final ApplicationContext applicationContext, final CuratorFramework client, String path) throws Exception {
    TreeCache treeCache = new TreeCache(client, path);
    treeCache.start();
    treeCache.getListenable().addListener((curatorFramework, event) -> {
        //...
        switch (event.getType()) {
            case NODE_ADDED:
            case NODE_UPDATED:
                // Only react to instructions addressed to this IP and this server id
                if (Constants.Global.ip.equals(instruct.getIp()) && Constants.Global.schedulerServerId.equals(instruct.getSchedulerServerId())) {
                    // Execute the command
                    Integer status = instruct.getStatus();
                    switch (status) {
                        case 0: // Stop the task
                            cronTaskRegistrar.removeCronTask(instruct.getBeanName() + "_" + instruct.getMethodName());
                            setData(client, path_root_server_ip_clazz_method_status, "0");
                            logger.info("itstack middleware schedule task stop {} {}", instruct.getBeanName(), instruct.getMethodName());
                            break;
                        case 1: // Start the task
                            cronTaskRegistrar.addCronTask(new SchedulingRunnable(scheduleBean, instruct.getBeanName(), instruct.getMethodName()), instruct.getCron());
                            setData(client, path_root_server_ip_clazz_method_status, "1");
                            logger.info("itstack middleware schedule task start {} {}", instruct.getBeanName(), instruct.getMethodName());
                            break;
                        case 2: // Refresh the task
                            cronTaskRegistrar.removeCronTask(instruct.getBeanName() + "_" + instruct.getMethodName());
                            cronTaskRegistrar.addCronTask(new SchedulingRunnable(scheduleBean, instruct.getBeanName(), instruct.getMethodName()), instruct.getCron());
                            setData(client, path_root_server_ip_clazz_method_status, "1");
                            logger.info("itstack middleware schedule task refresh {} {}", instruct.getBeanName(), instruct.getMethodName());
                            break;
                    }
                }
                break;
            case NODE_REMOVED:
                break;
            default:
                break;
        }
    });
}
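Stripped of ZooKeeper, the listener is a small state machine over three status codes: 0 stops a task, 1 starts it, and 2 refreshes it. The sketch below models only that dispatch with an in-memory map. `RUNNING` and `apply` are hypothetical names, and throwing on an unknown status is a choice made for this sketch; the listener above silently ignores other values.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class InstructDispatchSketch {

    // taskId -> cron of tasks considered running; stands in for the real task registrar.
    static final Map<String, String> RUNNING = new ConcurrentHashMap<>();

    // Applies one instruction and returns the value that would be written to the status node.
    static String apply(int status, String beanName, String methodName, String cron) {
        String taskId = beanName + "_" + methodName;
        switch (status) {
            case 0: // stop
                RUNNING.remove(taskId);
                return "0";
            case 1: // start
                RUNNING.put(taskId, cron);
                return "1";
            case 2: // refresh: drop the old schedule, then register the new cron
                RUNNING.remove(taskId);
                RUNNING.put(taskId, cron);
                return "1";
            default:
                throw new IllegalArgumentException("Unknown status: " + status);
        }
    }

    public static void main(String[] args) {
        System.out.println(apply(1, "demoTaskThree", "taskMethod01", "0 0 9,13 * * *"));
        System.out.println(apply(2, "demoTaskThree", "taskMethod01", "0/5 * * * * *"));
        System.out.println(RUNNING);
        System.out.println(apply(0, "demoTaskThree", "taskMethod01", null));
        System.out.println(RUNNING);
    }
}
```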
public void addCronTask(SchedulingRunnable task, String cronExpression) {
    // Replace any task already registered under the same id
    if (null != Constants.scheduledTasks.get(task.taskId())) {
        removeCronTask(task.taskId());
    }
    CronTask cronTask = new CronTask(task, cronExpression);
    Constants.scheduledTasks.put(task.taskId(), scheduleCronTask(cronTask));
}

public void removeCronTask(String taskId) {
    // Cancel the task only if it is currently registered
    ScheduledTask scheduledTask = Constants.scheduledTasks.remove(taskId);
    if (scheduledTask == null) return;
    scheduledTask.cancel();
}
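The add/remove pair above follows a common pattern: keep a handle for each scheduled task in a map keyed by task id, and cancel the old handle before registering a new one. The sketch below shows that pattern with only the JDK. Note the swap: it uses `ScheduledExecutorService` with a fixed rate instead of Spring's cron scheduling, and `TaskRegistrySketch` with its method names is hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class TaskRegistrySketch {

    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    // taskId -> handle that can later cancel the schedule
    private final Map<String, ScheduledFuture<?>> scheduled = new ConcurrentHashMap<>();

    // Fixed-rate scheduling stands in for cron here, purely to stay within the JDK.
    public void addTask(String taskId, Runnable task, long periodMillis) {
        removeTask(taskId); // re-adding an id replaces the previous schedule
        scheduled.put(taskId, executor.scheduleAtFixedRate(task, 0, periodMillis, TimeUnit.MILLISECONDS));
    }

    // Returns true if a task with this id was registered and has been cancelled.
    public boolean removeTask(String taskId) {
        ScheduledFuture<?> future = scheduled.remove(taskId);
        if (future == null) return false;
        future.cancel(false);
        return true;
    }

    public int size() {
        return scheduled.size();
    }

    public void shutdown() {
        executor.shutdownNow();
    }

    public static void main(String[] args) {
        TaskRegistrySketch registry = new TaskRegistrySketch();
        registry.addTask("demoTaskThree_taskMethod01", () -> System.out.println("tick"), 1000);
        registry.addTask("demoTaskThree_taskMethod01", () -> System.out.println("tock"), 1000);
        System.out.println("registered tasks: " + registry.size());
        registry.removeTask("demoTaskThree_taskMethod01");
        registry.shutdown();
    }
}
```

Keying by task id is what makes the "refresh" instruction cheap: replacing a schedule is just a remove followed by an add under the same key.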
// Matches every method annotated with @DcsScheduled
@Pointcut("@annotation(org.itstack.middleware.schedule.annotation.DcsScheduled)")
public void aopPoint() {
}

@Around("aopPoint()")
public Object doRouter(ProceedingJoinPoint jp) throws Throwable {
    long begin = System.currentTimeMillis();
    Method method = getMethod(jp);
    try {
        return jp.proceed();
    } finally {
        // Log the elapsed time in milliseconds, even when the task throws
        long end = System.currentTimeMillis();
        logger.info("\nitstack middleware schedule method:{}.{} take time(ms):{}", jp.getTarget().getClass().getSimpleName(), method.getName(), (end - begin));
    }
}
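The around advice above is a timing wrapper: record the start time, run the task, and log the elapsed time in a `finally` block. The same idea can be sketched without Spring AOP by using a JDK dynamic proxy, which is a different mechanism from the aspect the middleware uses. `Task`, `DemoTask`, and `timed` are hypothetical names for this illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

public class TimingProxySketch {

    // Hypothetical task interface; JDK proxies can only wrap interfaces.
    public interface Task {
        String execute();
    }

    // Hypothetical task implementation.
    public static class DemoTask implements Task {
        @Override
        public String execute() {
            return "done";
        }
    }

    // Wraps the target so that every call through the interface is timed.
    @SuppressWarnings("unchecked")
    static <T> T timed(T target, Class<T> iface) {
        InvocationHandler handler = (proxy, method, args) -> {
            long begin = System.currentTimeMillis();
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // rethrow the target's own exception
            } finally {
                long cost = System.currentTimeMillis() - begin;
                System.out.println(target.getClass().getSimpleName() + "." + method.getName() + " took " + cost + " ms");
            }
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }

    public static void main(String[] args) {
        Task task = timed(new DemoTask(), Task.class);
        System.out.println(task.execute());
    }
}
```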
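Once development is done, the Jar still needs to be published to the Maven Central repository. That process is fairly long, so I wrote a separate blog post about it: Publishing a Jar to the Maven Central Repository (Preparing to Develop Open-Source Middleware)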