LTS任務調度使用

LTS(light-task-scheduler)主要用於解決分佈式任務調度問題,支持實時任務,定時任務和Cron任務。有較好的伸縮性,擴展性,健壯穩定性而被多家公司使用。java

項目主頁 https://github.com/ltsopensource/light-task-schedulernode

原來項目使用Quartz做爲定時器解決方案,可是Quartz沒有可視化的任務運行時調度和監控(有數據庫,可是須要本身開發界面),每次想要修改定時器配置都比較麻煩,因此引入了LTSgit

該框架主要有四個節點:github

  • JobClient:主要負責提交任務, 並接收任務執行反饋結果。
  • JobTracker:負責接收並分配任務,任務調度。
  • TaskTracker:負責執行任務,執行完反饋給JobTracker。
  • Monitor:(管理後臺)主要負責節點管理,任務隊列管理,監控管理等。

因爲目前系統所須要的任務都是固定任務,所已其中JobClient所有在頁面中進行提交,因此不部署
Monitor使用官方提供頁面(war),直接部署到tomcat,默認用戶名密碼均爲adminspring

JobTracker部署

clone項目的源碼,運行根目錄下的sh build.sh或build.cmd腳本,會在dist目錄下生成lts-{version}-bin文件夾。
或者直接解壓dist下面原有的lts-1.6.8-beta1-bin.zip,也能實現一樣效果,本人使用了第二種方法。數據庫

修改生成的或解壓的文件夾中conf/zoo文件夾下的配置文件,修改成實際使用ZooKeeper和MySQL的配置apache

在生成的或解壓的文件夾中執行 sh jobtracker.sh zoo start 便可啓動JobTrackertomcat

TaskTracker使用

tasktracker須要在業務中實現代碼,簡單說是本身編寫一個任務執行類,實現JobRunner接口,在run方法中實現本身的邏輯便可。markdown

貼一個官方提供的TaskTracker例子app

public class MyJobRunner implements JobRunner {
    @Override
    public Result run(JobContext jobContext) throws Throwable {
        try {
            // TODO 業務邏輯
            // 會發送到 LTS (JobTracker上)
            jobContext.getBizLogger().info("測試,業務日誌啊啊啊啊啊");

        } catch (Exception e) {
            return new Result(Action.EXECUTE_FAILED, e.getMessage());
        }
        return new Result(Action.EXECUTE_SUCCESS, "執行成功了,哈哈");
    }
}

因爲原來的項目使用的了Spring,因此直接寫配置文件便可暴露節點,一樣貼上官方例子:

<bean id="taskTracker" class="com.github.ltsopensource.spring.TaskTrackerAnnotationFactoryBean" init-method="start">
    <property name="jobRunnerClass" value="com.github.ltsopensource.example.support.MyJobRunner"/>
    <property name="bizLoggerLevel" value="INFO"/>
    <property name="clusterName" value="test_cluster"/>
    <property name="registryAddress" value="zookeeper://127.0.0.1:2181"/>
    <property name="nodeGroup" value="test_trade_TaskTracker"/>
    <property name="workThreads" value="20"/>
    <property name="configs">
        <props>
            <prop key="job.fail.store">leveldb</prop>
        </props>
    </property>
</bean>

啓動項目,節點就暴露成功了。

一個TaskTracker執行多種任務

通常一個系統中每每不止一個任務,須要使用LTS對多個任務進行調度,一開始本人的想法是在一個項目中啓動多個任務節點來接受任務調度。後來發現一個JVM中使用多個TaskTracker實例比較浪費資源,在正式項目運行過程當中出現了文件句柄過多致使任務大量失敗的狀況,因此已經棄用。參考了LTS做者提供的文檔之後,對任務進行了簡單封裝,以知足這種需求。

  1. 建立一個集中的任務調度器,該bean在啓動時將IOC容器中全部JobRunner取出放入一個MAP中,當該JOB被LTS調用時根據參數判斷具體調度那個任務
* @author ElinZhou
 * @version $Id: JobRunnerDispatcher.java , v 0.1 2016/6/24 16:39 ElinZhou Exp $
 */
public class JobRunnerDispatcher implements JobRunner, ApplicationContextAware {

    private static final ConcurrentHashMap<String/*type*/, JobRunner> JOB_RUNNER_MAP = new ConcurrentHashMap<String, JobRunner>();

    @Override
    public Result run(JobContext jobContext) throws Throwable {

        //根據type選擇對應的JobRunner運行
        Job job = jobContext.getJob();
        String type = job.getParam("type");
        return JOB_RUNNER_MAP.get(type.toUpperCase()).run(jobContext);

    }

    /** * 從IOC容器中將JobRunner類型的bean放入map中 * @param applicationContext * @throws BeansException */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

        Map<String, JobRunner> map = applicationContext.getBeansOfType(JobRunner.class);
        for (String type : map.keySet()) {
            JOB_RUNNER_MAP.put(type.toUpperCase(), map.get(type));
        }
    }
}
  1. 將本身的實現了JobRunner的任務如上文的MyJobRunner 類注入IOC容器中,例如直接在任務類前面加入@Component註解
  2. 在提交任務時在參數中指定具體要調度的任務,如:{「type」:」MyJobRunner」}
  3. 暴露集中任務調度器JobRunnerDispatcher
<bean id="taskTracker" class="com.github.ltsopensource.spring.TaskTrackerAnnotationFactoryBean" init-method="start">
        <property name="jobRunnerClass" value="com.elin4it.util.taskSchedule.JobRunnerDispatcher"/>
        <property name="bizLoggerLevel" value="INFO"/>
        <!--集羣名,在auto-config配置並從配置中心獲取-->
        <property name="clusterName" value="${task.clusterName}"/>
        <!--zookeeper地址,在auto-config配置並從配置中心獲取-->
        <property name="registryAddress" value="${zookeeper.url}"/>
        <!--節點組名稱,在部署集羣時同一個系統須要統一,在auto-config配置-->
        <property name="nodeGroup" value="${system.name}-DISPATCHER"/>
        <!--工做線程數,根據本身任務定義,要求不小於本系統最大任務數-->
        <property name="workThreads" value="10"/>
        <property name="configs">
            <props>
                <prop key="job.fail.store">leveldb</prop>
            </props>
        </property>
    </bean>

因爲當初寫這篇文章的時候對LTS的原理認識不足以及能力尚淺,因此後面的內容的代碼有嚴重問題,請各位看官不要繼續使用

多個任務節點暴露

LTS原生僅支持一個nodegroup就要寫一套配置,如上所示的XML配置文件,這樣寫很囉嗦,因此我對他進行了一次封裝,先貼上代碼:

1.精簡後的任務配置Bean

import java.util.Properties;

import com.github.ltsopensource.tasktracker.runner.JobRunner;

/** * @author ElinZhou * @version $Id: TaskTrackerCustom.java , v 0.1 2016/5/31 17:19 ElinZhou Exp $ */
public class TaskTrackerCustom {
    /** * 任務運行類 */
    private Class<? extends JobRunner> jobRunnerClass;

    /** * 節點組(默認爲系統名稱-運行類名) */
    private String                     nodeGroup;

    /** * 線程數(默認爲1) */
    private int                        workThreads;
    /** * 額外參數配置 */
    private Properties                 configs = new Properties();

    public Class<? extends JobRunner> getJobRunnerClass() {
        return jobRunnerClass;
    }

    public void setJobRunnerClass(Class<? extends JobRunner> jobRunnerClass) {
        this.jobRunnerClass = jobRunnerClass;
    }

    public String getNodeGroup() {
        return nodeGroup;
    }

    public void setNodeGroup(String nodeGroup) {
        this.nodeGroup = nodeGroup;
    }

    public int getWorkThreads() {
        return workThreads;
    }

    public void setWorkThreads(int workThreads) {
        this.workThreads = workThreads;
    }

    public Properties getConfigs() {
        return configs;
    }

    public void setConfigs(Properties configs) {
        this.configs = configs;
    }
}

2.異常類

/** * 任務執行器異常 * * @author ElinZhou * @version $Id: TaskTrackerException.java , v 0.1 2016/5/31 17:33 ElinZhou Exp $ */
public class TaskTrackerException extends RuntimeException {

    public TaskTrackerException(String detail) {
        super(detail);
    }
}

三、本身封裝工廠Bean

import java.util.*;

import org.apache.commons.lang.StringUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;

import com.github.ltsopensource.spring.tasktracker.JobDispatcher;
import com.github.ltsopensource.tasktracker.TaskTracker;
import com.github.ltsopensource.tasktracker.runner.JobRunner;
import com.github.ltsopensource.tasktracker.runner.RunnerFactory;


/** * 任務執行器XML配置工廠 * * @author ElinZhou * @version $Id: TaskTrackerXmlFactoryBean.java , v 0.1 2016/5/31 17:18 ElinZhou Exp $ */
public class TaskTrackerXmlFactoryBean implements InitializingBean, ApplicationContextAware {

    /** * 任務執行器 */
    private List<TaskTrackerCustom> taskTrackerCustoms = new ArrayList<TaskTrackerCustom>();

    /** * 任務執行器 */
    private List<String>            jobRunnersName     = new ArrayList<String>();

    /** * 註冊中心地址 */
    private String                  registryAddress;

    /** * 集羣名稱 */
    private String                  clusterName;

    /** * 系統名稱 */
    private String                  systemName;

    private ApplicationContext      applicationContext;

    @Override
    public void afterPropertiesSet() throws Exception {
        if (StringUtils.isBlank(registryAddress)) {
            throw new TaskTrackerException("註冊中心地址不能爲空");
        }
        if (StringUtils.isBlank(clusterName)) {
            throw new TaskTrackerException("集羣名稱不能爲空");
        }
        if (StringUtils.isBlank(systemName)) {

            throw new TaskTrackerException("本系統名稱不能爲空");

        }
    }

    public void start() {

        //若是直接配置了JobRunner,則轉換後加入taskTrackerCustoms
        TaskTrackerCustom custom;
        for (String jobRunnerName : jobRunnersName) {
            Class clazz;
            try {
                clazz = Class.forName(jobRunnerName);
            } catch (Exception e) {
                throw new TaskTrackerException(jobRunnerName + " 不存在");
            }
            //判斷該類是否實現了JobRunner
            if (!new HashSet<Class>(Arrays.asList(clazz.getInterfaces())).contains(JobRunner.class)) {
                throw new TaskTrackerException(jobRunnerName + " 沒有實現JobRunner接口");
            }
            custom = new TaskTrackerCustom();
            custom.setNodeGroup(clazz.getSimpleName());
            custom.setJobRunnerClass(clazz);
            custom.setWorkThreads(1);
            taskTrackerCustoms.add(custom);
        }

        //將自定義任務執行器轉換爲框架任務執行器
        TaskTracker taskTracker;
        for (TaskTrackerCustom taskTrackerCustom : taskTrackerCustoms) {
            taskTracker = new TaskTracker();
            // 任務執行類,實現JobRunner 接口
            if (taskTrackerCustom.getJobRunnerClass() == null) {
                throw new TaskTrackerException("任務執行類不能爲空");
            }
            final String beanName = registerRunnerBeanDefinition(taskTrackerCustom
                .getJobRunnerClass());
            taskTracker.setRunnerFactory(new RunnerFactory() {
                @Override
                public JobRunner newRunner() {
                    return (JobRunner) applicationContext.getBean(beanName);
                }
            });
            taskTracker.setRegistryAddress(registryAddress);
            if (StringUtils.isBlank(taskTrackerCustom.getNodeGroup())) {
                taskTrackerCustom.setNodeGroup(taskTrackerCustom.getJobRunnerClass()
                    .getSimpleName());
            }
            taskTracker.setNodeGroup(systemName + "-" + taskTrackerCustom.getNodeGroup()); // 同一個TaskTracker集羣這個名字相同
            taskTracker.setClusterName(clusterName);
            if (taskTrackerCustom.getWorkThreads() == 0) {
                taskTracker.setWorkThreads(1);
            } else {
                taskTracker.setWorkThreads(taskTrackerCustom.getWorkThreads());
            }

            Iterator<Map.Entry<Object, Object>> it = taskTrackerCustom.getConfigs().entrySet()
                .iterator();
            while (it.hasNext()) {
                Map.Entry<Object, Object> entry = it.next();
                Object key = entry.getKey();
                Object value = entry.getValue();
                taskTracker.addConfig((String) key, (String) value);
            }
            taskTracker.start();
        }
    }

    /** * 將 JobRunner 生成Bean放入spring容器中管理 * 採用原型 scope, 因此能夠在JobRunner中使用@Autowired */
    private String registerRunnerBeanDefinition(Class jobRunnerClass) {
        DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) ((ConfigurableApplicationContext) applicationContext)
            .getBeanFactory();
        String jobRunnerBeanName = "LTS_".concat(jobRunnerClass.getSimpleName());
        if (!beanFactory.containsBean(jobRunnerBeanName)) {
            BeanDefinitionBuilder builder = BeanDefinitionBuilder
                .rootBeanDefinition(jobRunnerClass);
            if (jobRunnerClass == JobDispatcher.class) {
                builder.setScope(BeanDefinition.SCOPE_SINGLETON);
                builder.setLazyInit(false);
                builder.getBeanDefinition().getPropertyValues()
                    .addPropertyValue("shardField", null);
            } else {
                builder.setScope(BeanDefinition.SCOPE_PROTOTYPE);
            }
            beanFactory.registerBeanDefinition(jobRunnerBeanName, builder.getBeanDefinition());
        }
        return jobRunnerBeanName;
    }

    public List<TaskTrackerCustom> getTaskTrackerCustoms() {
        return taskTrackerCustoms;
    }

    public void setTaskTrackerCustoms(List<TaskTrackerCustom> taskTrackerCustoms) {
        this.taskTrackerCustoms = taskTrackerCustoms;
    }

    public List<String> getJobRunnersName() {
        return jobRunnersName;
    }

    public void setJobRunnersName(List<String> jobRunnersName) {
        this.jobRunnersName = jobRunnersName;
    }

    public String getRegistryAddress() {
        return registryAddress;
    }

    public void setRegistryAddress(String registryAddress) {
        this.registryAddress = registryAddress;
    }

    public String getClusterName() {
        return clusterName;
    }

    public void setClusterName(String clusterName) {
        this.clusterName = clusterName;
    }

    public String getSystemName() {
        return systemName;
    }

    public void setSystemName(String systemName) {
        this.systemName = systemName;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

以上代碼實現了只須要配置一次registryAddress、clusterName等信息,但能開啓多個任務節點。Spring配置時有兩種方式,一個是將JobRunner實現類全類名組成的List注入jobRunnersName字段中,其中nodegroup將使用‘系統名稱’-‘類型’,線程數設置爲1;另外一種方式建立TaskTrackerCustom ,在其中設置該任務的一些配置,而後將TaskTrackerCustom 組成的List注入到taskTrackerCustoms字段中。
兩種方式能夠同時使用,所配置的任務將都會啓動。

Spring配置代碼:

<bean id="taskTrackerXmlFactoryBean" class="com.elin4it.common.util.taskSchedule.TaskTrackerXmlFactoryBean" init-method="start">
        <!--ZooKeeper地址-->
        <property name="registryAddress" value="zookeeper://cc.yumei.cn:2181"/>
        <!--集羣名稱-->
        <property name="clusterName" value="elin_cluster"/>
        <!--經過任務執行類名建立任務,其他配置均使用默認值-->
        <property name="jobRunnersName">
            <list>
                <value>com.elin4it.biz.daemon.task.withdraw.SingleWithdrawSenderRunner</value>
            </list>
        </property>

        <!--經過自定義封裝的配置類建立任務,可自定義配置-->
        <property name="taskTrackerCustoms">
            <list>
                <ref bean="taskTrackerCustom"/>
            </list>
        </property>
    </bean>


    <bean id="taskTrackerCustom" class="com.elin4it.common.util.taskSchedule.TaskTrackerCustom">
        <property name="jobRunnerClass" value="com.elin4it.biz.daemon.task.DepositQueryRunner"/>
        <property name="nodeGroup" value="DepositQueryRunner"/>
        <property name="workThreads" value="1"/>
    </bean>

接下來在界面上配置任務就能夠運行任務了。

相關文章
相關標籤/搜索