elastic-job-lite

1. 爲何不用quartzjava

  經過定時任務來進行計算,若是數量很少,能夠輕易的用quartz來完成,若是用戶量特別大,可能短期內處理不完須要處理的數據。另外若是咱們將job直接放在咱們的webapp裏,webapp一般是多節點部署的,這樣,項目須要每隔一段時間執行某個定時任務,可是因爲同時部署在多臺機器上,所以可能會出現任務被執行屢次,形成重複數據的狀況,咱們的job也就是多節點,形成了多個job同時執行,致使job重複執行,爲了不這種狀況,咱們可能多job的節點進行加鎖,保證只有一個節點能執行,或者將job從webapp裏剝離出來,獨自部署一個節點。Elastic job是噹噹網架構師張亮,曹昊和江樹建基於Zookepper、Quartz開發並開源的一個Java分佈式定時任務,解決了Quartz不支持分佈式的弊端。Elastic job主要的功能有支持彈性擴容,經過Zookepper集中管理和監控job,支持失效轉移等,這些都是Quartz等其餘定時任務沒法比擬的。python

2. 原理web

  elastic底層的任務調度仍是使用quartz,經過zookeeper來動態給job節點分片,使用elastic-job開發的做業都是客戶的,假如咱們須要使用3臺機器跑job,咱們將任務分紅3片,框架經過zk的協調,最終會讓3臺機器分別分配0,1,2的任務片,好比server0-->0,server1-->1,server2-->2,當server0執行時,能夠只查詢id%3==0的用戶,server1執行時,只查詢id%3==1的用戶,server2執行時,只查詢id%3==2的用戶。當分片數爲1時,在同一個zookepper和jobname狀況下,多臺機器部署了Elastic job時,只有拿到shardingContext.getShardingItem()爲0的機器得以執行,其餘的機器不執行當分片數大於1時,假若有3臺服務器,分紅10片,則分片項分配結果爲服務器A=0,1,2;服務器B=3,4,5;服務器C=6,7,8,9。此時每臺服務器可根據拿到的shardingItem值進行相應的處理,spring

舉例場景:shell

假如job處理數據庫中的數據業務,方法爲:A服務器處理數據庫中Id以0,1,2結尾的數據,B處理數據庫中Id以3,4,5結尾的數據,C處理器處理6,7,8,9結尾的數據,合計處理0-9爲所有數據數據庫

若是服務器C崩潰,Elastic Job自動進行進行失效轉移,將C服務器的分片轉移到A和B服務器上,則分片項分配結果爲服務器A=0,1,2,3,4;服務器B=5,6,7,8,9服務器

此時,A服務器處理數據庫中Id以0,1,2,3,4結尾的數據,B處理數據庫中Id以5,6,7,8,9結尾的數據,合計處理0-9爲所有數據.架構

在上述基礎上,若是咱們增長server3,此時,server3分不到任務分片,由於任務分片只有3片,已經分完了,沒有分到任務分片的程序不執行。若是server2掛了,那麼server2的任務分片會分給server3,server3有了分片後就會執行。若是server3也掛了,框架會自動將server3的分片隨機分給server0或server1,這種特性稱之爲彈性擴容,也就是elastic-job的由來。app

  elastic-job不支持單機多實例,經過zk的協調分片是以ip爲單元的,若是經過單機多實例來試驗,結果會致使分片和預期不一致,能夠經過虛擬機模擬多臺機器。框架

3. 做業類型

  elastic-job 提供了三種類型的做業:simple,dataflow,script。script類型做業爲腳本類型做業,支持shell,python等類型腳本。simple類型須要實現SimpleJob接口,未通過任何封裝,和quartz原生接口類似。dataflow類型用於處理數據流,須要實現DafaflowJob接口,該接口提供了兩個方法能夠覆蓋,分別用於抓取fetchData和處理processData數據。

4. 代碼演示

1. 依賴

<dependency>
			<groupId>com.dangdang</groupId>
			<artifactId>elastic-job-lite-spring</artifactId>
			<version>2.1.5</version>
		</dependency>

 

2.編寫job

public class OrderStatisticsJob implements SimpleJob {

	private static final Logger log = LoggerFactory.getLogger(OrderStatisticsJob.class);

	OrdersService ordersSerivce = null;

	/** 讀取配置(配置文件之後上分佈式配置動態維護) **/
	private void readConfig() {
		ordersSerivce = (OrdersService) ApplicationHelp.getBean("ordersService");
	}

	synchronized public void start(int sharding) {

	}

	@Override
	public void execute(ShardingContext shardingContext) {
		// TODO Auto-generated method stub
		log.info("shardingContext:{}", shardingContext.getShardingItem());
		readConfig();
		start(1);
	}
}
public class MyDataFlowJob implements DataflowJob<User> { @Override public List<User> fetchData(ShardingContext shardingContext) { List<User> users = null;//查詢users from db
        return users; } @Override public void processData(ShardingContext shardingContext, List<User> data) { for (User user: data) { user.setStatus(1); //update user
 } } }

 

 

3. Spring配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg" xmlns:job="http://www.dangdang.com/schema/ddframe/job" xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.dangdang.com/schema/ddframe/reg
                        http://www.dangdang.com/schema/ddframe/reg/reg.xsd
                        http://www.dangdang.com/schema/ddframe/job
                        http://www.dangdang.com/schema/ddframe/job/job.xsd">
    <!--配置做業註冊中心 -->
    <reg:zookeeper id="regCenter" server-lists="localhost:2181" namespace="dd-job"
                   base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />
 
    <!-- 配置做業-->
    <job:simple id="orderStatisticsJob" class="com.beta.cb.mall.task.job.OrderStatisticsJob" registry-center-ref="regCenter" sharding-total-count="2" cron="0/2 * * * * ?" />
   <job:dataflow id="myDataFlowJob" class="com.fanfan.sample001.MyDataFlowJob" registry-center-ref="regCenter"              sharding-total-count="2" cron="0 0 02 * * ?" streaming-process="true" overwrite="true" />
</beans>
相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息