Scheduling Analysis Based on Netty + Zookeeper + Quartz

Articles in this series

Integrating Quartz with Spring for Distributed Scheduling
Analysis of the Quartz Database Tables
Quartz Scheduling Source Code Analysis
Scheduling Analysis Based on Netty + Zookeeper + Quartz

Preface

The previous articles gave a brief analysis of Quartz from both the usage and the source-code perspectives, and along the way exposed some of its weaknesses. For one, scheduling relies on a pessimistic database lock at the bottom: whichever node grabs the lock first does the scheduling, which leads to unbalanced load across nodes. For another, scheduling and execution are coupled in the same process, so the scheduler is affected by the business workload. Below we look at how to solve these problems.

Approach

Split the scheduler and the executors into separate processes. The scheduler still relies on Quartz's own scheduling mechanism, but what it schedules is no longer a business-specific QuartzJobBean; instead it schedules a single unified RemoteQuartzJobBean, which uses Netty to remotely call an executor that runs the concrete business bean. Each executor registers itself in Zookeeper on startup, so the scheduler can fetch the executor information from Zookeeper and pick a concrete executor with a load-balancing algorithm. A simple implementation follows.
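
The contract shared by the scheduler and the executors is the IJobHandler interface, which the code below uses throughout but never defines. A minimal sketch, inferred from that usage (the Result shape is an assumption; the real class lives in com.zh.job.common.bean and may differ):

// Result.java -- assumed minimal result holder
public class Result {
    private final int code;
    private final String msg;

    public Result(int code, String msg) {
        this.code = code;
        this.msg = msg;
    }
}

// IJobHandler.java -- the job contract
public interface IJobHandler {

    // Convenience constant that handlers return on success (assumed)
    Result SUCCESS = new Result(200, "success");

    // Runs the business logic; param carries arguments supplied by the scheduler
    Result execute(String param) throws Exception;
}

FirstTask's `return SUCCESS;` below resolves against this inherited interface constant, and the scheduler log's Result@33b61489 suggests the real class does not override toString().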

Executor

1. Executor configuration file

executor_name=firstExecutor
service_address=127.0.0.1:8000
registry_address=127.0.0.1:2181

This configures the executor's name, the IP and port the executor listens on, and the Zookeeper address.

2. Executor server

<bean id="executorServer" class="com.zh.job.executor.ExecutorServer">
        <constructor-arg name="executorName" value="${executor_name}"/>
        <constructor-arg name="serviceAddress" value="${service_address}" />
        <constructor-arg name="serviceRegistry" ref="serviceRegistry" />
    </bean>

ExecutorServer starts the service with Netty and registers it with Zookeeper; part of the code follows:

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
    // Create and initialize the Netty server-side Bootstrap
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workerGroup);
    bootstrap.channel(NioServerSocketChannel.class);
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel channel) throws Exception {
            ChannelPipeline pipeline = channel.pipeline();
            pipeline.addLast(new RpcDecoder(Request.class));
            pipeline.addLast(new RpcEncoder(Response.class));
            pipeline.addLast(new ExecutorServerHandler(handlerMap));
        }
    });
    bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
    bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
    // Parse the RPC server's IP address and port out of serviceAddress
    String[] addressArray = StringUtils.splitByWholeSeparator(serviceAddress, ":");
    String ip = addressArray[0];
    int port = Integer.parseInt(addressArray[1]);
    // Start the RPC server
    ChannelFuture future = bootstrap.bind(ip, port).sync();
    // Register the RPC service address with Zookeeper
    if (serviceRegistry != null) {
        serviceRegistry.register(executorName, serviceAddress);
        LOGGER.info("register service: {} => {}", executorName, serviceAddress);
    }
    LOGGER.info("server started on port {}", port);
    // Block until the server channel is closed
    future.channel().closeFuture().sync();
} finally {
    workerGroup.shutdownGracefully();
    bossGroup.shutdownGracefully();
}

The Netty pipeline is configured with the decoder and encoder, plus an ExecutorServerHandler that processes messages arriving from the scheduler (see the project source for the rest of the code). Finally, the service is registered with Zookeeper under a path of the following form:

/job_registry/firstExecutor/address-0000000008

Here job_registry is a fixed root, firstExecutor is the executor name from the configuration, and the numeric suffix is generated by Zookeeper.
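
ZookeeperServiceRegistry itself is not listed here. A minimal sketch of register(), assuming the plain ZooKeeper client and an EPHEMERAL_SEQUENTIAL address node (ZooKeeper appends the sequence number, and the node vanishes automatically when the executor's session dies):

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ZookeeperServiceRegistry {

    private static final String REGISTRY_ROOT = "/job_registry";

    private final ZooKeeper zk; // assumed to be connected already

    public ZookeeperServiceRegistry(ZooKeeper zk) {
        this.zk = zk;
    }

    public void register(String executorName, String serviceAddress) throws Exception {
        // Make sure the persistent parent nodes exist
        createIfAbsent(REGISTRY_ROOT);
        createIfAbsent(REGISTRY_ROOT + "/" + executorName);
        // Ephemeral sequential node, e.g. /job_registry/firstExecutor/address-0000000008
        zk.create(REGISTRY_ROOT + "/" + executorName + "/address-",
                serviceAddress.getBytes("UTF-8"),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

    private void createIfAbsent(String path) throws Exception {
        if (zk.exists(path, false) == null) {
            zk.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }
}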

3. Configuring and loading tasks

Add an annotation type used to mark the concrete business jobs:

@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Component // meta-annotation: classes marked @ExecutorTask become Spring components
public @interface ExecutorTask {

    // Task name; must match the job name defined on the scheduler side
    String name();

}

A concrete business task then looks like this:

@ExecutorTask(name = "firstTask")
public class FirstTask implements IJobHandler {
 
    private static final Logger LOGGER = LoggerFactory.getLogger(FirstTask.class);
 
    @Override
    public Result execute(String param) throws Exception {
        LOGGER.info("execute firstTask");
        return SUCCESS;
    }
 
}

When the executor server starts, it loads every task class annotated with @ExecutorTask; the name defined here must match the job name on the scheduler side.
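
How handlerMap (the map passed to ExecutorServerHandler above) gets populated is not shown; a minimal sketch, assuming the executor runs inside a Spring context and the map is keyed by the annotation's name attribute:

import java.util.HashMap;
import java.util.Map;

import org.springframework.context.ApplicationContext;

public final class HandlerMapBuilder {

    // Collect every Spring bean annotated with @ExecutorTask, keyed by the
    // annotation's name attribute (e.g. "firstTask"); since @ExecutorTask is
    // meta-annotated with @Component, component scanning finds the beans
    public static Map<String, Object> build(ApplicationContext context) {
        Map<String, Object> handlerMap = new HashMap<>();
        for (Object bean : context.getBeansWithAnnotation(ExecutorTask.class).values()) {
            ExecutorTask task = bean.getClass().getAnnotation(ExecutorTask.class);
            if (task != null) {
                handlerMap.put(task.name(), bean);
            }
        }
        return handlerMap;
    }
}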

4. Executing the business logic

The ExecutorServerHandler registered in the Netty pipeline processes the messages received from the scheduler and invokes the concrete business job via reflection; part of the code:

private Object handle(Request request) throws Exception {
     // Look up the service bean
     String serviceName = request.getInterfaceName();
     Object serviceBean = handlerMap.get(serviceName);
     if (serviceBean == null) {
         throw new RuntimeException(String.format("can not find service bean by key: %s", serviceName));
     }
     // Gather the parameters needed for the reflective call
     Class<?> serviceClass = serviceBean.getClass();
     String methodName = request.getMethodName();
     Class<?>[] parameterTypes = request.getParameterTypes();
     Object[] parameters = request.getParameters();
     // Invoke the target method via CGLib's FastClass (avoids the cost of java.lang.reflect)
     FastClass serviceFastClass = FastClass.create(serviceClass);
     FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
     return serviceFastMethod.invoke(serviceBean, parameters);
 }

Here serviceName corresponds to the "firstTask" defined earlier; the handler looks up the matching bean by serviceName, invokes it reflectively, and returns the result.
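
For completeness, handle() above lives inside ExecutorServerHandler; the surrounding Netty handler is roughly the following sketch, assuming Response exposes setRequestId/setResult/setException and that the handler extends SimpleChannelInboundHandler:

import java.util.Map;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class ExecutorServerHandler extends SimpleChannelInboundHandler<Request> {

    private final Map<String, Object> handlerMap;

    public ExecutorServerHandler(Map<String, Object> handlerMap) {
        this.handlerMap = handlerMap;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Request request) {
        Response response = new Response();
        response.setRequestId(request.getRequestId());
        try {
            // handle(request) is the reflective dispatch method listed just above
            response.setResult(handle(request));
        } catch (Exception e) {
            response.setException(e);
        }
        // Send the outcome back to the scheduler
        ctx.writeAndFlush(response);
    }

    // private Object handle(Request request) throws Exception { ... } -- shown above
}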

Scheduler

The scheduler still relies on Quartz's native scheduling mechanism, except that it no longer executes the business tasks itself; its configuration therefore looks much the same as before, and it still depends on the database.

1. Defining the scheduled job

<bean id="firstTask"
     class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
     <property name="jobClass" value="com.zh.job.scheduler.RemoteQuartzJobBean" />
     <property name="jobDataMap">
         <map>
             <entry key="executorBean" value-ref="firstExecutor" />
         </map>
     </property>
 </bean>
 
 <bean id="firstExecutor" class="com.zh.job.scheduler.ExecutorBean">
     <constructor-arg name="executorName" value="firstExecutor"></constructor-arg>
     <constructor-arg name="discoveryAddress" value="${discovery_address}"></constructor-arg>
 </bean>

The scheduler side likewise defines a job named firstTask, but note that its jobClass is RemoteQuartzJobBean, not the concrete business task; a jobDataMap is also supplied, carrying the executor name and the Zookeeper discovery address.
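
ExecutorBean is presumably just a small holder for those two values; a sketch matching the constructor args above and the getters used by the proxy below:

public class ExecutorBean {

    // Name under which the target executor registered itself in Zookeeper
    private final String executorName;
    // Zookeeper address used for service discovery
    private final String discoveryAddress;

    public ExecutorBean(String executorName, String discoveryAddress) {
        this.executorName = executorName;
        this.discoveryAddress = discoveryAddress;
    }

    public String getExecutorName() {
        return executorName;
    }

    public String getDiscoveryAddress() {
        return discoveryAddress;
    }
}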

2. RemoteQuartzJobBean

public class RemoteQuartzJobBean extends QuartzJobBean {
 
    private static final Logger LOGGER = LoggerFactory.getLogger(RemoteQuartzJobBean.class);
 
    private ExecutorBean executorBean;
 
    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        JobKey jobKey = context.getTrigger().getJobKey();
        LOGGER.info("jobName:" + jobKey.getName() + ",group:" + jobKey.getGroup());
        IJobHandler executor = JobProxy.create(IJobHandler.class, jobKey, this.executorBean);
        Result result;
        try {
            result = executor.execute("");
            LOGGER.info("result:" + result);
        } catch (Exception e) {
            LOGGER.error("", e);
        }
    }
 
    public ExecutorBean getExecutorBean() {
        return executorBean;
    }
 
    public void setExecutorBean(ExecutorBean executorBean) {
        this.executorBean = executorBean;
    }
 
}

This class also extends QuartzJobBean so that Quartz can schedule it; QuartzJobBean applies the jobDataMap entries as bean properties, which is why the setExecutorBean setter exists. Inside the bean, an IJobHandler proxy is created from the jobKey and the executorBean:

public static <T> T create(final Class<?> interfaceClass, final JobKey jobKey, final ExecutorBean executor) {
        // Create the dynamic proxy object
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] { interfaceClass },
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        // Build the RPC request and set its attributes
                        Request request = new Request();
                        request.setRequestId(UUID.randomUUID().toString());
                        request.setInterfaceName(jobKey.getName());
                        request.setMethodName(method.getName());
                        request.setParameterTypes(method.getParameterTypes());
                        request.setParameters(args);
 
                        String serviceAddress = null;
                        ServiceDiscovery serviceDiscovery = ServiceDiscoveryFactory
                                .getServiceDiscovery(executor.getDiscoveryAddress());
                        // Look up the RPC service address via Zookeeper
                        if (serviceDiscovery != null) {
                            serviceAddress = serviceDiscovery.discover(executor.getExecutorName());
                            LOGGER.debug("discover service: {} => {}", executor.getExecutorName(), serviceAddress);
                        }
                        if (StringUtil.isEmpty(serviceAddress)) {
                            throw new RuntimeException("server address is empty");
                        }
                        // Parse host and port from the discovered service address
                        String[] array = StringUtil.split(serviceAddress, ":");
                        String host = array[0];
                        int port = Integer.parseInt(array[1]);
                        // Create the RPC client and send the request
                        ExecutorClient client = new ExecutorClient(host, port);
                        long time = System.currentTimeMillis();
                        Response response = client.send(request);
                        LOGGER.debug("time: {}ms", System.currentTimeMillis() - time);
                        if (response == null) {
                            throw new RuntimeException("response is null");
                        }
                        // Return the RPC response result
                        if (response.hasException()) {
                            throw response.getException();
                        } else {
                            return response.getResult();
                        }
                    }
                });
    }

The Request's interfaceName is set to jobKey.getName(), i.e. firstTask here. Service discovery looks up executor.getExecutorName() in Zookeeper to find the concrete executor's address. There may of course be a whole list of addresses, in which case a load-balancing algorithm (random, round-robin, consistent hashing, and so on) picks one, as in the sketch below. Once an address is obtained, the scheduler connects to the executor through Netty, sends the job, and waits for the result.
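
In this test there is only one address node, so discovery simply returns it (the "get only address node" log line below); with several executors a selection step would sit between discover() and the Netty call. A minimal sketch of the random strategy, assuming discovery hands back the full address list (round-robin or consistent hashing would drop in the same way):

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public final class LoadBalance {

    // Pick one executor address from the nodes found under /job_registry/<executorName>
    public static String random(List<String> addresses) {
        if (addresses == null || addresses.isEmpty()) {
            throw new RuntimeException("server address is empty");
        }
        return addresses.get(ThreadLocalRandom.current().nextInt(addresses.size()));
    }
}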

A simple test

Start the scheduler and the executor separately; the relevant logs:

1. Executor log

2018-09-03 11:17:02 [main] 13::: DEBUG com.zh.job.sample.executor.ExecutorBootstrap - start server
2018-09-03 11:17:03 [main] 31::: DEBUG com.zh.job.registry.impl.ZookeeperServiceRegistry - connect zookeeper
2018-09-03 11:17:03 [main] 49::: DEBUG com.zh.job.registry.impl.ZookeeperServiceRegistry - create address node: /job_registry/firstExecutor/address-0000000009
2018-09-03 11:17:03 [main] 107::: INFO  com.zh.job.executor.ExecutorServer - register service: firstExecutor => 127.0.0.1:8000
2018-09-03 11:17:03 [main] 109::: INFO  com.zh.job.executor.ExecutorServer - server started on port 8000
2018-09-03 11:17:15 [nioEventLoopGroup-3-1] 17::: INFO  com.zh.job.sample.executor.task.FirstTask - execute firstTask

2. Scheduler log

2018-09-03 11:17:14 [myScheduler_Worker-1] 28::: INFO  com.zh.job.scheduler.RemoteQuartzJobBean - jobName:firstTask,group:DEFAULT
2018-09-03 11:17:15 [myScheduler_Worker-2] 28::: INFO  com.zh.job.scheduler.RemoteQuartzJobBean - jobName:firstTask,group:DEFAULT
2018-09-03 11:17:15 [myScheduler_Worker-1] 33::: DEBUG com.zh.job.registry.impl.ZookeeperServiceDiscovery - connect zookeeper
2018-09-03 11:17:15 [myScheduler_Worker-2] 54::: DEBUG com.zh.job.registry.impl.ZookeeperServiceDiscovery - get only address node: address-0000000009
2018-09-03 11:17:15 [myScheduler_Worker-1] 54::: DEBUG com.zh.job.registry.impl.ZookeeperServiceDiscovery - get only address node: address-0000000009
2018-09-03 11:17:15 [myScheduler_Worker-2] 42::: DEBUG com.zh.job.scheduler.JobProxy$1 - discover service: firstExecutor => 127.0.0.1:8000
2018-09-03 11:17:15 [myScheduler_Worker-1] 42::: DEBUG com.zh.job.scheduler.JobProxy$1 - discover service: firstExecutor => 127.0.0.1:8000
2018-09-03 11:17:15 [myScheduler_Worker-1] 55::: DEBUG com.zh.job.scheduler.JobProxy$1 - time: 369ms
2018-09-03 11:17:15 [myScheduler_Worker-1] 33::: INFO  com.zh.job.scheduler.RemoteQuartzJobBean - result:com.zh.job.common.bean.Result@33b61489

Summary

This article walked through an example of how to address the shortcomings of native Quartz scheduling, chiefly by isolating the scheduler from the executors so that each side does its own job and plays to its own strengths.

Sample code

https://github.com/ksfzhaohui...
