An Overview of XXL-JOB Features
Basic concepts
XXL-JOB addresses the following pain points:
1) Batch-job monitoring cannot be visualized. What is the state of the current run: success? failure? hung? how far along? why did it fail?
2) Batch jobs are not reusable. In particular there is no way to schedule them, re-use them, control their frequency, or route and dispatch them.
3) Batch jobs cannot do grid computing (in the style of IBM WebSphere grid computing), i.e. the batch workload itself cannot be clustered with failover and sharding.
The "grid computing" capability in particular is where XXL-JOB turns out to be remarkably powerful. Grid computing itself was proposed seven or eight years ago; I covered it in detail in "IBM Grid Computing and Enterprise Batch Job Architecture".
In projects for banks, insurers, and retail store systems we constantly run into "batch" jobs: reconciliation, settlement, accounting, end-of-day processing, and so on.
These batch jobs often involve heavy data processing: a batch of INSERT/DELETE/UPDATE/SELECT statements touching several tables, reading a bit of data here and writing a bit there, running stored procedures, and the like.
Batch jobs are time- and resource-intensive and usually call for multithreaded code. Handled badly, they produce memory leaks, overflows, out-of-memory conditions, CPU pegged at 99%, and severely blocked servers.
I once went through three rounds of optimizing such an enterprise batch run, attacking it from the following angles.
First, following the design of a database connection pool, I implemented a thread pool whose size could be set dynamically, with idle threads returned to the pool.
Second, it needed to support load balancing.
Third, it needed online, dynamic configuration of things like the schedule.
And finally, of course, a good monitoring mechanism.
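The first requirement above, a thread pool whose size can be changed while the program is running, can be sketched directly with the JDK's ThreadPoolExecutor, whose core and maximum sizes are adjustable on a live pool. The class and methods below are standard JDK; the sizing numbers are purely illustrative:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ResizablePoolDemo {
    public static void main(String[] args) {
        // Start with 4 core threads; idle threads are reclaimed after 30s,
        // which is the "return the thread to the pool" behavior.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 8, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true);

        // "Online" re-configuration: no restart needed.
        pool.setCorePoolSize(2);
        pool.setMaximumPoolSize(16);

        System.out.println(pool.getCorePoolSize());    // 2
        System.out.println(pool.getMaximumPoolSize()); // 16
        pool.shutdown();
    }
}
```

This covers dynamic sizing only; load balancing, online scheduling, and monitoring are exactly the parts that took the real project so much effort, and they are what XXL-JOB provides out of the box.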
That effort took five developers. We eventually got a fairly satisfactory result, but a great deal of time went into tuning the code and algorithms, and the load-balancing part in particular required very complex machinery. As everyone knows, when a web or app container hits its limit you can raise overall capacity by clustering and adding nodes; a batch job, however, is usually a standalone application, and turning an application into a cluster that scales out by adding nodes is no simple matter, right?
Of course it can be done. Hand-rolled socket communication combined with an MQ mechanism and a database queue is a fairly common design, and it can give an application cluster-level capacity for big-data, large-transaction processing. IBM's WebSphere grid computing component offered exactly this capability, but it was far too expensive: a seven-figure (RMB) outlay at minimum.
And so we now have XXL-JOB, an open-source grid computing product. Where a grid differs from a cluster is this:
a grid is inherently dynamic. The processors and resources in a cluster are usually fixed, whereas resources on a grid can appear dynamically: they can be added to, or removed from, the grid on demand. Grids are naturally distributed across local-, metro-, or wide-area networks, while a cluster is typically confined to a single physical location. Cluster interconnects achieve very low network latency, which becomes hard to maintain once nodes are far apart.
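This dynamism is what makes grid-style batch practical: XXL-JOB's broadcast sharding hands each executor instance a pair (shard index, shard total), and each node processes only its own slice of the data, typically with a modulo filter. The sketch below is plain Java illustrating that partitioning idea; `myRows` is an illustrative name, not an xxl-job API:

```java
import java.util.ArrayList;
import java.util.List;

public class ShardingDemo {
    // Keep only the rows this shard is responsible for: id % shardTotal == shardIndex.
    public static List<Long> myRows(List<Long> ids, int shardIndex, int shardTotal) {
        List<Long> mine = new ArrayList<>();
        for (long id : ids) {
            if (id % shardTotal == shardIndex) {
                mine.add(id);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<Long> ids = List.of(1L, 2L, 3L, 4L, 5L, 6L);
        // With 3 nodes: node 0 takes ids 3 and 6, node 1 takes 1 and 4, node 2 takes 2 and 5.
        System.out.println(myRows(ids, 0, 3)); // [3, 6]
        System.out.println(myRows(ids, 1, 3)); // [1, 4]
    }
}
```

Because the filter depends only on (index, total), adding or removing a node just changes the pair each node receives on the next trigger; no data needs to be repartitioned by hand.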
Basic usage of XXL-JOB
Source code
To use XXL-JOB, download the source. The latest version at the time of writing is 2.2.0: https://github.com/xuxueli/xxl-job/tree/v2.2.0
After downloading and unpacking you will see a directory like this.
Import the whole thing into your Eclipse workspace and you will get quite a few modules.
The one we mainly use is the xxl-job-admin project. For production-grade use it must be backed by a database.
Preparing the database
Open "xxl-job-master->doc->db->tables_xxl_job.sql" and run it against our MySQL instance.
CREATE database if NOT EXISTS `xxl_job` default character set utf8mb4 collate utf8mb4_unicode_ci;
use `xxl_job`;
SET NAMES utf8mb4;

CREATE TABLE `xxl_job_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `job_group` int(11) NOT NULL COMMENT 'executor primary key ID',
  `job_cron` varchar(128) NOT NULL COMMENT 'job CRON expression',
  `job_desc` varchar(255) NOT NULL,
  `add_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  `author` varchar(64) DEFAULT NULL COMMENT 'author',
  `alarm_email` varchar(255) DEFAULT NULL COMMENT 'alert email',
  `executor_route_strategy` varchar(50) DEFAULT NULL COMMENT 'executor routing strategy',
  `executor_handler` varchar(255) DEFAULT NULL COMMENT 'executor job handler',
  `executor_param` varchar(512) DEFAULT NULL COMMENT 'executor job parameters',
  `executor_block_strategy` varchar(50) DEFAULT NULL COMMENT 'blocking strategy',
  `executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT 'job timeout, in seconds',
  `executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT 'failure retry count',
  `glue_type` varchar(50) NOT NULL COMMENT 'GLUE type',
  `glue_source` mediumtext COMMENT 'GLUE source code',
  `glue_remark` varchar(128) DEFAULT NULL COMMENT 'GLUE remark',
  `glue_updatetime` datetime DEFAULT NULL COMMENT 'GLUE update time',
  `child_jobid` varchar(255) DEFAULT NULL COMMENT 'child job IDs, comma-separated',
  `trigger_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT 'schedule status: 0-stopped, 1-running',
  `trigger_last_time` bigint(13) NOT NULL DEFAULT '0' COMMENT 'last trigger time',
  `trigger_next_time` bigint(13) NOT NULL DEFAULT '0' COMMENT 'next trigger time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `job_group` int(11) NOT NULL COMMENT 'executor primary key ID',
  `job_id` int(11) NOT NULL COMMENT 'job primary key ID',
  `executor_address` varchar(255) DEFAULT NULL COMMENT 'executor address used for this run',
  `executor_handler` varchar(255) DEFAULT NULL COMMENT 'executor job handler',
  `executor_param` varchar(512) DEFAULT NULL COMMENT 'executor job parameters',
  `executor_sharding_param` varchar(20) DEFAULT NULL COMMENT 'executor sharding parameters, e.g. 1/2',
  `executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT 'failure retry count',
  `trigger_time` datetime DEFAULT NULL COMMENT 'trigger time',
  `trigger_code` int(11) NOT NULL COMMENT 'trigger result',
  `trigger_msg` text COMMENT 'trigger log',
  `handle_time` datetime DEFAULT NULL COMMENT 'handle time',
  `handle_code` int(11) NOT NULL COMMENT 'handle status',
  `handle_msg` text COMMENT 'handle log',
  `alarm_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT 'alert status: 0-default, 1-no alert needed, 2-alert sent, 3-alert failed',
  PRIMARY KEY (`id`),
  KEY `I_trigger_time` (`trigger_time`),
  KEY `I_handle_code` (`handle_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_log_report` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `trigger_day` datetime DEFAULT NULL COMMENT 'trigger day',
  `running_count` int(11) NOT NULL DEFAULT '0' COMMENT 'running log count',
  `suc_count` int(11) NOT NULL DEFAULT '0' COMMENT 'success log count',
  `fail_count` int(11) NOT NULL DEFAULT '0' COMMENT 'failure log count',
  PRIMARY KEY (`id`),
  UNIQUE KEY `i_trigger_day` (`trigger_day`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_logglue` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `job_id` int(11) NOT NULL COMMENT 'job primary key ID',
  `glue_type` varchar(50) DEFAULT NULL COMMENT 'GLUE type',
  `glue_source` mediumtext COMMENT 'GLUE source code',
  `glue_remark` varchar(128) NOT NULL COMMENT 'GLUE remark',
  `add_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_registry` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `registry_group` varchar(50) NOT NULL,
  `registry_key` varchar(255) NOT NULL,
  `registry_value` varchar(255) NOT NULL,
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `i_g_k_v` (`registry_group`,`registry_key`,`registry_value`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_group` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `app_name` varchar(64) NOT NULL COMMENT 'executor AppName',
  `title` varchar(12) NOT NULL COMMENT 'executor name',
  `address_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT 'executor address type: 0=auto-registered, 1=entered manually',
  `address_list` varchar(512) DEFAULT NULL COMMENT 'executor address list, comma-separated',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_user` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `username` varchar(50) NOT NULL COMMENT 'account',
  `password` varchar(50) NOT NULL COMMENT 'password',
  `role` tinyint(4) NOT NULL COMMENT 'role: 0-ordinary user, 1-administrator',
  `permission` varchar(255) DEFAULT NULL COMMENT 'permissions: executor ID list, comma-separated',
  PRIMARY KEY (`id`),
  UNIQUE KEY `i_username` (`username`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE `xxl_job_lock` (
  `lock_name` varchar(50) NOT NULL COMMENT 'lock name',
  PRIMARY KEY (`lock_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

INSERT INTO `xxl_job_group`(`id`, `app_name`, `title`, `address_type`, `address_list`) VALUES (1, 'xxl-job-executor-sample', 'sample executor', 0, NULL);
INSERT INTO `xxl_job_info`(`id`, `job_group`, `job_cron`, `job_desc`, `add_time`, `update_time`, `author`, `alarm_email`, `executor_route_strategy`, `executor_handler`, `executor_param`, `executor_block_strategy`, `executor_timeout`, `executor_fail_retry_count`, `glue_type`, `glue_source`, `glue_remark`, `glue_updatetime`, `child_jobid`) VALUES (1, 1, '0 0 0 * * ? *', 'test job 1', '2018-11-03 22:21:31', '2018-11-03 22:21:31', 'XXL', '', 'FIRST', 'demoJobHandler', '', 'SERIAL_EXECUTION', 0, 0, 'BEAN', '', 'GLUE code initialized', '2018-11-03 22:21:31', '');
INSERT INTO `xxl_job_user`(`id`, `username`, `password`, `role`, `permission`) VALUES (1, 'admin', 'e10adc3949ba59abbe56e057f20f883e', 1, NULL);
INSERT INTO `xxl_job_lock` (`lock_name`) VALUES ('schedule_lock');
commit;
Running it creates a schema named xxl_job in MySQL containing 8 tables (not 12: most posts around the web describe the 12-table 2.0 beta, which is out of date, so go by what you see here).
Configuring the xxl-job-admin project
Open application.properties:
### web
server.port=9091
server.servlet.context-path=/xxl-job-admin

### actuator
management.server.servlet.context-path=/actuator
management.health.mail.enabled=false

### resources
spring.mvc.servlet.load-on-startup=0
spring.mvc.static-path-pattern=/static/**
spring.resources.static-locations=classpath:/static/

### freemarker
spring.freemarker.templateLoaderPath=classpath:/templates/
spring.freemarker.suffix=.ftl
spring.freemarker.charset=UTF-8
spring.freemarker.request-context-attribute=request
spring.freemarker.settings.number_format=0.##########

### mybatis
mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml
#mybatis.type-aliases-package=com.xxl.job.admin.core.model

### xxl-job, datasource
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=jobadmin
spring.datasource.password=111111
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

### datasource-pool
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.hikari.minimum-idle=10
spring.datasource.hikari.maximum-pool-size=30
spring.datasource.hikari.auto-commit=true
spring.datasource.hikari.idle-timeout=30000
spring.datasource.hikari.pool-name=HikariCP
spring.datasource.hikari.max-lifetime=900000
spring.datasource.hikari.connection-timeout=10000
spring.datasource.hikari.connection-test-query=SELECT 1

### xxl-job, email
spring.mail.host=localhost
spring.mail.port=25
spring.mail.username=javamail
spring.mail.password=111111
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=false
spring.mail.properties.mail.smtp.starttls.required=false
spring.mail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory

### xxl-job, access token
xxl.job.accessToken=

### xxl-job, i18n (default is zh_CN, and you can choose "zh_CN", "zh_TC" and "en")
xxl.job.i18n=zh_CN

## xxl-job, triggerpool max size
xxl.job.triggerpool.fast.max=200
xxl.job.triggerpool.slow.max=100

### xxl-job, log retention days
xxl.job.logretentiondays=30
Below is a walkthrough of the core parameters.
Address of the xxl-job-admin portal
server.port=9091
server.servlet.context-path=/xxl-job-admin
Database configuration for xxl-job-admin
### xxl-job, datasource
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=jobadmin
spring.datasource.password=111111
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
xxl-job-admin uses the HikariCP connection pool by default, currently the best-performing pool available. Ranked by performance, the common pools come out roughly as: HikariCP > Druid > tomcat-jdbc > dbcp > c3p0.
Druid's edge is richer functionality rather than raw speed. This is not idle talk; here is the widely circulated industry comparison:
| | dbcp | druid | c3p0 | tomcat-jdbc | HikariCP |
| --- | --- | --- | --- | --- | --- |
| PSCache support | yes | yes | yes | no | no |
| Monitoring | JMX | JMX/log/HTTP | JMX, log | JMX | JMX |
| Extensibility | weak | good | weak | weak | weak |
| SQL interception and parsing | none | supported | none | none | none |
| Code | simple | moderate | complex | simple | simple |
| Last updated | 2015.8.6 | 2015.10.10 | 2015.12.09 | 2015.12.3 | |
| Characteristics | depends on commons-pool | Alibaba open source, full-featured | long history; complex, hard-to-maintain code | | heavily optimized, lean feature set, grew out of BoneCP |
| Pool bookkeeping | LinkedBlockingDeque | array | | FairBlockingQueue | ThreadLocal + CopyOnWriteArrayList |
Alert email settings
### xxl-job, email
spring.mail.host=localhost
spring.mail.port=25
spring.mail.username=javamail
spring.mail.password=111111
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=false
spring.mail.properties.mail.smtp.starttls.required=false
spring.mail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory
I run a James mail server on my own machine; since I have no domain and no certificate (flat broke), I simply switched STARTTLS off.
Running xxl-job-admin
Once configured, start it via XxlJobAdminApplication.java.
Browse to http://localhost:9091/xxl-job-admin/ and you will be prompted for a username and password. The defaults for xxl-job-admin are admin/123456.
After logging in, click the "Welcome xxx" control at the top right and change the admin password.
A basic XXL-JOB implementation
With xxl-job-admin in place, there is one more concept to learn: the executor.
executor
What is an executor?
An executor is a Spring Boot application hosting xxl-job handlers. Indeed, for production I recommend one batch job per executor, which gives the best scalability.
If you want to economize, you can instead group a set of jobs from the same business domain into one Spring Boot project.
Here, for example, we define 4 jobs, each following this pattern:
@Component
public class DemoJob {

    @XxlJob("demoJobHandler")
    public ReturnT<String> demoJobHandler(String param) throws Exception {
        XxlJobLogger.log("XXL-JOB, Hello World.");
        // do the actual work here
        return ReturnT.SUCCESS;
    }
}
Anatomy of an xxl-job handler
First, it must be a @Component.
Second, it has a method "public ReturnT<String> demoJobHandler(String param) throws Exception" annotated with @XxlJob("handler entry name").
Points to note:
- if the job succeeds, return ReturnT.SUCCESS;
- if the job hits an exception and fails, return ReturnT.FAIL; a FAIL also triggers the alert email;
- anything you want shown in the success/running/failure logs in xxl-job-admin must be written via XxlJobLogger rather than log4j (you can still use log4j alongside it for ordinary logging).
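The success/failure contract above can be mirrored in plain Java. The sketch below is not the xxl-job API; Result and run are stand-in names illustrating the shape a handler body should take: wrap the work, map a clean run to SUCCESS and any exception to FAIL.

```java
import java.util.concurrent.Callable;

public class HandlerContractDemo {
    public enum Result { SUCCESS, FAIL } // stand-ins for ReturnT.SUCCESS / ReturnT.FAIL

    // Run the job body; any exception maps to FAIL (which, in xxl-job, also fires the alert email).
    public static Result run(Callable<Void> jobBody) {
        try {
            jobBody.call();
            return Result.SUCCESS;
        } catch (Exception e) {
            // In a real handler you would log e via XxlJobLogger so it shows up in xxl-job-admin.
            return Result.FAIL;
        }
    }

    public static void main(String[] args) {
        System.out.println(run(() -> null));                                    // SUCCESS
        System.out.println(run(() -> { throw new RuntimeException("boom"); })); // FAIL
    }
}
```

Keeping the whole job body inside one try/catch like this guarantees the admin console never sees an unreported crash: every outcome becomes either SUCCESS or an alert-raising FAIL.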
Creating our own executor
Project layout
Our project is a standard Maven parent -> module group -> sub-module layout, so I give the complete Maven dependencies below.
The parent project
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.sky.retail.platform</groupId> <artifactId>sky-parent2.0</artifactId> <version>0.0.1</version> <packaging>pom</packaging> <properties> <java.version>1.8</java.version> <spring-boot.version>2.1.7.RELEASE</spring-boot.version> <spring-cloud-zk-discovery.version>2.1.3.RELEASE</spring-cloud-zk-discovery.version> <zookeeper.version>3.4.13</zookeeper.version> <spring-cloud.version>Finchley.RELEASE</spring-cloud.version> <dubbo.version>2.7.3</dubbo.version> <curator-framework.version>4.0.1</curator-framework.version> <curator-recipes.version>2.8.0</curator-recipes.version> <druid.version>1.1.20</druid.version> <guava.version>27.0.1-jre</guava.version> <fastjson.version>1.2.59</fastjson.version> <dubbo-registry-nacos.version>2.7.3</dubbo-registry-nacos.version> <nacos-client.version>1.1.4</nacos-client.version> <mysql-connector-java.version>5.1.46</mysql-connector-java.version> <disruptor.version>3.4.2</disruptor.version> <aspectj.version>1.8.13</aspectj.version> <spring.data.redis>1.8.14-RELEASE</spring.data.redis> <skycommon.version>0.0.1</skycommon.version> <seata.version>1.0.0</seata.version> <netty.version>4.1.42.Final</netty.version> <nacos.spring.version>0.1.4</nacos.spring.version> <lombok.version>1.16.22</lombok.version> <javax.servlet.version>3.1.0</javax.servlet.version> <mybatis.version>3.4.5</mybatis.version> <mybatis.spring.version>1.3.1</mybatis.spring.version> <spring.kafka.version>1.3.10.RELEASE</spring.kafka.version> <kafka.client.version>1.0.2</kafka.client.version> <shardingsphere.jdbc.version>4.0.0</shardingsphere.jdbc.version> <xmemcached.version>2.4.6</xmemcached.version> <swagger.version>2.9.2</swagger.version> <swagger.bootstrap.ui.version>1.9.6</swagger.bootstrap.ui.version> 
<swagger.model.version>1.5.23</swagger.model.version> <swagger-annotations.version>1.5.22</swagger-annotations.version> <swagger-models.version>1.5.22</swagger-models.version> <swagger-bootstrap-ui.version>1.9.5</swagger-bootstrap-ui.version> <sky-pos-common.version>0.0.1</sky-pos-common.version> <sky-sharding-jdbc.version>0.0.1</sky-sharding-jdbc.version> <cxf.version>3.1.6</cxf.version> <jackson-databind.version>2.9.6</jackson-databind.version> <gson.version>2.8.6</gson.version> <groovy.version>2.5.8</groovy.version> <logback-ext-spring.version>0.1.4</logback-ext-spring.version> <jcl-over-slf4j.version>1.7.25</jcl-over-slf4j.version> <spock-spring.version>2.0-M2-groovy-2.5</spock-spring.version> <xxljob.version>2.2.0</xxljob.version> <commons-lang.version>2.6</commons-lang.version> <maven.compiler.source>${java.version}</maven.compiler.source> <maven.compiler.target>${java.version}</maven.compiler.target> <compiler.plugin.version>3.6.0</compiler.plugin.version> <war.plugin.version>3.2.3</war.plugin.version> <jar.plugin.version>3.1.1</jar.plugin.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> </properties> <dependencyManagement> <!-- spring cloud --> <dependencies> <!-- Logback --> <dependency> <groupId>org.logback-extensions</groupId> <artifactId>logback-ext-spring</artifactId> <version>${logback-ext-spring.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>jcl-over-slf4j</artifactId> <version>${jcl-over-slf4j.version}</version> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-zookeeper-discovery</artifactId> <version>${spring-cloud-zk-discovery.version}</version> <exclusions> <exclusion> <artifactId>commons-logging</artifactId> <groupId>commons-logging</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> 
<artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> <!--zk --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-zookeeper-discovery</artifactId> </dependency> <!--zk工具包 --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>${zookeeper.version}</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency> <!-- xxl-rpc-core --> <dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> <version>${xxljob.version}</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <version>${spring-boot.version}</version> <scope>test</scope> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.spockframework</groupId> <artifactId>spock-core</artifactId> <version>1.3-groovy-2.4</version> <scope>test</scope> </dependency> <dependency> <groupId>org.spockframework</groupId> <artifactId>spock-spring</artifactId> <version>1.3-RC1-groovy-2.4</version> <scope>test</scope> </dependency> <dependency> <groupId>org.codehaus.groovy</groupId> <artifactId>groovy-all</artifactId> <version>2.4.6</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>${gson.version}</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>${jackson-databind.version}</version> </dependency> <dependency> 
<groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web-services</artifactId> <version>${spring-boot.version}</version> </dependency> <dependency> <groupId>org.apache.cxf</groupId> <artifactId>cxf-rt-frontend-jaxws</artifactId> <version>${cxf.version}</version> </dependency> <dependency> <groupId>org.apache.cxf</groupId> <artifactId>cxf-rt-transports-http</artifactId> <version>${cxf.version}</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-security</artifactId> <version>${spring-boot.version}</version> </dependency> <dependency> <groupId>io.github.swagger2markup</groupId> <artifactId>swagger2markup</artifactId> <version>1.3.1</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>${swagger.version}</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>${swagger.version}</version> </dependency> <dependency> <groupId>com.github.xiaoymin</groupId> <artifactId>swagger-bootstrap-ui</artifactId> <version>${swagger-bootstrap-ui.version}</version> </dependency> <dependency> <groupId>io.swagger</groupId> <artifactId>swagger-annotations</artifactId> <version>${swagger-annotations.version}</version> </dependency> <dependency> <groupId>io.swagger</groupId> <artifactId>swagger-models</artifactId> <version>${swagger-models.version}</version> </dependency> <dependency> <groupId>org.sky</groupId> <artifactId>sky-sharding-jdbc</artifactId> <version>${sky-sharding-jdbc.version}</version> </dependency> <dependency> <groupId>org.sky.retail.platform</groupId> <artifactId>sky-pos-common2.0</artifactId> <version>${sky-pos-common.version}</version> </dependency> <dependency> <groupId>com.googlecode.xmemcached</groupId> <artifactId>xmemcached</artifactId> <version>${xmemcached.version}</version> </dependency> <dependency> <groupId>org.apache.shardingsphere</groupId> 
<artifactId>sharding-jdbc-core</artifactId> <version>${shardingsphere.jdbc.version}</version> </dependency> <!-- <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>${spring.kafka.version}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.client.version}</version> </dependency> --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.10.RELEASE</version> </dependency> <dependency> <groupId>org.mybatis</groupId> <artifactId>mybatis</artifactId> <version>${mybatis.version}</version> </dependency> <dependency> <groupId>org.mybatis</groupId> <artifactId>mybatis-spring</artifactId> <version>${mybatis.spring.version}</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>${spring-boot.version}</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${spring-boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>org.apache.dubbo</groupId> <artifactId>dubbo-spring-boot-starter</artifactId> <version>${dubbo.version}</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.dubbo</groupId> <artifactId>dubbo</artifactId> <version>${dubbo.version}</version> <exclusions> <exclusion> 
<groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>${curator-framework.version}</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>${curator-recipes.version}</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>${mysql-connector-java.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>${druid.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>${druid.version}</version> </dependency> <dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>${disruptor.version}</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>${guava.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>${fastjson.version}</version> </dependency> <dependency> <groupId>org.apache.dubbo</groupId> <artifactId>dubbo-registry-nacos</artifactId> <version>${dubbo-registry-nacos.version}</version> </dependency> <dependency> <groupId>com.alibaba.nacos</groupId> <artifactId>nacos-client</artifactId> <version>${nacos-client.version}</version> </dependency> <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjweaver</artifactId> <version>${aspectj.version}</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>${spring-boot.version}</version> </dependency> <dependency> <groupId>io.seata</groupId> <artifactId>seata-all</artifactId> <version>${seata.version}</version> </dependency> <dependency> 
<groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>${netty.version}</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>${lombok.version}</version> </dependency> <!-- https://mvnrepository.com/artifact/com.alibaba.boot/nacos-config-spring-boot-starter --> <dependency> <groupId>com.alibaba.boot</groupId> <artifactId>nacos-config-spring-boot-starter</artifactId> <version>${nacos.spring.version}</version> <exclusions> <exclusion> <artifactId>nacos-client</artifactId> <groupId>com.alibaba.nacos</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>net.sourceforge.groboutils</groupId> <artifactId>groboutils-core</artifactId> <version>5</version> </dependency> <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>${commons-lang.version}</version> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>${compiler.plugin.version}</version> <configuration> <source>${java.version}</source> <target>${java.version}</target> </configuration> <dependencies> <dependency> <groupId>org.codehaus.groovy</groupId> <artifactId>groovy-eclipse-compiler</artifactId> <version>2.7.0-01</version> </dependency> </dependencies> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-war-plugin</artifactId> <version>${war.plugin.version}</version> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <version>${jar.plugin.version}</version> </plugin> </plugins> </build> </project>
The sky-pos-common2.0 project
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.sky.retail.platform</groupId> <artifactId>sky-parent2.0</artifactId> <version>0.0.1</version> </parent> <artifactId>sky-pos-common2.0</artifactId> <packaging>jar</packaging> <dependencies> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> </dependency> <dependency> <groupId>com.googlecode.xmemcached</groupId> <artifactId>xmemcached</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-log4j2</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjweaver</artifactId> </dependency> <dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> 
<artifactId>spring-boot-starter-data-redis</artifactId> </dependency> </dependencies> </project>
The sky-pos-xxljob project
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.sky.retail.platform</groupId> <artifactId>sky-parent2.0</artifactId> <version>0.0.1</version> </parent> <packaging>jar</packaging> <artifactId>sky-pos-xxljob</artifactId> <dependencies> <!-- Logback --> <dependency> <groupId>org.logback-extensions</groupId> <artifactId>logback-ext-spring</artifactId> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>jcl-over-slf4j</artifactId> </dependency> <dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-log4j2</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> 
<groupId>org.aspectj</groupId> <artifactId>aspectjweaver</artifactId> </dependency> <dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> </dependency> <dependency> <groupId>org.sky.retail.platform</groupId> <artifactId>sky-pos-common2.0</artifactId> </dependency> </dependencies> <build> <sourceDirectory>src/main/java</sourceDirectory> <testSourceDirectory>src/test/java</testSourceDirectory> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>${compiler.plugin.version}</version> <configuration> <source>${java.version}</source> <target>${java.version}</target> </configuration> </plugin> </plugins> <resources> <resource> <directory>src/main/resources</directory> </resource> <resource> <directory>src/main/webapp</directory> <targetPath>META-INF/resources</targetPath> <includes> <include>**/**</include> </includes> </resource> <resource> <directory>src/main/resources</directory> <filtering>true</filtering> <includes> <include>application.properties</include> <include>application-${profileActive}.properties</include> </includes> </resource> </resources> </build> </project>
The core of sky-pos-xxljob
This is our PoC project. Following Spring Boot 2 conventions, let us first look at its application.yml.
application.yml
server:
  port: 8082

logging: # log file
  config: classpath:logback.xml

spring:
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driverClassName: com.mysql.jdbc.Driver
    url: jdbc:mysql://localhost:3306/storetrans01?useUnicode=true&characterEncoding=utf-8&useSSL=false
    username: storetrans01
    password: 111111

xxl:
  job:
    admin:
      # Root address of the scheduling center; if the admin is deployed as a cluster
      # with multiple addresses, separate them with commas.
      # The executor uses this address for heartbeat registration and result callbacks.
      addresses: http://localhost/xxl-job-admin
    accessToken:
    # Executor name, IP address, and port.
    # Note: when configuring multiple executors, avoid port conflicts.
    executor:
      appname: posDemoExecutor
      ip: 127.0.0.1
      port: 9999
      address:
      # Disk location for executor run logs; the path must be readable and writable.
      logpath: /Users/apple/data/applogs/xxl-job
      # Periodic log cleanup: number of days to keep log files before automatic deletion.
      # Must be at least 3 for the feature to take effect; -1 means never delete.
      logretentiondays: -1
Here we:
- define the embedded web server (Tomcat) port;
- point logging at logback.xml: logback is the logging implementation xxl-job requires. You can also drop a log4j2.xml into the resources directory, that does no harm, but logback must be declared as the default;
- configure a data source, since the later PoC that verifies cluster mode and grid-computing capability needs a database;
- declare the xxl-job-admin addresses: this is the address of the server where the xxl-job-admin console we deployed earlier lives. If you run a cluster of schedulers you can declare them as, for example, http://server1/xxl-job-admin,http://server2/xxl-job-admin. In fact we are already running two schedulers as a cluster here: two xxl-job-admin instances with nginx in front for HA, and since you can in turn run several nginx instances, a grid gradually takes shape;
- optionally secure the communication between executor and admin (also called the scheduler) with a token;
- declare the executor: this is the port on which every xxl-job handler in this Spring Boot app is "called back" by the xxl-job-admin scheduler. Put plainly, executor and scheduler keep a mutual heartbeat and stay in sync through registration in the registry center.
Spring Boot Java configuration classes (no XML)
DataSourceConfig.java
```java
package org.sky.retail.xxljob.config;

import javax.sql.DataSource;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;

import com.alibaba.druid.pool.DruidDataSource;

@Configuration
public class DataSourceConfig {

    @Bean("dataSource")
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource createDataSource() {
        return new DruidDataSource();
    }

    // named jdbcTemplate so the bean name does not collide with "dataSource"
    @Bean
    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }
}
```
XxlJobConfig.java
Note: many blog posts online target the first 2.0 release; many methods and classes have changed in 2.2.0!
```java
package org.sky.retail.xxljob.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;

/**
 * xxl-job config
 *
 * @author mk.yuan 2020-05-06
 */
@Configuration
public class XxlJobConfig {
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobSpringExecutor;
    }

    /**
     * For multi-NIC hosts or container deployments, the registration IP can be
     * customized with the "InetUtils" component from "spring-cloud-commons":
     *
     * 1. Add the dependency:
     *    <dependency>
     *        <groupId>org.springframework.cloud</groupId>
     *        <artifactId>spring-cloud-commons</artifactId>
     *        <version>${version}</version>
     *    </dependency>
     *
     * 2. In the config file, or as a container startup variable:
     *    spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
     *
     * 3. Obtain the IP:
     *    String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
     */
}
```
A HelloWorld-level first executor
DemoJob.java
```java
package org.sky.retail.xxljob.executor;

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;

@Component
public class DemoJob {
    private static Logger logger = LoggerFactory.getLogger(DemoJob.class);

    @XxlJob("demoJobHandler")
    public ReturnT<String> demoJobHandler(String param) throws Exception {
        XxlJobLogger.log("XXL-JOB, Hello World.");
        for (int i = 0; i < 5; i++) {
            XxlJobLogger.log("beat at:" + i);
            TimeUnit.SECONDS.sleep(2);
        }
        return ReturnT.SUCCESS;
    }
}
```
What the job does
It does something very simple: it simulates a job that runs for about ten seconds (five beats, two seconds apart), logging as it goes, and then completes.
Deploying DemoJob.java to xxl-job-admin
First, we get sky-pos-xxljob running; for that we write a main application class.
XxlJob.java
```java
package org.sky.retail.xxljob;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.transaction.annotation.EnableTransactionManagement;

@SpringBootApplication
@ComponentScan(basePackages = { "org.sky" })
@EnableAutoConfiguration(exclude = DataSourceAutoConfiguration.class)
@EnableTransactionManagement
public class XxlJob {
    public static void main(String[] args) {
        SpringApplication.run(XxlJob.class, args);
    }
}
```
Its executor listens on port 9999.
Creating an executor
Open xxl-job-admin.
We add a new executor named posDemoExecutor, with the following settings:
The registration modes deserve attention:
- automatic discovery cannot be used for an executor cluster;
- with manual entry you then have to type the executor address into the machine-address field; for an executor cluster, separate the individual executor addresses with commas.
Creating a task
In task management we create a task named "demoJobHandler". This name must match the @XxlJob("demoJobHandler") in your executor exactly, because it is used to register the handler bean.
The task settings are as follows:
A few things to note here:
- failover: I use several executors with the same function for "failover". Put plainly, one job is shared by a number of workers; one works while the others wait, and if the worker in charge dies, another worker steps in immediately;
- task timeout: if the executor has not communicated with xxl-job-admin (the scheduler) for this long, the task execution is considered failed;
- JobHandler: the bean name from @XxlJob("demoJobHandler");
- cron: a standard cron expression. Here we schedule a run every 5 seconds; in other words, once you start the task it fires every 5 seconds.
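For reference, an every-5-seconds schedule is usually written as the six-field cron expression `0/5 * * * * ?` (seconds, minutes, hours, day-of-month, month, day-of-week). A minimal sketch of how the `0/5` step in the seconds field expands into firing times; this is an illustration only, not xxl-job's actual cron parser:

```java
import java.util.ArrayList;
import java.util.List;

public class CronStepDemo {
    // Expand a "start/step" cron field (e.g. "0/5") over the range 0..max.
    static List<Integer> expandStepField(String field, int max) {
        List<Integer> values = new ArrayList<>();
        String[] parts = field.split("/");
        int start = Integer.parseInt(parts[0]);
        int step = parts.length > 1 ? Integer.parseInt(parts[1]) : 1;
        for (int v = start; v <= max; v += step) {
            values.add(v);
        }
        return values;
    }

    public static void main(String[] args) {
        // Seconds field "0/5" of "0/5 * * * * ?" fires at 0, 5, 10, ..., 55
        List<Integer> seconds = expandStepField("0/5", 59);
        System.out.println(seconds.size()); // 12 firings per minute
        System.out.println(seconds.get(1)); // 5
    }
}
```

So a started task fires twelve times a minute, which matches the behaviour we observe in the scheduling log below.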
With everything configured, we go back to eclipse.
Keep the current xxl-job executor running, then edit application.yml:
- server:port: change it from 8082 to 8081;
- xxl:job:executor:port: change it from 9999 to 9998.
Run XxlJob.java again so that a second xxl-job executor starts inside eclipse, listening on port 9998.
Now let's run the task!
Running demoJobHandler
After starting it, we can watch log files being generated continuously under "scheduling logs".
The actual log files live under /Users/apple/data/applogs/xxl-job, as configured in our application.yml.
The log content can be viewed directly in the console:
Simulating an executor crash
We configured 2 executors, one running on 9999 and one on 9998. If we kill one, the other executor should in theory pick up the task immediately.
So we kill 9998.
Killed. Now let's look at the admin side.
The task status in xxl-job-admin still shows "running", and the xxl-job-admin backend has thrown an error.
Yet the "successful" runs in the scheduling log keep piling up:
The same query one minute later:
Let's also check whether there are any "failed" runs.
There is a failure record for the task, and we also received an alert email the moment executor 9998 was killed.
But our surviving executor keeps running just fine.
This shows that with 2 executors, when one died the other immediately took over its task.
This scenario fits "multiple clustered executors running one and the same task": only one executor runs at any time, and as soon as the running executor crashes another one is promoted, kicking in automatically in under a second.
How xxl-job implements HA
Goal
Build a job that does the following:
- each run picks 10 records from s_order with handle_flag=0 that also exist in s_order_item;
- joins s_order and s_order_item on that relation and computes the product goods_amount * sales_price from s_order_item;
- inserts the computed values into s_order_rpt;
- after the insert into s_order_rpt succeeds, sets handle_flag to 1 for those 10 records in s_order.
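The arithmetic in these steps is a single per-row multiplication, total_price = goods_amount * sales_price, done with BigDecimal to keep monetary precision. A minimal sketch (class name and sample values are illustrative only):

```java
import java.math.BigDecimal;

public class TotalPriceDemo {
    // Compute total_price = goods_amount * sales_price, as the job does per row.
    static BigDecimal totalPrice(int goodsAmount, BigDecimal salesPrice) {
        return salesPrice.multiply(BigDecimal.valueOf(goodsAmount));
    }

    public static void main(String[] args) {
        // e.g. 3 units at 12.50 each
        System.out.println(totalPrice(3, new BigDecimal("12.50"))); // 37.50
    }
}
```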
Verification steps
- start two identical jobs, one on port 9998 and one on port 9999;
- set the job's route strategy to "failover";
- keep the job running the whole time;
- while it runs, kill the first job and watch the task and the database: since one of the two jobs is still alive nothing fails and the task carries on, but because an executor was killed you will receive an "alert email";
- restore the first, killed job and watch the task and the database: the task carries on;
- kill the second job and watch the task and the database: one of the two jobs is still alive, so nothing fails and the task carries on, but again the killed executor triggers an "alert email";
- let the first job run to completion as a single instance, then check that the row count in s_order_rpt equals the original number of handle_flag=0 rows in s_order.
Starting the verification
s_order holds 2,108 rows in total.
Building the job: DbJob
```java
package org.sky.retail.xxljob.executor;

import javax.annotation.Resource;

import org.sky.retail.xxljob.service.OrderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;

@Component
public class DbJob {
    private static Logger logger = LoggerFactory.getLogger(DbJob.class);

    @Resource
    private OrderService orderService;

    @XxlJob("dbJob")
    public ReturnT<String> dbJob(String param) throws Exception {
        try {
            int result = orderService.updateOrderRpt();
            return ReturnT.SUCCESS;
        } catch (Exception e) {
            XxlJobLogger.log(e.getMessage(), e);
            return ReturnT.FAIL;
        }
    }
}
```
OrderDao.java
```java
package org.sky.retail.xxljob.dao;

import java.util.List;

import org.sky.platform.retail.exception.RunTimeException;
import org.sky.retail.xxljob.vo.OrderVO;

public interface OrderDao {
    public List<OrderVO> getOrdersByLimit(int limits) throws RunTimeException;

    public int updateOrderRpt(List<OrderVO> orderList) throws RunTimeException;

    public int isOrderExisted(String orderId) throws RunTimeException;
}
```
OrderDaoImpl.java
```java
package org.sky.retail.xxljob.dao;

import java.math.BigDecimal;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import javax.annotation.Resource;

import org.sky.platform.retail.exception.RunTimeException;
import org.sky.retail.xxljob.vo.OrderVO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;

@Component
public class OrderDaoImpl implements OrderDao {
    protected Logger logger = LoggerFactory.getLogger(this.getClass());

    @Resource
    private JdbcTemplate jdbcTemplate;

    @Override
    public int isOrderExisted(String orderId) throws RunTimeException {
        int result = 0;
        String checkOrderExistSql = "select count(*) from s_order_rpt where order_id=?";
        try {
            result = jdbcTemplate.queryForObject(checkOrderExistSql, Integer.class, orderId);
            return result;
        } catch (Exception e) {
            logger.error("isOrderExisted error: " + e.getMessage(), e);
            throw new RunTimeException("isOrderExisted error: " + e.getMessage(), e);
        }
    }

    @Override
    public List<OrderVO> getOrdersByLimit(int limits) throws RunTimeException {
        List<OrderVO> orderList = new ArrayList<OrderVO>();
        StringBuffer sql = new StringBuffer();
        sql.append(" select o.order_id,i.goods_id,o.handle_flag,i.sales_price,i.goods_amount from s_order o, s_order_item i");
        sql.append(" where o.handle_flag=0");
        sql.append(" and o.order_id=i.order_id limit ?");
        try {
            orderList = jdbcTemplate.query(sql.toString(), new Object[] { Integer.valueOf(limits) },
                    new RowMapper<OrderVO>() {
                        @Override
                        public OrderVO mapRow(ResultSet rs, int rowNum) throws SQLException {
                            OrderVO order = new OrderVO();
                            order.setOrderId(rs.getString("order_id"));
                            order.setGoodsAmount(rs.getInt("goods_amount"));
                            order.setSalesPrice(rs.getBigDecimal("sales_price"));
                            order.setHandleFlag(rs.getInt("handle_flag"));
                            order.setGoodsId(rs.getInt("goods_id"));
                            return order;
                        }
                    });
            orderList.forEach((item) -> {
                logger.info(">>>>>>order_id->" + item.getOrderId() + " goods_amount->" + item.getGoodsAmount()
                        + " sales_price->" + item.getSalesPrice() + " handle_flag->" + item.getHandleFlag());
            });
        } catch (Exception e) {
            logger.error("getOrdersByLimit[" + limits + "] error: " + e.getMessage(), e);
            throw new RunTimeException("getOrdersByLimit[" + limits + "] error: " + e.getMessage(), e);
        }
        return orderList;
    }

    public int addOrderRpt(OrderVO order) throws RunTimeException {
        int result = 0;
        String addOrderRptSql = "insert into s_order_rpt(order_id,goods_id,goods_amount,sales_price,total_price)values(?,?,?,?,?)";
        try {
            BigDecimal goodsAmount = BigDecimal.valueOf(order.getGoodsAmount());
            BigDecimal salesPrice = order.getSalesPrice();
            BigDecimal totalPrice = salesPrice.multiply(goodsAmount);
            String orderId = order.getOrderId();
            int goodsId = order.getGoodsId();
            result = isOrderExisted(orderId);
            if (result < 1) {
                logger.info(">>>>>>orderId->" + orderId + " goodsAmount->" + goodsAmount + " and salesPrice->"
                        + salesPrice + " and totalPrice->" + totalPrice);
                jdbcTemplate.update(addOrderRptSql, orderId, goodsId, goodsAmount, salesPrice, totalPrice);
            } else {
                logger.info(">>>>>>orderId->" + orderId + " existed, skipped");
            }
            result = 1;
        } catch (Exception e) {
            logger.error("add s_order_rpt error: " + e.getMessage(), e);
            throw new RunTimeException("add s_order_rpt error: " + e.getMessage(), e);
        }
        return result;
    }

    public int updateOrderRpt(List<OrderVO> orderList) throws RunTimeException {
        int result = 0;
        String updatedOrderSql = "update s_order set handle_flag=? where order_id=?";
        try {
            for (OrderVO order : orderList) {
                int insertRecord = addOrderRpt(order);
                if (insertRecord > 0) {
                    jdbcTemplate.update(updatedOrderSql, "1", order.getOrderId());
                    result = 1;
                }
            }
            return result;
        } catch (Exception e) {
            logger.error("updateTotalPrice error: " + e.getMessage(), e);
            throw new RunTimeException("updateTotalPrice error: " + e.getMessage(), e);
        }
    }
}
```
OrderService.java
```java
package org.sky.retail.xxljob.service;

import java.util.List;

import org.sky.platform.retail.exception.RunTimeException;
import org.sky.retail.xxljob.vo.OrderVO;

public interface OrderService {
    public List<OrderVO> getOrdersByLimit(int limits) throws RunTimeException;

    public int updateOrderRpt() throws RunTimeException;
}
```
OrderServiceImpl.java
```java
package org.sky.retail.xxljob.service;

import java.util.ArrayList;
import java.util.List;

import javax.annotation.Resource;

import org.sky.platform.retail.exception.RunTimeException;
import org.sky.retail.xxljob.dao.OrderDao;
import org.sky.retail.xxljob.vo.OrderVO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

@Service
public class OrderServiceImpl implements OrderService {
    protected Logger logger = LoggerFactory.getLogger(this.getClass());

    @Resource
    private OrderDao orderDao;

    @Override
    @Transactional(propagation = Propagation.REQUIRED)
    public List<OrderVO> getOrdersByLimit(int limits) throws RunTimeException {
        List<OrderVO> orderList = new ArrayList<OrderVO>();
        try {
            // fetch up to `limits` unprocessed orders
            orderList = orderDao.getOrdersByLimit(limits);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new RunTimeException(e.getMessage(), e);
        }
        return orderList;
    }

    @Override
    @Transactional(propagation = Propagation.REQUIRED)
    public int updateOrderRpt() throws RunTimeException {
        int result = 0;
        try {
            // process 10 handle_flag=0 orders per run, as described above
            List<OrderVO> rawOrderList = getOrdersByLimit(10);
            result = orderDao.updateOrderRpt(rawOrderList);
            logger.info(">>>>>>updateOrderRpt result->" + result);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new RunTimeException(e.getMessage(), e);
        }
        return result;
    }
}
```
OrderVO.java
```java
package org.sky.retail.xxljob.vo;

import java.io.Serializable;
import java.math.BigDecimal;

public class OrderVO implements Serializable {
    private static final long serialVersionUID = 1L;

    private String orderId = "";
    private int goodsAmount = 0;
    private BigDecimal salesPrice = new BigDecimal(0.0);
    private int handleFlag = 0;
    private int goodsId = 0;
    private BigDecimal totalPrice = new BigDecimal(0.0);

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public int getGoodsAmount() {
        return goodsAmount;
    }

    public void setGoodsAmount(int goodsAmount) {
        this.goodsAmount = goodsAmount;
    }

    public BigDecimal getSalesPrice() {
        return salesPrice;
    }

    public void setSalesPrice(BigDecimal salesPrice) {
        this.salesPrice = salesPrice;
    }

    public int getHandleFlag() {
        return handleFlag;
    }

    public void setHandleFlag(int handleFlag) {
        this.handleFlag = handleFlag;
    }

    public int getGoodsId() {
        return goodsId;
    }

    public void setGoodsId(int goodsId) {
        this.goodsId = goodsId;
    }

    public BigDecimal getTotalPrice() {
        return totalPrice;
    }

    public void setTotalPrice(BigDecimal totalPrice) {
        this.totalPrice = totalPrice;
    }
}
```
Configuring dbJob in xxl-job-admin
After starting it, the following sql shows the job has begun running:
select count(*) from s_order_rpt;
We start the two jobs, then kill the first one.
We find the database keeps receiving data and the xxl-job-admin console reports no errors at all.
But an alert email arrived, which proves the whole flow works and that after 9998 was killed the task failed over to 9999.
In other words, the other job took over the first job's work.
Then we restore the first job and kill the second one.
The database keeps filling up happily, and the xxl-job-admin console again records no errors.
We then keep the first job running as a "single instance" until it finishes.
In the end s_order_rpt contains 2,108 rows,
while the number of handle_flag=1 rows in s_order went from 0 to 2,108.
Conclusion
As long as you start n jobs that all do the same work and assign them the "failover" strategy, no matter whether you:
- kill one process before it starts running, or
- kill the running process halfway through (killing job 1 first so job 2 runs, then restoring 1 while 2 runs and killing 2 so 1 runs on to completion, as above, is proof enough),
a "failover" job of this kind is unaffected.
Appendix: table DDL
The s_order table
```sql
CREATE TABLE `s_order` (
  `order_id` varchar(32) NOT NULL DEFAULT '',
  `price_amount` decimal(11,2) DEFAULT NULL,
  `channel` tinyint(4) DEFAULT NULL,
  `order_type` int(3) DEFAULT NULL,
  `member_id` varchar(32) DEFAULT NULL,
  `order_status` tinyint(4) DEFAULT NULL,
  `handle_flag` tinyint(1) DEFAULT '0',
  PRIMARY KEY (`order_id`),
  KEY `idx_order_handle_flag` (`handle_flag`)
) ENGINE=InnoDB;
```
The s_order_item table
```sql
CREATE TABLE `s_order_item` (
  `item_id` int(32) unsigned NOT NULL AUTO_INCREMENT,
  `store_id` int(8) DEFAULT NULL,
  `trans_id` varchar(32) DEFAULT NULL,
  `pos_id` int(16) DEFAULT NULL,
  `order_id` varchar(32) DEFAULT NULL,
  `goods_id` int(16) DEFAULT NULL,
  `goods_amount` int(11) DEFAULT NULL,
  `sales_price` decimal(11,2) DEFAULT NULL,
  PRIMARY KEY (`item_id`)
) ENGINE=InnoDB;
```
The s_order_rpt table
```sql
CREATE TABLE `s_order_rpt` (
  `order_id` varchar(32) NOT NULL,
  `goods_id` int(16) DEFAULT NULL,
  `goods_amount` int(11) DEFAULT NULL,
  `sales_price` decimal(11,2) DEFAULT NULL,
  `total_price` decimal(11,2) DEFAULT NULL,
  PRIMARY KEY (`order_id`)
) ENGINE=InnoDB;
```
The full flow of xxl-job's powerful highly available grid computing
Building on the previous three parts, we will now implement a realistic grid-computing batch run:
- create a csv file with 1,000,000 records;
- create 2 executors running in separate instances (for the demo I run them on my own machine, but on 2 different ports; they really are 2 independent java processes);
- have the 2 executors use xxl-job's built-in sharding mechanism: each of the 1,000,000 records is routed by taking its id modulo the actual number of executors, and the matching executor parses it and writes it to the db (this sharding is one of xxl-job's powerful built-in features);
- run 2 xxl-job-admin instances in separate instances (again on my machine, on 2 different ports, as 2 truly independent java processes) to form an HA pair.
Putting it into practice
Creating the 1,000,000 records
I wrote my own csv generator for the 1,000,000 records; its output looks like this:
```
0,FTWQHI,1
1,sZaKaD,5
2,7ONcDE,2
3,hqSRSH,3
4,g1FeyD,2
5,Pajn15,5
```
It has 3 fields, corresponding to goods_id, goods_name and stock, and 1,000,000 records in all.
The generator code is left as an exercise; there is nothing hard about it.
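For anyone who wants a starting point for that exercise, here is a minimal generator sketch. The output file name demo_product.csv, the 6-character name length and the stock range 1-5 are assumptions chosen to match the sample rows above:

```java
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Random;

public class CsvGenerator {
    // Produce a random alphanumeric goods_name of the given length.
    static String randomName(Random rnd, int len) {
        String chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
        StringBuilder sb = new StringBuilder(len);
        for (int i = 0; i < len; i++) {
            sb.append(chars.charAt(rnd.nextInt(chars.length())));
        }
        return sb.toString();
    }

    public static void main(String[] args) throws IOException {
        int rows = 1_000_000; // adjust as needed
        Random rnd = new Random();
        // Write "goods_id,goods_name,stock" rows like: 0,FTWQHI,1
        try (BufferedWriter out = new BufferedWriter(new FileWriter("demo_product.csv"))) {
            for (int goodsId = 0; goodsId < rows; goodsId++) {
                out.write(goodsId + "," + randomName(rnd, 6) + "," + (1 + rnd.nextInt(5)));
                out.newLine();
            }
        }
        System.out.println("wrote " + rows + " rows");
    }
}
```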
Creating the xxl-job
ImportProduct.java
```java
package org.sky.retail.xxljob.executor;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;

import javax.annotation.Resource;

import org.sky.platform.retail.util.StringUtility;
import org.sky.retail.xxljob.service.ProductService;
import org.sky.retail.xxljob.vo.ProductVO;
import org.springframework.stereotype.Component;

import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import com.xxl.job.core.util.ShardingUtil;

@Component
public class ImportProduct {

    @Resource
    private ProductService productService;

    @XxlJob("importProduct")
    public ReturnT<String> shardingJobHandler(String param) throws Exception {
        // let the cpu rest for 250ms after this many records
        int rowLimit = 1000;
        // sharding parameters
        ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
        XxlJobLogger.log("sharding parameters: current shard index = {}, total shards = {}",
                shardingVO.getIndex(), shardingVO.getTotal());
        // String filePath = "/Users/apple/data/demo_product.csv";
        BufferedReader file = null;
        if (param == null || param.trim().length() < 1) {
            XxlJobLogger.log("a valid path to the csv file to import is required");
            return ReturnT.FAIL;
        } else {
            File f = new File(param);
            if (!f.exists()) {
                XxlJobLogger.log("a valid path to the csv file to import is required");
                return ReturnT.FAIL;
            }
        }
        try {
            String filePath = param;
            file = new BufferedReader(new InputStreamReader(new FileInputStream(filePath), "UTF-8"));
            String record;
            int rowNum = 0;
            int productNum = 0;
            while ((record = file.readLine()) != null) {
                boolean check = true;
                String goodsName = "";
                int stock = 0;
                String fields[] = record.split(",");
                if (fields == null || fields.length < 3) {
                    XxlJobLogger.log("row[" + rowNum + "] has error->skipped");
                    check = false;
                } else {
                    if (StringUtility.isInteger(fields[0])) {
                        productNum = Integer.parseInt(fields[0]);
                    } else {
                        XxlJobLogger.log("row[" + rowNum + "] and col[0] expected numeric but it is ["
                                + fields[0] + "]->skipped");
                        check = false;
                    }
                }
                // only touch the remaining columns if the row passed the checks so far
                if (check) {
                    goodsName = fields[1];
                    if (StringUtility.isInteger(fields[2])) {
                        stock = Integer.valueOf(fields[2]);
                    } else {
                        XxlJobLogger.log("row[" + rowNum + "] and col[2] expected numeric but it is ["
                                + fields[2] + "]->skipped");
                        check = false;
                    }
                }
                if (check) {
                    if (productNum % shardingVO.getTotal() == shardingVO.getIndex()) {
                        // XxlJobLogger.log("shard {}: hit, processing record {}",
                        // shardingVO.getIndex(), (productNum + 1));
                        ProductVO product = new ProductVO();
                        product.setGoodsId(productNum);
                        product.setGoodsName(goodsName);
                        product.setStock(stock);
                        try {
                            productService.addProduct(product);
                        } catch (Exception e) {
                            XxlJobLogger.log("shard {}: record {} failed, reason: {}",
                                    shardingVO.getIndex(), (productNum + 1), e.getMessage(), e);
                        }
                        if (rowNum % rowLimit == 0) {
                            XxlJobLogger.log("shard {}: processed up to goods_id -> {}",
                                    shardingVO.getIndex(), productNum);
                            rowNum = 0;
                            Thread.sleep(250);
                        }
                        rowNum++;
                    }
                }
            }
            XxlJobLogger.log("shard {}: record {} processed, done", shardingVO.getIndex(), (productNum + 1));
        } catch (Exception e) {
            XxlJobLogger.log("error: " + e.getMessage(), e);
            return ReturnT.FAIL;
        } finally {
            try {
                if (file != null) {
                    file.close();
                    file = null;
                }
            } catch (Exception e) {
            }
        }
        return ReturnT.SUCCESS;
    }
}
```
Walking through the core code
The most interesting part is this:
```java
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
XxlJobLogger.log("sharding parameters: current shard index = {}, total shards = {}",
        shardingVO.getIndex(), shardingVO.getTotal());
if (productNum % shardingVO.getTotal() == shardingVO.getIndex()) {
    // do import job
}
```
This ShardingUtil ships with xxl-job, and it works as follows:
As soon as you set the task's route strategy to "sharding broadcast" in xxl-job-admin (the scheduler), the corresponding executors receive the two values shardingVO.getTotal() and shardingVO.getIndex().
Since we configured 2 executors in xxl-job-admin:
- shardingVO.getTotal() returns 2;
- in executor1, shardingVO.getIndex() returns 0;
- in executor2, shardingVO.getIndex() returns 1.
After that, if you are really a programmer, a single modulo operation is all it takes to shard the data. The rest is simple: insert it into the database!
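That modulo split can be sketched on its own; the check below is the same productNum % total == index test used in ImportProduct, shown here for total = 2 shards over goods_id 0..9:

```java
import java.util.ArrayList;
import java.util.List;

public class ShardingDemo {
    // Decide whether this shard handles a given record, as ImportProduct does.
    static boolean handledBy(int goodsId, int shardIndex, int shardTotal) {
        return goodsId % shardTotal == shardIndex;
    }

    public static void main(String[] args) {
        int total = 2; // two executors registered under "sharding broadcast"
        List<Integer> shard0 = new ArrayList<>();
        List<Integer> shard1 = new ArrayList<>();
        for (int goodsId = 0; goodsId < 10; goodsId++) {
            if (handledBy(goodsId, 0, total)) shard0.add(goodsId);
            if (handledBy(goodsId, 1, total)) shard1.add(goodsId);
        }
        System.out.println(shard0); // [0, 2, 4, 6, 8]
        System.out.println(shard1); // [1, 3, 5, 7, 9]
    }
}
```

Every record lands on exactly one shard, so the two executors never insert the same goods_id twice.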
ProductDao.java
```java
package org.sky.retail.xxljob.dao;

import org.sky.platform.retail.exception.RunTimeException;
import org.sky.retail.xxljob.vo.ProductVO;

public interface ProductDao {
    public void addProduct(ProductVO product) throws RunTimeException;
}
```
ProductDaoImpl.java
```java
package org.sky.retail.xxljob.dao;

import javax.annotation.Resource;

import org.sky.platform.retail.exception.RunTimeException;
import org.sky.retail.xxljob.vo.ProductVO;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
public class ProductDaoImpl implements ProductDao {

    @Resource
    private JdbcTemplate jdbcTemplate;

    @Override
    public void addProduct(ProductVO product) throws RunTimeException {
        String productSql = "insert into my_goods(goods_id,goods_name,stock)values(?,?,?)";
        try {
            jdbcTemplate.update(productSql, product.getGoodsId(), product.getGoodsName(), product.getStock());
        } catch (Exception e) {
            throw new RunTimeException(
                    "error occured on insert into my_goods with: goods_id->" + product.getGoodsId()
                            + " goods_name->" + product.getGoodsName() + " stock->" + product.getStock()
                            + " error: " + e.getMessage(), e);
        }
    }
}
```
Starting the two executors
Step 1: keep application.yml in sky-pos-xxljob in the following form, and start it.
Step 2: change application.yml in sky-pos-xxljob to the following form, and start it.
Step 3: turn xxl-job-admin into a cluster.
Clustering xxl-job-admin
- you can start multiple xxl-job-admin instances, but they can only run in HA form;
- the admin instances may use different ports, but they must point at the same database; the HA cluster only works with a shared database;
- the 2.0.x versions of xxl-job-admin used the dated quartz scheduler, technology that stalled 3-4 years ago, whereas the latest xxl-job-admin uses its own netty-based timer, which currently leads the field among (non-big-data) schedulers in efficiency, maintainability and stability.
Step 1: keep application.properties in xxl-job-admin in the following form and start it.
Step 2: change application.properties in xxl-job-admin to the following form and start it.
At this point we have started 2 executors and 2 admins (schedulers) in total; next we need to configure HA for the admins.
The xxl-job-admin HA cluster
In fact, in the 2 executors we have already been using the nginx-proxied, load-balanced address form for xxl-job-admin.
Modifying nginx
Note:
how to install nginx is outside the scope of this tutorial; not knowing this is not acceptable!
We create a vhost folder in the directory containing nginx.conf,
place a file named xxljob_admin.conf inside the vhost directory,
with the following content:
```nginx
upstream xxljob_admin {
    server localhost:9090 weight=1;
    server localhost:9091 weight=1;
}

server {
    listen 80;
    server_name localhost;
    access_log /usr/local/var/log/nginx/xxljob_admin.access.log;
    error_log  /usr/local/var/log/nginx/xxljob_admin.error.log;
    #root html;
    #index index.html index.htm index.jsp index.php;

    location / {
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        proxy_pass http://xxljob_admin;
        #Proxy Settings
        proxy_redirect off;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_next_upstream error timeout invalid_header http_500 http_502 http_503 http_504;
        proxy_max_temp_file_size 0;
        proxy_connect_timeout 90;
        proxy_send_timeout 90;
        proxy_read_timeout 90;
        proxy_buffer_size 4k;
        proxy_buffers 4 32k;
        proxy_busy_buffers_size 64k;
        proxy_temp_file_write_size 64k;
    }
}
```
Note:
the following 2 lines are mandatory:
```nginx
proxy_http_version 1.1;
proxy_set_header Connection "";
```
Otherwise the caller will receive an http error when json api requests are forwarded to tomcat.
Once configured we can start it directly.
Enter http://localhost/xxl-job-admin/ in the browser.
Testing
Before the test begins we put together a small script that monitors mysql qps, tps and other key indicators, called mysql_performance.sh; here it is:
```sh
#!/bin/bash
mysqladmin -uroot -p'111111' extended-status -i1 | awk 'BEGIN{local_switch=0;print "QPS   Commit Rollback TPS   Threads_con Threads_run \n------------------------------------------------------- "}
$2 ~ /Queries$/           {q=$4-lq;lq=$4;}
$2 ~ /Com_commit$/        {c=$4-lc;lc=$4;}
$2 ~ /Com_rollback$/      {r=$4-lr;lr=$4;}
$2 ~ /Threads_connected$/ {tc=$4;}
$2 ~ /Threads_running$/   {tr=$4;
    if(local_switch==0)
        {local_switch=1; count=0}
    else {
        if(count>10)
            {count=0;print "------------------------------------------------------- \nQPS   Commit Rollback TPS   Threads_con Threads_run \n------------------------------------------------------- ";}
        else{
            count+=1;
            printf "%-6d %-8d %-7d %-8d %-10d %d \n", q,c,r,c+r,tc,tr;
        }
    }
}'
```
To run it, just execute ./mysql_performance.sh, and you will see a constantly refreshing big black screen rather like the "departure boards" IBM put in airports in the 70s (a bit of a Matrix feel, ^_^).
Configuring the alert mailbox
We use apache james to set up the mailbox that sends the alert mails, without TLS enabled.
Note: how to configure apache james is outside the scope of this tutorial; it is very simple, so fill that gap yourself if needed.
Configuring the task in xxl-job-admin
How many executors take part in "clustered execution" is configured here.
Creating the test table my_goods
```sql
CREATE TABLE `my_goods` (
  `goods_id` int(16) NOT NULL,
  `goods_name` varchar(100) DEFAULT NULL,
  `stock` int(11) DEFAULT NULL,
  PRIMARY KEY (`goods_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
```
Running it
After clicking [Run once], fill the full path of our csv file with its 1,000,000 records into the task-parameter field of the dialog.
In the scheduling log we can see the task has started executing.
mysql_performance also shows the database traffic ramping up.
Because I am protective of my mac, I currently keep the cpu at around 65% utilization, using the following trick in ImportProduct:
```java
if (rowNum % rowLimit == 0) {
    XxlJobLogger.log("shard {}: processed up to goods_id -> {}", shardingVO.getIndex(), productNum);
    rowNum = 0;
    Thread.sleep(250);
}
```
In production these lines can be removed entirely; with them commented out on my machine, my modest laptop averaged a qps of 12,000 and a tps of 2,300.
We can see that as soon as the task runs it produces 2 scheduling-log entries; let's inspect them one by one.
The log of executor 0:
it performs all the inserts for "odd" goods_id values.
The log of executor 1:
it performs all the inserts for "even" goods_id values.
Killing one xxl-job-admin process
Boom, killed!!!
- watching the 2 xxl-job executors: not a single error;
- watching the mysql_performance monitor: qps and tps completely unaffected;
- watching the scheduling log at http://localhost/xxl-job-admin/:
still running happily... and look, it has already finished.
Checking the mysql side
Spot-checking 3 random records:
- with the sleep removed, the actual run on my machine took 6 minutes;
- running with a single executor took 17 minutes.
Perfect. That's a wrap!