Quartz數據庫表分析git
Quartz調度源碼分析github
基於Netty+Zookeeper+Quartz調度分析算法
前幾篇文章分別從使用和源碼層面對Quartz作了簡單的分析,在分析的過程當中也發現了Quartz不足的地方;好比底層調度依賴數據庫的悲觀鎖,誰先搶到誰調度,這樣會致使節點負載不均衡;還有調度和執行耦合在一塊兒,致使調度器會受到業務的影響;下面看看如何來解決這幾個問題;spring
調度器和執行器拆成不一樣的進程,調度器仍是依賴Quartz自己的調度方式,可是調度的並非具體業務的QuartzJobBean,而是統一的一個RemoteQuartzJobBean,在此Bean中經過Netty遠程調用執行器去執行具體業務Bean;具體的執行器在啓動時註冊到Zookeeper中,調度器能夠在Zookeeper獲取執行器信息,並經過相關的負載算法指定具體的執行器去執行,如下看簡單的實現;數據庫
executor_name=firstExecutor service_address=127.0.0.1:8000 registry_address=127.0.0.1:2181
配置了執行器的名稱,執行器啓動的ip和端口以及Zookeeper的地址信息;bootstrap
<bean id="executorServer" class="com.zh.job.executor.ExecutorServer"> <constructor-arg name="executorName" value="${executor_name}"/> <constructor-arg name="serviceAddress" value="${service_address}" /> <constructor-arg name="serviceRegistry" ref="serviceRegistry" /> </bean>
ExecutorServer經過Netty啓動服務,並向Zookeeper註冊服務,部分代碼以下:服務器
EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 建立並初始化 Netty 服務端 Bootstrap 對象 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new RpcDecoder(Request.class)); pipeline.addLast(new RpcEncoder(Response.class)); pipeline.addLast(new ExecutorServerHandler(handlerMap)); } }); bootstrap.option(ChannelOption.SO_BACKLOG, 1024); bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); // 獲取 RPC 服務器的 IP 地址與端口號 String[] addressArray = StringUtils.splitByWholeSeparator(serviceAddress, ":"); String ip = addressArray[0]; int port = Integer.parseInt(addressArray[1]); // 啓動 RPC 服務器 ChannelFuture future = bootstrap.bind(ip, port).sync(); // 註冊 RPC 服務地址 if (serviceRegistry != null) { serviceRegistry.register(executorName, serviceAddress); LOGGER.info("register service: {} => {}", executorName, serviceAddress); } LOGGER.info("server started on port {}", port); // 關閉 RPC 服務器 future.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); }
在Netty中指定了編碼器解碼器,同時指定了ExecutorServerHandler用來處理調度器發送來的消息(更多代碼查看項目源碼);最後向Zookeeper註冊服務,路徑格式以下:併發
/job_registry/firstExecutor/address-0000000008
job_registry是固定值,firstExecutor是配置的具體執行器名稱;負載均衡
添加註解類,用來指定具體的業務Job:
@Target({ ElementType.TYPE }) @Retention(RetentionPolicy.RUNTIME) @Component public @interface ExecutorTask { String name(); }
例如具體的業務Task以下所示:
@ExecutorTask(name = "firstTask") public class FirstTask implements IJobHandler { private static final Logger LOGGER = LoggerFactory.getLogger(FirstTask.class); @Override public Result execute(String param) throws Exception { LOGGER.info("execute firstTask"); return SUCCESS; } }
在啓動執行器服務時,加載有ExecutorTask註解的任務類,此處定義的name要和調度端的名稱相互匹配;
Netty中指定了ExecutorServerHandler用來處理接受的調度器信息,經過反射的方式來調用具體的業務Job,部分代碼以下:
private Object handle(Request request) throws Exception { // 獲取服務對象 String serviceName = request.getInterfaceName(); Object serviceBean = handlerMap.get(serviceName); if (serviceBean == null) { throw new RuntimeException(String.format("can not find service bean by key: %s", serviceName)); } // 獲取反射調用所需的參數 Class<?> serviceClass = serviceBean.getClass(); String methodName = request.getMethodName(); Class<?>[] parameterTypes = request.getParameterTypes(); Object[] parameters = request.getParameters(); // 使用 CGLib 執行反射調用 FastClass serviceFastClass = FastClass.create(serviceClass); FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes); return serviceFastMethod.invoke(serviceBean, parameters); }
serviceName對應的就是定義的」firstTask」,而後經過serviceName找到對應的Bean,而後反射調用,最終返回結果;
調度器仍是依賴Quartz的原生調度方式,只不過調度器不在執行相關業務Task,因此相關配置也是相似,一樣依賴數據庫;
<bean id="firstTask" class="org.springframework.scheduling.quartz.JobDetailFactoryBean"> <property name="jobClass" value="com.zh.job.scheduler.RemoteQuartzJobBean" /> <property name="jobDataMap"> <map> <entry key="executorBean" value-ref="firstExecutor" /> </map> </property> </bean> <bean id="firstExecutor" class="com.zh.job.scheduler.ExecutorBean"> <constructor-arg name="executorName" value="firstExecutor"></constructor-arg> <constructor-arg name="discoveryAddress" value="${discovery_address}"></constructor-arg> </bean>
一樣在調度端定義了名稱問firstTask的任務,能夠發現此類是RemoteQuartzJobBean,並非具體的業務Task;同時也指定了jobDataMap,用來指定執行器名稱和發現的Zookeeper地址;
public class RemoteQuartzJobBean extends QuartzJobBean { private static final Logger LOGGER = LoggerFactory.getLogger(RemoteQuartzJobBean.class); private ExecutorBean executorBean; @Override protected void executeInternal(JobExecutionContext context) throws JobExecutionException { JobKey jobKey = context.getTrigger().getJobKey(); LOGGER.info("jobName:" + jobKey.getName() + ",group:" + jobKey.getGroup()); IJobHandler executor = JobProxy.create(IJobHandler.class, jobKey, this.executorBean); Result result; try { result = executor.execute(""); LOGGER.info("result:" + result); } catch (Exception e) { LOGGER.error("", e); } } public ExecutorBean getExecutorBean() { return executorBean; } public void setExecutorBean(ExecutorBean executorBean) { this.executorBean = executorBean; } }
此類一樣繼承於QuartzJobBean,這樣Quartz才能調度Bean,在此Bean中經過jobKey和executorBean建立了IJobHandler的代理類,具體代碼以下:
public static <T> T create(final Class<?> interfaceClass, final JobKey jobKey, final ExecutorBean executor) { // 建立動態代理對象 return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] { interfaceClass }, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 建立 RPC 請求對象並設置請求屬性 Request request = new Request(); request.setRequestId(UUID.randomUUID().toString()); request.setInterfaceName(jobKey.getName()); request.setMethodName(method.getName()); request.setParameterTypes(method.getParameterTypes()); request.setParameters(args); String serviceAddress = null; ServiceDiscovery serviceDiscovery = ServiceDiscoveryFactory .getServiceDiscovery(executor.getDiscoveryAddress()); // 獲取 RPC 服務地址 if (serviceDiscovery != null) { serviceAddress = serviceDiscovery.discover(executor.getExecutorName()); LOGGER.debug("discover service: {} => {}", executor.getExecutorName(), serviceAddress); } if (StringUtil.isEmpty(serviceAddress)) { throw new RuntimeException("server address is empty"); } // 從 RPC 服務地址中解析主機名與端口號 String[] array = StringUtil.split(serviceAddress, ":"); String host = array[0]; int port = Integer.parseInt(array[1]); // 建立 RPC 客戶端對象併發送 RPC 請求 ExecutorClient client = new ExecutorClient(host, port); long time = System.currentTimeMillis(); Response response = client.send(request); LOGGER.debug("time: {}ms", System.currentTimeMillis() - time); if (response == null) { throw new RuntimeException("response is null"); } // 返回 RPC 響應結果 if (response.hasException()) { throw response.getException(); } else { return response.getResult(); } } }); }
在Request中指定了InterfaceName爲jobKey.getName(),也就是這裏的firstTask;經過Zookeeper發現服務時指定了executor.getExecutorName(),這樣能夠在Zookeeper中找到具體的執行器地址,固然這裏的地址多是一個列表,能夠經過負載均衡算法(隨機,輪詢,一致性hash等等)進行分配,獲取到地址後經過Netty遠程鏈接執行器,發送執行job等待返回結果;
分別執行調度器和執行器,相關日誌以下:
2018-09-03 11:17:02 [main] 13::: DEBUG com.zh.job.sample.executor.ExecutorBootstrap - start server 2018-09-03 11:17:03 [main] 31::: DEBUG com.zh.job.registry.impl.ZookeeperServiceRegistry - connect zookeeper 2018-09-03 11:17:03 [main] 49::: DEBUG com.zh.job.registry.impl.ZookeeperServiceRegistry - create address node: /job_registry/firstExecutor/address-0000000009 2018-09-03 11:17:03 [main] 107::: INFO com.zh.job.executor.ExecutorServer - register service: firstExecutor => 127.0.0.1:8000 2018-09-03 11:17:03 [main] 109::: INFO com.zh.job.executor.ExecutorServer - server started on port 8000 2018-09-03 11:17:15 [nioEventLoopGroup-3-1] 17::: INFO com.zh.job.sample.executor.task.FirstTask - execute firstTask
2018-09-03 11:17:14 [myScheduler_Worker-1] 28::: INFO com.zh.job.scheduler.RemoteQuartzJobBean - jobName:firstTask,group:DEFAULT 2018-09-03 11:17:15 [myScheduler_Worker-2] 28::: INFO com.zh.job.scheduler.RemoteQuartzJobBean - jobName:firstTask,group:DEFAULT 2018-09-03 11:17:15 [myScheduler_Worker-1] 33::: DEBUG com.zh.job.registry.impl.ZookeeperServiceDiscovery - connect zookeeper 2018-09-03 11:17:15 [myScheduler_Worker-2] 54::: DEBUG com.zh.job.registry.impl.ZookeeperServiceDiscovery - get only address node: address-0000000009 2018-09-03 11:17:15 [myScheduler_Worker-1] 54::: DEBUG com.zh.job.registry.impl.ZookeeperServiceDiscovery - get only address node: address-0000000009 2018-09-03 11:17:15 [myScheduler_Worker-2] 42::: DEBUG com.zh.job.scheduler.JobProxy$1 - discover service: firstExecutor => 127.0.0.1:8000 2018-09-03 11:17:15 [myScheduler_Worker-1] 42::: DEBUG com.zh.job.scheduler.JobProxy$1 - discover service: firstExecutor => 127.0.0.1:8000 2018-09-03 11:17:15 [myScheduler_Worker-1] 55::: DEBUG com.zh.job.scheduler.JobProxy$1 - time: 369ms 2018-09-03 11:17:15 [myScheduler_Worker-1] 33::: INFO com.zh.job.scheduler.RemoteQuartzJobBean - result:com.zh.job.common.bean.Result@33b61489
本文經過一個實例來分析如何解決原生Quartz調度存在不足的問題,主要體如今調度器與執行器的隔離上,各司其責發揮各自的優點;