1. 爲何不用quartzjava
經過定時任務來進行計算,若是數量很少,能夠輕易的用quartz來完成,若是用戶量特別大,可能短期內處理不完須要處理的數據。另外若是咱們將job直接放在咱們的webapp裏,webapp一般是多節點部署的,這樣,項目須要每隔一段時間執行某個定時任務,可是因爲同時部署在多臺機器上,所以可能會出現任務被執行屢次,形成重複數據的狀況,咱們的job也就是多節點,形成了多個job同時執行,致使job重複執行,爲了不這種狀況,咱們可能多job的節點進行加鎖,保證只有一個節點能執行,或者將job從webapp裏剝離出來,獨自部署一個節點。Elastic job是噹噹網架構師張亮,曹昊和江樹建基於Zookepper、Quartz開發並開源的一個Java分佈式定時任務,解決了Quartz不支持分佈式的弊端。Elastic job主要的功能有支持彈性擴容,經過Zookepper集中管理和監控job,支持失效轉移等,這些都是Quartz等其餘定時任務沒法比擬的。python
2. 原理web
elastic底層的任務調度仍是使用quartz,經過zookeeper來動態給job節點分片,使用elastic-job開發的做業都是客戶的,假如咱們須要使用3臺機器跑job,咱們將任務分紅3片,框架經過zk的協調,最終會讓3臺機器分別分配0,1,2的任務片,好比server0-->0,server1-->1,server2-->2,當server0執行時,能夠只查詢id%3==0的用戶,server1執行時,只查詢id%3==1的用戶,server2執行時,只查詢id%3==2的用戶。當分片數爲1時,在同一個zookepper和jobname狀況下,多臺機器部署了Elastic job時,只有拿到shardingContext.getShardingItem()爲0的機器得以執行,其餘的機器不執行當分片數大於1時,假若有3臺服務器,分紅10片,則分片項分配結果爲服務器A=0,1,2;服務器B=3,4,5;服務器C=6,7,8,9。此時每臺服務器可根據拿到的shardingItem值進行相應的處理,spring
舉例場景:shell
假如job處理數據庫中的數據業務,方法爲:A服務器處理數據庫中Id以0,1,2結尾的數據,B處理數據庫中Id以3,4,5結尾的數據,C處理器處理6,7,8,9結尾的數據,合計處理0-9爲所有數據數據庫
若是服務器C崩潰,Elastic Job自動進行進行失效轉移,將C服務器的分片轉移到A和B服務器上,則分片項分配結果爲服務器A=0,1,2,3,4;服務器B=5,6,7,8,9服務器
此時,A服務器處理數據庫中Id以0,1,2,3,4結尾的數據,B處理數據庫中Id以5,6,7,8,9結尾的數據,合計處理0-9爲所有數據.架構
在上述基礎上,若是咱們增長server3,此時,server3分不到任務分片,由於任務分片只有3片,已經分完了,沒有分到任務分片的程序不執行。若是server2掛了,那麼server2的任務分片會分給server3,server3有了分片後就會執行。若是server3也掛了,框架會自動將server3的分片隨機分給server0或server1,這種特性稱之爲彈性擴容,也就是elastic-job的由來。app
elastic-job不支持單機多實例,經過zk的協調分片是以ip爲單元的,若是經過單機多實例來試驗,結果會致使分片和預期不一致,能夠經過虛擬機模擬多臺機器。框架
3. 做業類型
elastic-job 提供了三種類型的做業:simple,dataflow,script。script類型做業爲腳本類型做業,支持shell,python等類型腳本。simple類型須要實現SimpleJob接口,未通過任何封裝,和quartz原生接口類似。dataflow類型用於處理數據流,須要實現DafaflowJob接口,該接口提供了兩個方法能夠覆蓋,分別用於抓取fetchData和處理processData數據。
4. 代碼演示
1. 依賴
<dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-spring</artifactId> <version>2.1.5</version> </dependency>
2.編寫job
public class OrderStatisticsJob implements SimpleJob { private static final Logger log = LoggerFactory.getLogger(OrderStatisticsJob.class); OrdersService ordersSerivce = null; /** 讀取配置(配置文件之後上分佈式配置動態維護) **/ private void readConfig() { ordersSerivce = (OrdersService) ApplicationHelp.getBean("ordersService"); } synchronized public void start(int sharding) { } @Override public void execute(ShardingContext shardingContext) { // TODO Auto-generated method stub log.info("shardingContext:{}", shardingContext.getShardingItem()); readConfig(); start(1); } }
public class MyDataFlowJob implements DataflowJob<User> { @Override public List<User> fetchData(ShardingContext shardingContext) { List<User> users = null;//查詢users from db return users; } @Override public void processData(ShardingContext shardingContext, List<User> data) { for (User user: data) { user.setStatus(1); //update user } } }
3. Spring配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg" xmlns:job="http://www.dangdang.com/schema/ddframe/job" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.dangdang.com/schema/ddframe/reg http://www.dangdang.com/schema/ddframe/reg/reg.xsd http://www.dangdang.com/schema/ddframe/job http://www.dangdang.com/schema/ddframe/job/job.xsd"> <!--配置做業註冊中心 --> <reg:zookeeper id="regCenter" server-lists="localhost:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" /> <!-- 配置做業--> <job:simple id="orderStatisticsJob" class="com.beta.cb.mall.task.job.OrderStatisticsJob" registry-center-ref="regCenter" sharding-total-count="2" cron="0/2 * * * * ?" /> <job:dataflow id="myDataFlowJob" class="com.fanfan.sample001.MyDataFlowJob" registry-center-ref="regCenter" sharding-total-count="2" cron="0 0 02 * * ?" streaming-process="true" overwrite="true" /> </beans>