Elastic-Job開發指南

開發指南

代碼開發

做業類型

目前提供3種做業類型,分別是Simple,DataFlow和Script。html

DataFlow類型用於處理數據流,它又提供2種做業類型,分別是ThroughputDataFlow和SequenceDataFlow。須要繼承相應的抽象類。git

Script類型用於處理腳本,可直接使用,無需編碼。github

方法參數shardingContext包含做業配置,分片和運行時信息。可經過getShardingTotalCount(),getShardingItems()等方法分別獲取分片總數,運行在本做業服務器的分片序列號集合等。web

Simple類型做業

Simple類型做業意爲簡單實現,未經任何封裝的類型。須要繼承AbstractSimpleElasticJob,該類只提供了一個方法用於覆蓋,此方法將被定時執行。用於執行普通的定時任務,與Quartz原生接口類似,只是增長了彈性擴縮容和分片等功能。spring

 

public class JobMain {  
   
    public static void main(final String[] args) {  
        long startTimeoutMills = 5000L;  
        long completeTimeoutMills = 10000L;  
        new JobScheduler(regCenter, jobConfig, new MyDistributeOnceElasticJobListener(startTimeoutMills, completeTimeoutMills)).init();  
    }  
}

ThroughputDataFlow類型做業

ThroughputDataFlow類型做業意爲高吞吐的數據流做業。須要繼承AbstractInpidualThroughputDataFlowElasticJob並能夠指定返回值泛型,該類提供3個方法可覆蓋,分別用於抓取數據,處理數據和指定是否流式處理數據。能夠獲取數據處理成功失敗次數等輔助監控信息。若是流式處理數據,fetchData方法的返回值只有爲null或長度爲空時,做業纔會中止執行,不然做業會一直運行下去;非流式處理數據則只會在每次做業執行過程當中執行一次fetchData方法和processData方法,即完成本次做業。流式數據處理參照TbSchedule設計,適用於不間歇的數據處理。shell

做業執行時會將fetchData的數據傳遞給processData處理,其中processData獲得的數據是經過多線程(線程池大小可配)拆分的。若是採用流式做業處理方式,建議processData處理數據後更新其狀態,避免fetchData再次抓取到,從而使得做業永遠不會中止。processData的返回值用於表示數據是否處理成功,拋出異常或者返回false將會在統計信息中納入失敗次數,返回true則納入成功次數。數據庫

public class MyElasticJob extends AbstractInpidualThroughputDataFlowElasticJob {  
   
    @Override 
    public List fetchData(JobExecutionMultipleShardingContext context) {  
        Map<integer, string=""> offset = context.getOffsets();  
        List result = // get data from database by sharding items and by offset  
        return result;  
    }  
   
    @Override 
    public boolean processData(JobExecutionMultipleShardingContext context, Foo data) {  
        // process data  
        // ...  
   
        // store offset  
        for (int each : context.getShardingItems()) {  
            updateOffset(each, "your offset, maybe id");  
        }  
        return true;  
    }  
}

SequenceDataFlow類型做業

SequenceDataFlow類型做業和ThroughputDataFlow做業類型極爲類似,所不一樣的是ThroughputDataFlow做業類型能夠將獲取到的數據多線程處理,但不會保證多線程處理數據的順序。如:從2個分片共獲取到100條數據,第1個分片40條,第2個分片60條,配置爲兩個線程處理,則第1個線程處理前50條數據,第2個線程處理後50條數據,無視分片項;SequenceDataFlow類型做業則根據當前服務器所分配的分片項數量進行多線程處理,每一個分片項使用同一線程處理,防止了同一分片的數據被多線程處理,從而致使的順序問題。如:從2個分片共獲取到100條數據,第1個分片40條,第2個分片60條,則系統自動分配兩個線程處理,第1個線程處理第1個分片的40條數據,第2個線程處理第2個分片的60條數據。因爲ThroughputDataFlow做業可使用多於分片項的任意線程數處理,因此性能調優的可能會優於SequenceDataFlow做業。api

public class MyElasticJob extends AbstractInpidualSequenceDataFlowElasticJob {  
   
    @Override 
    public List fetchData(JobExecutionSingleShardingContext context) {  
        int offset = context.getOffset();  
        List result = // get data from database by sharding items and by offset  
        return result;  
    }  
   
    @Override 
    public boolean processData(JobExecutionSingleShardingContext context, Foo data) {  
        // process data  
        // ...  
   
        // store offset  
        updateOffset(context.getShardingItem(), "your offset, maybe id");  
        return true;  
    }  
}

Script類型做業

Script類型做業意爲腳本類型做業,支持shell,Python,perl等全部類型腳本。只需經過控制檯/代碼配置scriptCommandLine便可。執行腳本路徑能夠包含參數,最後一個參數爲做業運行時信息.bash

#!/bin/bash  
echo sharding execution context is $*

做業運行時輸出服務器

sharding execution context is {"shardingItems":[0,1,2,3,4,5,6,7,8,9],"shardingItemParameters":{},"offsets":{},"jobName":"scriptElasticDemoJob","shardingTotalCount":10,"jobParameter":"","monitorExecution":true,"fetchDataCount":1}

批量處理

爲了提升數據處理效率,數據流類型做業提供了批量處理數據的功能。以前逐條處理數據的兩個抽象類分別是AbstractInpidualThroughputDataFlowElasticJob和AbstractInpidualSequenceDataFlowElasticJob,批量處理則使用另外兩個接口AbstractBatchThroughputDataFlowElasticJob和AbstractBatchSequenceDataFlowElasticJob。不一樣之處在於processData方法的返回值從boolean類型變爲int類型,用於表示一批數據處理的成功數量,第二個入參則轉變爲List數據集合。

異常處理

elastic-job在最上層接口提供了handleJobExecutionException方法,使用做業時能夠覆蓋此方法,並使用quartz提供的JobExecutionException控制異常後做業的聲明週期。默認實現是直接將異常拋出。示例:

任務監聽配置

能夠經過配置多個任務監聽器,在任務執行前和執行後執行監聽的方法。監聽器分爲每臺做業節點均執行和分佈式場景中僅單一節點執行兩種。

每臺做業節點均執行的監聽

若做業處理做業服務器的文件,處理完成後刪除文件,可考慮使用每一個節點均執行清理任務。此類型任務實現簡單,且無需考慮全局分佈式任務是否完成,請儘可能使用此類型監聽器。

步驟:

定義監聽器

import com.dangdang.ddframe.job.api.JobExecutionMultipleShardingContext;  
import com.dangdang.ddframe.job.api.listener.ElasticJobListener;  
   
public class MyElasticJobListener implements ElasticJobListener {  
   
    @Override 
    public void beforeJobExecuted(final JobExecutionMultipleShardingContext shardingContext) {  
        // do something ...  
    }  
   
    @Override 
    public void afterJobExecuted(final JobExecutionMultipleShardingContext shardingContext) {  
        // do something ...  
    }  
}

將監聽器做爲參數傳入JobScheduler

public class JobMain {  
   
    public static void main(final String[] args) {  
        new JobScheduler(regCenter, jobConfig, new MyElasticJobListener()).init();  
    }  
} 

分佈式場景中僅單一節點執行的監聽

若做業處理數據庫數據,處理完成後只需一個節點完成數據清理任務便可。此類型任務處理複雜,需同步分佈式環境下做業的狀態同步,提供了超時設置來避免做業不一樣步致使的死鎖,請謹慎使用。

步驟:

定義監聽器

import com.dangdang.ddframe.job.api.JobExecutionMultipleShardingContext;  
import com.dangdang.ddframe.job.api.listener.AbstractDistributeOnceElasticJobListener;  
   
public final class TestDistributeOnceElasticJobListener extends AbstractDistributeOnceElasticJobListener {  
   
    public TestDistributeOnceElasticJobListener(final long startTimeoutMills, final long completeTimeoutMills) {  
        super(startTimeoutMills, completeTimeoutMills);  
    }  
   
    @Override 
    public void doBeforeJobExecutedAtLastStarted(final JobExecutionMultipleShardingContext shardingContext) {  
        // do something ...  
    }  
   
    @Override 
    public void doAfterJobExecutedAtLastCompleted(final JobExecutionMultipleShardingContext shardingContext) {  
        // do something ...  
    }  
}

將監聽器做爲參數傳入JobScheduler

public class JobMain {  
   
    public static void main(final String[] args) {  
        long startTimeoutMills = 5000L;  
        long completeTimeoutMills = 10000L;  
        new JobScheduler(regCenter, jobConfig, new MyDistributeOnceElasticJobListener(startTimeoutMills, completeTimeoutMills)).init();  
    }  
}

做業配置

spring容器配合使用做業,能夠將做業Bean配置爲Spring Bean,可在做業中經過依賴注入使用Spring容器管理的數據源等對象。可用placeholder佔位符從屬性文件中取值。

Spring命名空間配置

job:simple命名空間屬性詳細說明

屬性名 類型 是否必填 缺省值 描述
id String   做業名稱
class String   做業實現類,需實現ElasticJob接口,腳本型做業不須要配置
registry-center-ref String   註冊中心Bean的引用,需引用reg:zookeeper的聲明
cron String   cron表達式,用於配置做業觸發時間
sharding-total-count int   做業分片總數
sharding-item-parameters String   分片序列號和參數用等號分隔,多個鍵值對用逗號分隔
分片序列號從0開始,不可大於或等於做業分片總數
如:
0=a,1=b,2=c
job-parameter String   做業自定義參數
能夠配置多個相同的做業,可是用不一樣的參數做爲不一樣的調度實例
monitor-execution boolean true 監控做業運行時狀態
每次做業執行時間和間隔時間均很是短的狀況,建議不監控做業運行時狀態以提高效率。由於是瞬時狀態,因此無必要監控。請用戶自行增長數據堆積監控。而且不能保證數據重複選取,應在做業中實現冪等性。
每次做業執行時間和間隔時間均較長的狀況,建議監控做業運行時狀態,可保證數據不會重複選取。
monitor-port int -1 做業監控端口
建議配置做業監控端口, 方便開發者dump做業信息。
使用方法: echo 「dump」 | nc 127.0.0.1 9888
max-time-diff-seconds int -1 最大容許的本機與註冊中心的時間偏差秒數
若是時間偏差超過配置秒數則做業啓動時將拋異常
配置爲-1表示不校驗時間偏差
failover boolean false 是否開啓失效轉移
僅monitorExecution開啓,失效轉移纔有效
misfire boolean true 是否開啓錯過任務從新執行
job-sharding-strategy-class String true 做業分片策略實現類全路徑
默認使用平均分配策略
詳情參見:做業分片策略
description String   做業描述信息
disabled boolean false 做業是否禁止啓動
可用於部署做業時,先禁止啓動,部署結束後統一啓動
overwrite boolean false 本地配置是否可覆蓋註冊中心配置
若是可覆蓋,每次啓動做業都以本地配置爲準

job:dataflow命名空間屬性詳細說明

job:dataflow命名空間擁有job:simple命名空間的所有屬性,如下僅列出特有屬性

屬性名 類型 是否必填 缺省值 描述
process-count-interval-seconds int 300 統計做業處理數據數量的間隔時間
單位:秒
concurrent-data-process-thread-count int CPU核數*2 同時處理數據的併發線程數
不能小於1
僅ThroughputDataFlow做業有效
fetch-data-count int 1 每次抓取的數據量
streaming-process boolean false 是否流式處理數據
若是流式處理數據, 則fetchData不返回空結果將持續執行做業
若是非流式處理數據, 則處理數據完成後做業結束

job:script命名空間屬性詳細說明,基本屬性參照job:simple命名空間屬性詳細說明

job:script命名空間擁有job:simple命名空間的所有屬性,如下僅列出特有屬性

屬性名 類型 是否必填 缺省值 描述
script-command-line String   腳本型做業執行命令行

job:listener命名空間屬性詳細說明

job:listener必須配置爲job:bean的子元素

屬性名 類型 是否必填 缺省值 描述
class String   前置後置任務監聽實現類,需實現ElasticJobListener接口
started-timeout-milliseconds long Long.MAX_VALUE AbstractDistributeOnceElasticJobListener型監聽器,最後一個做業執行前的執行方法的超時時間
單位:毫秒
completed-timeout-milliseconds long Long.MAX_VALUE AbstractDistributeOnceElasticJobListener型監聽器,最後一個做業執行後的執行方法的超時時間
單位:毫秒

reg:bean命名空間屬性詳細說明

屬性名 類型 是否必填 缺省值 描述
id String   註冊中心在Spring容器中的主鍵
server-lists String   鏈接Zookeeper服務器的列表
包括IP地址和端口號
多個地址用逗號分隔
如: host1:2181,host2:2181
namespace String   Zookeeper的命名空間
base-sleep-time-milliseconds int 1000 等待重試的間隔時間的初始值
單位:毫秒
max-sleep-time-milliseconds int 3000 等待重試的間隔時間的最大值
單位:毫秒
max-retries int 3 最大重試次數
session-timeout-milliseconds int 60000 會話超時時間
單位:毫秒
connection-timeout-milliseconds int 15000 鏈接超時時間
單位:毫秒
digest String 無驗證 鏈接Zookeeper的權限令牌
缺省爲不須要權限驗證

不使用Spring配置

若是不使用Spring框架,能夠用以下方式啓動做業。

import com.dangdang.ddframe.job.api.config.JobConfiguration;  
import com.dangdang.ddframe.job.api.JobScheduler;  
import com.dangdang.ddframe.reg.base.CoordinatorRegistryCenter;  
import com.dangdang.ddframe.reg.zookeeper.ZookeeperConfiguration;  
import com.dangdang.ddframe.reg.zookeeper.ZookeeperRegistryCenter;  
import com.dangdang.example.elasticjob.core.job.SimpleJobDemo;  
import com.dangdang.example.elasticjob.core.job.ThroughputDataFlowJobDemo;  
import com.dangdang.example.elasticjob.core.job.SequenceDataFlowJobDemo;  
import com.dangdang.ddframe.job.plugin.job.type.integrated.ScriptElasticJob;  
   
public class JobDemo {  
   
    // 定義Zookeeper註冊中心配置對象  
    private ZookeeperConfiguration zkConfig = new ZookeeperConfiguration("localhost:2181", "elastic-job-example", 1000, 3000, 3);  
   
    // 定義Zookeeper註冊中心  
    private CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zkConfig);  
   
    // 定義簡單做業配置對象  
    private final SimpleJobConfiguration simpleJobConfig = JobConfigurationFactory.createSimpleJobConfigurationBuilder("simpleElasticDemoJob",  
                    SimpleJobDemo.class, 10, "0/30 * * * * ?").build();  
   
    // 定義高吞吐流式處理的數據流做業配置對象  
    private final DataFlowJobConfiguration throughputJobConfig = JobConfigurationFactory.createDataFlowJobConfigurationBuilder("throughputDataFlowElasticDemoJob",  
                    ThroughputDataFlowJobDemo.class, 10, "0/5 * * * * ?").streamingProcess(true).build();  
   
    // 定義順序的數據流做業配置對象  
    private final DataFlowJobConfiguration sequenceJobConfig = JobConfigurationFactory.createDataFlowJobConfigurationBuilder("sequenceDataFlowElasticDemoJob",  
                    SequenceDataFlowJobDemo.class, 10, "0/5 * * * * ?").build();  
   
    // 定義腳本做業配置對象  
    private final ScriptJobConfiguration scriptJobConfig = JobConfigurationFactory.createScriptJobConfigurationBuilder("scriptElasticDemoJob",  
                    10, "0/5 * * * * ?", "test.sh").build();  
   
    public static void main(final String[] args) {  
        new JobDemo().init();  
    }  
   
    private void init() {  
        // 鏈接註冊中心  
        regCenter.init();  
        // 啓動簡單做業  
        new JobScheduler(regCenter, simpleJobConfig).init();  
        // 啓動高吞吐流式處理的數據流做業  
        new JobScheduler(regCenter, throughputJobConfig).init();  
        // 啓動順序的數據流做業  
        new JobScheduler(regCenter, sequenceJobConfig).init();  
        // 啓動腳本做業  
        new JobScheduler(regCenter, scriptJobConfig).init();  
    }  
}

 

轉載:https://www.2cto.com/kf/201611/566016.html

相關文章
相關標籤/搜索