SpringBoot分佈式任務中間件開發 附視頻講解 (手把手教你開發和使用中間件)

image

做者:小傅哥
博客:https://bugstack.cnhtml

沉澱、分享、成長,讓本身和他人都能有所收穫!😄

前言


@SpringBootApplication
@EnableScheduling
public class Application{
    public static void mian(String[] args){
        SpringApplication.run(Application.class,args);
    }
    
    @Scheduled(cron = "0/3 * * * * *")
    public void demoTask() {
        //...
    }
}

咔咔,上面這段代碼很熟悉吧,他就是SpringBoot的Schedule定時任務,簡單易用。在咱們開發中若是須要作一些定時或指定時刻循環執行邏輯時候,基本都會使用到Schedule。java

可是,若是咱們的任務是比較大型的,好比;定時跑批T+1結算、商品秒殺前狀態變動、刷新數據預熱到緩存等等,這些定時任務都相同的特色;做業量大實時性強可用率高。而這時候若是隻是單純使用Schedule就顯得不足以控制。node

那麼,咱們產品需求就出來了,分佈式DcsSchedule任務;git

  1. 多機器部署任務
  2. 統一控制中心啓停
  3. 宕機災備,自動啓動執行
  4. 實時檢測任務執行信息:部署數量、任務總量、成功次數、失敗次數、執行耗時等

嗯?有人憋半天了想說能夠用Quertz,嗯能夠的,但這不是本篇文章的重點。難道你不想看看一個自言開源中間件是怎麼誕生的嗎,怎麼推到中心Maven倉的嗎?好比下圖;真香不!程序員

首頁監控
微信公衆號:bugstack蟲洞棧 & 首頁監控

任務列表
微信公衆號:bugstack蟲洞棧 & 任務列表github

😀好了,接下來開始介紹這個中間件如何使用和怎麼開發的了!面試

中間件使用


1. 版本記錄

版本 發佈日期 備註
1 1.0.0-RELEASE 2019-12-07 基本功能實現;任務接入、分佈式啓停
2 1.0.1-RELEASE 2019-12-07 上傳測試版本

2. 環境準備

  1. jdk1.8
  2. StringBoot 2.x
  3. 配置中心zookeeper 3.4.14 {準備好zookeeper服務,若是windows調試能夠從這裏下載:https://www-eu.apache.org/dis...}spring

    1. 下載後解壓,在bin同級路徑建立文件夾data、logs
    2. 修改conf/zoo.cfg,修改配置以下;apache

      dataDir=D:\\Program Files\\apache-zookeeper-3.4.14\\data
      dataLogDir=D:\\Program Files\\apache-zookeeper-3.4.14\\logs
  4. 打包部署控制平臺windows

    1. 下載地址:https://github.com/fuzhengwei...
    2. 部署訪問:http://localhost:7397

3. 配置POM

<dependency>
    <groupId>org.itstack.middleware</groupId>
    <artifactId>schedule-spring-boot-starter</artifactId>
    <version>1.0.0-RELEASE</version>
</dependency>

4. 引入分佈式任務DcsSchedule @EnableDcsScheduling

  1. 與SpringBoot的Sceduling很是像,他的註解是;@EnableScheduling,儘量下降使用難度
  2. 這個註解主要方便給咱們本身的中間件一個入口,也是😏扒拉源碼發現的能夠這麼幹{我一直說好的代碼都很騷氣}
@SpringBootApplication
@EnableDcsScheduling
public class HelloWorldApplication {

    public static void main(String[] args) {
        SpringApplication.run(HelloWorldApplication.class, args);
    }

}

5. 在任務方法上添加註解

  1. 這個註解也和SpringBoot的Schedule很像,可是多了desc描述和啓停初始化控制
  2. cron:執行計劃
  3. desc:任務描述
  4. autoStartup:默認啓動狀態
  5. 若是你的任務須要參數能夠經過引入service去調用獲取等方式均可以
@Component("demoTaskThree")
public class DemoTaskThree {
    
    @DcsScheduled(cron = "0 0 9,13 * * *", desc = "03定時任務執行測試:taskMethod01", autoStartup = false)
    public void taskMethod01() {
        System.out.println("03定時任務執行測試:taskMethod01");
    }

    @DcsScheduled(cron = "0 0/30 8-10 * * *", desc = "03定時任務執行測試:taskMethod02", autoStartup = false)
    public void taskMethod02() {
        System.out.println("03定時任務執行測試:taskMethod02");
    }

}

6. 啓動驗證

  1. 啓動SpringBoot工程便可,autoStartup = true的會自動啓動任務(任務是多線程並行執行的)
  2. 啓動控制平臺:itstack-middleware-control,訪問:http://localhost:7397/ 成功界面以下;能夠開啓/關閉驗證了!{功能還在完善}
    微信公衆號:bugstack蟲洞棧 & 任務列表

中間件開發


以SpringBoot爲基礎開發一款中間件我也是第一次,由於接觸SpringBoot也剛剛1個月左右。雖然SpringBoot已經出來挺久的了,但因爲咱們項目開發並不使用SpringBoot的一套東西,因此一直依賴沒有接觸。直到上個月開始考慮領域驅動設計才接觸,嗯!真的不錯,那麼就開始了夯實技能、學習思想用到項目裏。

按照個人產品需求,開發這麼一款分佈式任務的中間件,我腦殼中的模型已經存在了。另外就是須要開發過程當中去探索我須要的知識工具,簡單包括;

  1. 讀取Yml自定義配置
  2. 使用zookeeper做爲配置中心,這樣若是有機器宕機了就能夠經過臨時節點監聽知道
  3. 經過Spring類;ApplicationContextAware, BeanPostProcessor, ApplicationListener,執行服務啓動、註解掃描、節點掛在
  4. 分佈式任務統一控制檯,來管理任務

1. 工程模型

schedule-spring-boot-starter
└── src
    ├── main
    │   ├── java
    │   │   └── org.itstack.middleware.schedule
    │   │       ├── annotation
    │   │       │    ├── DcsScheduled.java    
    │   │       │    └── EnableDcsScheduling.java
    │   │       ├── annotation    
    │   │       │    └── InstructStatus.java    
    │   │       ├── config
    │   │       │    ├── DcsSchedulingConfiguration.java    
    │   │       │    ├── StarterAutoConfig.java    
    │   │       │    └── StarterServiceProperties.java    
    │   │       ├── domain
    │   │       │    ├── DataCollect.java    
    │   │       │    ├── DcsScheduleInfo.java    
    │   │       │    ├── DcsServerNode.java    
    │   │       │    ├── ExecOrder.java    
    │   │       │    └── Instruct.java
    │   │       ├── export    
    │   │       │    └── DcsScheduleResource.java
    │   │       ├── service
    │   │       │    ├── HeartbeatService.java    
    │   │       │    └── ZkCuratorServer.java
    │   │       ├── task
    │   │       │    ├── TaskScheduler.java    
    │   │       │    ├── ScheduledTask.java    
    │   │       │    ├── SchedulingConfig.java    
    │   │       │    └── SchedulingRunnable.java    
    │   │       ├── util
    │   │       │    └── StrUtil.java    
    │   │       └── DoJoinPoint.java
    │   └── resources    
    │       └── META_INF
    │           └── spring.factories    
    └── test
        └── java
            └── org.itstack.demo.test
                └── ApiTest.java

2. 代碼講解

  1. 篇幅較長,只講解部分重點代碼塊,若是你願意參與到開源編寫,能夠和我申請
  2. 我說過好的代碼都很騷氣,那麼就從這部分入手吧

2.1 自定義註解

annotation/EnableDcsScheduling.java & 自定義註解

這個註解一堆的圈A,這些配置都是爲了開始啓動執行咱們的中間件;

  • Target 標識須要放到類上執行
  • Retention 註釋將由編譯器記錄在類文件中,而且在運行時由VM保留,所以能夠反射地讀取它們
  • Import 引入入口資源,在程序啓動時會執行到本身定義的類中,以方便咱們;初始化配置/服務、啓動任務、掛在節點
  • ComponentScan 告訴程序掃描位置
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({DcsSchedulingConfiguration.class})
@ImportAutoConfiguration({SchedulingConfig.class, CronTaskRegister.class, DoJoinPoint.class})
@ComponentScan("org.itstack.middleware.*")
public @interface EnableDcsScheduling {
}

2.2 掃描自定義註解、初始化配置/服務、啓動任務、掛在節點

config/DcsSchedulingConfiguration.java & 初始化配置/服務、啓動任務、掛在節點
  • 寫到這的時候,咱們的自定義註解有了,已經寫到方法上了,那麼咱們怎麼拿到呢?
  • 須要經過實現BeanPostProcessor.postProcessAfterInitialization,在每一個bean實例化的時候進行掃描
  • 這裏遇到一個有趣的問題,一個方法會獲得兩次,由於有一個CGLIB給代理的,像真假美猴王同樣,幾乎一毛同樣。😏扒了源碼纔看到,生命註解批註沒有。好那就能夠判斷了!method.getDeclaredAnnotations()
  • 咱們將掃描下來的任務信息彙總到Map中,當Spring初始化完成後,在執行咱們中間件內容。{太早執行有點喧賓奪主了!主要人家也不讓呀,給你拋異常😭。}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
    if (this.nonAnnotatedClasses.contains(targetClass)) return bean;
    Method[] methods = ReflectionUtils.getAllDeclaredMethods(bean.getClass());
    if (methods == null) return bean;
    for (Method method : methods) {
        DcsScheduled dcsScheduled = AnnotationUtils.findAnnotation(method, DcsScheduled.class);
        if (null == dcsScheduled || 0 == method.getDeclaredAnnotations().length) continue;
        List<ExecOrder> execOrderList = Constants.execOrderMap.computeIfAbsent(beanName, k -> new ArrayList<>());
        ExecOrder execOrder = new ExecOrder();
        execOrder.setBean(bean);
        execOrder.setBeanName(beanName);
        execOrder.setMethodName(method.getName());
        execOrder.setDesc(dcsScheduled.desc());
        execOrder.setCron(dcsScheduled.cron());
        execOrder.setAutoStartup(dcsScheduled.autoStartup());
        execOrderList.add(execOrder);
        this.nonAnnotatedClasses.add(targetClass);
    }
    return bean;
}
  • 初始化服務鏈接zookeeper配置中心
  • 鏈接後將建立咱們的節點以及添加監聽,這個監聽主要負責分佈式消息通知,收到通知負責控制任務啓停
  • 這裏包括了循環建立節點以及批量節點刪除,彷佛!面試題會問😏
private void init_server(ApplicationContext applicationContext) {
    try {
        //獲取zk鏈接
        CuratorFramework client = ZkCuratorServer.getClient(Constants.Global.zkAddress);
        //節點組裝
        path_root_server = StrUtil.joinStr(path_root, LINE, "server", LINE, schedulerServerId);
        path_root_server_ip = StrUtil.joinStr(path_root_server, LINE, "ip", LINE, Constants.Global.ip);
        //建立節點&遞歸刪除本服務IP下的舊內容
        ZkCuratorServer.deletingChildrenIfNeeded(client, path_root_server_ip);
        ZkCuratorServer.createNode(client, path_root_server_ip);
        ZkCuratorServer.setData(client, path_root_server, schedulerServerName);
        //添加節點&監聽
        ZkCuratorServer.createNodeSimple(client, Constants.Global.path_root_exec);
        ZkCuratorServer.addTreeCacheListener(applicationContext, client, Constants.Global.path_root_exec);
    } catch (Exception e) {
        logger.error("itstack middleware schedule init server error!", e);
        throw new RuntimeException(e);
    }
}
  • 啓動標記了True的Schedule任務
  • Scheduled默認是單線程執行的,這裏擴展爲多線程並行執行
private void init_task(ApplicationContext applicationContext) {
    CronTaskRegister cronTaskRegistrar = applicationContext.getBean("itstack-middlware-schedule-cronTaskRegister", CronTaskRegister.class);
    Set<String> beanNames = Constants.execOrderMap.keySet();
    for (String beanName : beanNames) {
        List<ExecOrder> execOrderList = Constants.execOrderMap.get(beanName);
        for (ExecOrder execOrder : execOrderList) {
            if (!execOrder.getAutoStartup()) continue;
            SchedulingRunnable task = new SchedulingRunnable(execOrder.getBean(), execOrder.getBeanName(), execOrder.getMethodName());
            cronTaskRegistrar.addCronTask(task, execOrder.getCron());
        }
    }
}
  • 掛在任務節點到zookeeper掛在
  • 按照不一樣的場景,有些內容是掛在到虛擬機節點。{😏又來個面試題,虛擬節點數據怎麼掛在,建立的是永久節點,那麼虛擬值怎麼加?}
  • path_root_server_ip_clazz_method;這個結構是:根目錄、服務、IP、類、方法
private void init_node() throws Exception {
    Set<String> beanNames = Constants.execOrderMap.keySet();
    for (String beanName : beanNames) {
        List<ExecOrder> execOrderList = Constants.execOrderMap.get(beanName);
        for (ExecOrder execOrder : execOrderList) {
            String path_root_server_ip_clazz = StrUtil.joinStr(path_root_server_ip, LINE, "clazz", LINE, execOrder.getBeanName());
            String path_root_server_ip_clazz_method = StrUtil.joinStr(path_root_server_ip_clazz, LINE, "method", LINE, execOrder.getMethodName());
            String path_root_server_ip_clazz_method_status = StrUtil.joinStr(path_root_server_ip_clazz, LINE, "method", LINE, execOrder.getMethodName(), "/status");
            //添加節點
            ZkCuratorServer.createNodeSimple(client, path_root_server_ip_clazz);
            ZkCuratorServer.createNodeSimple(client, path_root_server_ip_clazz_method);
            ZkCuratorServer.createNodeSimple(client, path_root_server_ip_clazz_method_status);
            //添加節點數據[臨時]
            ZkCuratorServer.appendPersistentData(client, path_root_server_ip_clazz_method + "/value", JSON.toJSONString(execOrder));
            //添加節點數據[永久]
            ZkCuratorServer.setData(client, path_root_server_ip_clazz_method_status, execOrder.getAutoStartup() ? "1" : "0");
        }
    }
}

2.3 zookeeper控制服務

service/ZkCuratorServer.java & zk服務
  • 這裏提供一個zk的方法集合,其中比較重要的方法添加監聽
  • zookeeper有一個特性是對這個監聽後,當節點內容發生變化時會收到通知,固然宕機也是收穫得的,這個也就是咱們後面開發災備的核心觸發點
public static void addTreeCacheListener(final ApplicationContext applicationContext, final CuratorFramework client, String path) throws Exception {
    TreeCache treeCache = new TreeCache(client, path);
    treeCache.start();
    treeCache.getListenable().addListener((curatorFramework, event) -> {
        //...
        switch (event.getType()) {
            case NODE_ADDED:
            case NODE_UPDATED:
                if (Constants.Global.ip.equals(instruct.getIp()) && Constants.Global.schedulerServerId.equals(instruct.getSchedulerServerId())) {
                    //執行命令
                    Integer status = instruct.getStatus();
                    switch (status) {
                        case 0: //中止任務
                            cronTaskRegistrar.removeCronTask(instruct.getBeanName() + "_" + instruct.getMethodName());
                            setData(client, path_root_server_ip_clazz_method_status, "0");
                            logger.info("itstack middleware schedule task stop {} {}", instruct.getBeanName(), instruct.getMethodName());
                            break;
                        case 1: //啓動任務
                            cronTaskRegistrar.addCronTask(new SchedulingRunnable(scheduleBean, instruct.getBeanName(), instruct.getMethodName()), instruct.getCron());
                            setData(client, path_root_server_ip_clazz_method_status, "1");
                            logger.info("itstack middleware schedule task start {} {}", instruct.getBeanName(), instruct.getMethodName());
                            break;
                        case 2: //刷新任務
                            cronTaskRegistrar.removeCronTask(instruct.getBeanName() + "_" + instruct.getMethodName());
                            cronTaskRegistrar.addCronTask(new SchedulingRunnable(scheduleBean, instruct.getBeanName(), instruct.getMethodName()), instruct.getCron());
                            setData(client, path_root_server_ip_clazz_method_status, "1");
                            logger.info("itstack middleware schedule task refresh {} {}", instruct.getBeanName(), instruct.getMethodName());
                            break;
                    }
                }
                break;
            case NODE_REMOVED:
                break;
            default:
                break;
        }
    });
}

2.4 並行任務註冊

  • 因爲默認的SpringBoot是單線程的,因此這裏改造了下,能夠支持多線程並行執行
  • 包括了添加任務和刪除任務,也就是執行取消future.cancel(true)
public void addCronTask(SchedulingRunnable task, String cronExpression) {
    if (null != Constants.scheduledTasks.get(task.taskId())) {
        removeCronTask(task.taskId());
    }
    CronTask cronTask = new CronTask(task, cronExpression);
    Constants.scheduledTasks.put(task.taskId(), scheduleCronTask(cronTask));
}
public void removeCronTask(String taskId) {
    ScheduledTask scheduledTask = Constants.scheduledTasks.remove(taskId);
    if (scheduledTask == null) return;
    scheduledTask.cancel();
}

2.5 待擴展的自定義AOP

  • 咱們最開始配置的掃描@ComponentScan("org.itstack.middleware.*"),主要用到這裏的自定義註解,不然是掃描不到的,也就是你自定義切面失效的效果
  • 目前這裏的功能並無擴展,基本只是打印執行耗時,後續完善的任務執行耗時監聽等,就須要這裏來完善
@Pointcut("@annotation(org.itstack.middleware.schedule.annotation.DcsScheduled)")
public void aopPoint() {
}

@Around("aopPoint()")
public Object doRouter(ProceedingJoinPoint jp) throws Throwable {
    long begin = System.currentTimeMillis();
    Method method = getMethod(jp);
    try {
        return jp.proceed();
    } finally {
        long end = System.currentTimeMillis();
        logger.info("\nitstack middleware schedule method:{}.{} take time(m):{}", jp.getTarget().getClass().getSimpleName(), method.getName(), (end - begin));
    }
}

3. Jar包發佈

開發完成後仍是須要將Jar包發佈到manven中心倉庫的,這個過程較長單獨寫了博客;發佈Jar包到Maven中央倉庫(爲開發開源中間件作準備)

綜上總結


  1. 要開發要實現的還不少,一個週末也幹不完全部的!並且須要有想法的小猿/媛伴一塊兒加入!🙂 😀 😏
  2. 這裏沒有講解分佈式任務中間件控制平臺itstack-middleware-control,由於比較簡單只是使用了中間件的zk功能接口作展現和操做。
  3. 中間件開發是一件很是有意思的事情,不一樣於業務它更像易筋經,寺廟老僧,劍走偏鋒,馳騁縱橫,騷招滿屏。

推薦閱讀

相關文章
相關標籤/搜索