房價網是怎麼使用分佈式做業框架elastic-job

Elastic-Job是什麼?

Elastic-Job是一個分佈式調度解決方案,由兩個相互獨立的子項目Elastic-Job-Lite和Elastic-Job-Cloud組成。java

Elastic-Job-Lite定位爲輕量級無中心化解決方案,使用jar包的形式提供分佈式任務的協調服務;Elastic-Job-Cloud採用自研Mesos Framework的解決方案,額外提供資源治理、應用分發以及進程隔離等功能。python

官網地址:elasticjob.io/git

Github:github.com/elasticjob/…github

爲何要使用Elastic-Job

目前咱們公司用的是基於Linux Crontab的定時任務執行器。web

存在以下問題:shell

  • 沒法集中管理任務
  • 不能水平擴展
  • 無可視化界面操做
  • 存在單點故障

除了Linux Crontab在java這塊的方案還有 Quartz,但 Quartz缺乏分佈式並行調度的功能。bash

存在的問題也很明顯:服務器

  • 當個人項目是一個單體應用時,在裏面基於Quartz起一個定時任務,能夠很愉快的運行
  • 當個人項目作了負載,擴充到3臺節點時,3個節點上的任務會同時執行,數據亂了
  • 同時執行要保證數據沒問題須要引入分佈式鎖來調度,難度增大

怎麼解決?

1.自研框架

這種狀況下可能須要本身去開發一個可以知足公司業務需求的調度框架,成本較高,不推薦微信

以前我也有想過要本身寫一個,思路有了,就是還沒開始,調度框架只要是調度問題,像Elastic-Job就作的很是好,它把分片的規則讓你本身定義,而後根據你定的片的數據給你調度下,至於每一個節點處理什麼數據你本身去控制。架構

若是說部採用這種方式,也不去寫數據的分發,那麼我以爲最簡單的辦法就是用消息隊列來實現了。

採用zookeeper來作調度,存儲任務數據,定義一個通用的接口,分紅2部分,以下:

public interface Job {
    void read();
    void process(Object data);
}
複製代碼

而後使用者經過實現上面的接口來讀取須要處理的數據,在process中處理分發過來的數據

至於分發的話一個任務能夠經過註解來標記使用一個隊列,也可使用通用的,這樣就能夠實現多個消費者同時消費了,就算其中一個掛掉也不影響整個任務,也不用考慮失效轉移了。

可是要作控制的是read方法,必須只有一個節點執行,否則數據就分發重複了。

上面只是提供一個簡單的思路,固然有web頁面管理任務,也能夠手動執行任務等等。

2.選擇開源方案

TBSchedule:阿里早期開源的分佈式任務調度系統。代碼略陳舊,使用timer而非線程池執行任務調度。衆所周知,timer在處理異常情況時是有缺陷的。並且TBSchedule做業類型較爲單一,只能是獲取/處理數據一種模式。還有就是文檔缺失比較嚴重。

Spring Batch: Spring Batch是一個輕量級的,徹底面向Spring的批處理框架,能夠應用於企業級大量的數據處理系統。Spring Batch以POJO和你們熟知的Spring框架爲基礎,使開發者更容易的訪問和利用企業級服務。Spring Batch能夠提供大量的,可重複的數據處理功能,包括日誌記錄/跟蹤,事務管理,做業處理統計工做從新啓動、跳過,和資源管理等重要功能。

Elastic-Job:國內開源產品,中文文檔,入門快速,使用簡單,功能齊全,社區活躍,由噹噹網架構師張亮主導,目前在開源方面投入了比較多的時間。

爲何選擇Elastic-Job?

  • 分佈式調度協調
  • 彈性擴容縮容
  • 失效轉移
  • 錯過執行做業重觸發
  • 做業分片一致性,保證同一分片在分佈式環境中僅一個執行實例
  • 自診斷並修復分佈式不穩定形成的問題
  • 支持並行調度
  • 支持做業生命週期操做
  • 豐富的做業類型
  • Spring整合以及命名空間提供
  • 運維平臺

做業類型介紹

Simple:簡單做業,經常使用, 意爲簡單實現,未經任何封裝的類型。需實現SimpleJob接口。該接口僅提供單一方法用於覆蓋,此方法將定時執行。與Quartz原生接口類似,但提供了彈性擴縮容和分片等功能。

public class MyElasticJob implements SimpleJob {
    
    @Override
    public void execute(ShardingContext context) {
        switch (context.getShardingItem()) {
            case 0: 
                // do something by sharding item 0
                break;
            case 1: 
                // do something by sharding item 1
                break;
            case 2: 
                // do something by sharding item 2
                break;
            // case n: ...
        }
    }
}
複製代碼

DataFlow:Dataflow類型用於處理數據流,需實現DataflowJob接口。該接口提供2個方法可供覆蓋,分別用於抓取(fetchData)和處理(processData)數據。

public class MyElasticJob implements DataflowJob<Foo> {
    
    @Override
    public List<Foo> fetchData(ShardingContext context) {
        switch (context.getShardingItem()) {
            case 0: 
                List<Foo> data = // get data from database by sharding item 0
                return data;
            case 1: 
                List<Foo> data = // get data from database by sharding item 1
                return data;
            case 2: 
                List<Foo> data = // get data from database by sharding item 2
                return data;
            // case n: ...
        }
    }
    
    @Override
    public void processData(ShardingContext shardingContext, List<Foo> data) {
        // process data
        // ...
    }
}
複製代碼

Script:Script類型做業意爲腳本類型做業,支持shell,python,perl等全部類型腳本。只需經過控制檯或代碼配置scriptCommandLine便可,無需編碼。執行腳本路徑可包含參數,參數傳遞完畢後,做業框架會自動追加最後一個參數爲做業運行時信息。

其實我建議增長一種任務類型,就是流水式任務,爲此我還特地提了一個issues:

github.com/elasticjob/…

在特定的業務需求下,A任務執行完以後,須要執行B任務,以此類推,這種具備依賴性的流水式的任務。

在目前能夠將這些任務合在一塊兒,經過代碼調用的方式來達到效果。

但我但願能增長這樣一個功能,好比加一個配置,job-after="com.xxx.job.XXXJob" 在執行完這個任務以後,自動調用另外一個任務BB,BB任務只須要配置任務信息,把cron去掉就能夠,由於BB是依靠別的任務觸發執行的。

固然這些任務必須在同一個zk的命名空間下,若是能支持誇命名空間就更好了

這樣就能達到,流水式的任務操做了,而且每一個任務能夠用不一樣的分片key
複製代碼

開始使用

1.關於框架怎麼搭建,怎麼配置就不作講解了,官網文檔確定比我寫的好,通常開源框架都有demo,你們下載下來導入IDE中便可運行。

demo地址:github.com/elasticjob/…

2.介紹下使用中的一些經驗

  • 建議按產品來劃分任務,一個產品對應一個任務的項目,當團隊比較大的時候可能一個小組負責一個產品,這樣就不會跟別的混在一塊兒了
  • 任務的描述必定要寫清楚,幹嗎用的,在配置任務中有個描述的配置,填寫清楚
/**
 * 用戶維度統計任務<br>統計出用戶的房產,置換,貸款等信息
 * @author yinjihuan
 */
public class UserStatJob implements SimpleJob {

	private Logger logger = LoggerFactory.getLogger(UserStatJob.class);
	
	@Autowired
	private EnterpriseProductUserService enterpriseProductUserService;
	
	@Autowired
	private UserStatService userStatService;
	
	@Autowired
	private HouseInfoService houseInfoService;
	
	@Autowired
	private HouseSubstitutionService houseSubstitutionService;
	
	@Autowired
	private LoanApplyService loanApplyService;
	
	@Override
	public void execute(ShardingContext shardingContext) {
		logger.info("開始執行UserStatJob");
		long total = enterpriseProductUserService.queryCount();
		int pages = PageBean.calcPages(total, 1000);
		for (int i = 1; i <= pages; i++) {
			List<EnterpriseProductUser> users = enterpriseProductUserService.queryByPage(i, 1000);
			for (EnterpriseProductUser user : users) {
				try {
					processStat(user);
				} catch (Exception e) {
					logger.error("用戶維度統計任務異常", e);
					DingDingMessageUtil.sendTextMessage("用戶維度統計任務異常:" + e.getMessage());
				}
			}
		}
		logger.info("UserStatJob執行結束");
	}
	
	private void processStat(EnterpriseProductUser user) {
		UserStat stat = userStatService.getByUid(user.getEid(), user.getUid());
		Long eid = user.getEid();
		String uid = user.getUid();
		if (stat == null) {
			stat = new UserStat();
			stat.setEid(eid);
			stat.setUid(uid);
			stat.setUserAddTime(user.getAddTime());
			stat.setCity(user.getCity());
			stat.setRegion(user.getRegion());
		}
		stat.setHouseCount(houseInfoService.queryCountByEidAndUid(eid, uid));
		stat.setHousePrice(houseInfoService.querySumMoneyByEidAndUid(eid, uid));
		stat.setSubstitutionCount(houseSubstitutionService.queryCount(eid, uid));
		stat.setSubstitutionMaxPrice(houseSubstitutionService.queryMaxBudget(eid, uid));
		stat.setLoanEvalCount(loanApplyService.queryUserCountByType(eid, uid, 2));
		stat.setLoanEvalMaxPrice(loanApplyService.queryMaxEvalMoney(eid, uid));
		stat.setLoanCount(loanApplyService.queryUserCountByType(eid, uid, 1));
		stat.setModifyDate(new Date());
		userStatService.save(stat);
	}

}
複製代碼
<!-- 用戶統計任務 天天1點10分執行 -->
 <job:simple id="userStatJob" class="com.fangjia.job.fsh.job.UserStatJob" registry-center-ref="regCenter"
    	 sharding-total-count="1" cron="0 10 1 * * ?" sharding-item-parameters=""
    	 failover="true" description="【房生活】用戶維度統計任務,統計出用戶的房產,置換,貸款等信息 UserStatJob"
    	 overwrite="true" event-trace-rdb-data-source="elasticJobLog" job-exception-handler="com.fangjia.job.fsh.handler.CustomJobExceptionHandler">
    	 
    	  <job:listener class="com.fangjia.job.fsh.listener.MessageElasticJobListener"></job:listener>
    	  
 </job:simple>
複製代碼
  • 爲每一個任務配置一個統一的監聽器,來對任務的執行,結束進行通知,能夠是短信,郵件或者別的,我這邊用的是釘釘的機器人來發消息通知到釘釘
/**
 * 做業監聽器, 執行先後發送釘釘消息進行通知
 * @author yinjihuan
 */
public class MessageElasticJobListener implements ElasticJobListener {
    
    @Override
    public void beforeJobExecuted(ShardingContexts shardingContexts) {
    	String date = DateUtils.date2Str(new Date());
    	String msg = date + " 【FSH-" + shardingContexts.getJobName() + "】任務開始執行====" + JsonUtils.toJson(shardingContexts);
    	DingDingMessageUtil.sendTextMessage(msg);
    }
    
    @Override
    public void afterJobExecuted(ShardingContexts shardingContexts) {
    	String date = DateUtils.date2Str(new Date());
    	String msg = date + " 【FSH-" + shardingContexts.getJobName() + "】任務執行結束====" + JsonUtils.toJson(shardingContexts);
    	DingDingMessageUtil.sendTextMessage(msg);
    }

}
複製代碼
  • 能夠在每一個任務類上定義一個註解,註解用來標識這個任務是誰開發的,而後對應的釘釘消息就發送給誰,我我的建議仍是建一個羣,而後你們都在裏面,由於若是單獨發給一個開發人員,除非他的主動性很高,否則也沒什麼用,我我的建議發在羣裏,這樣領導看見了就會說那個誰誰誰,你的任務報錯了,去查下緣由。我這邊是統一發的,沒有定義註解。

  • 任務的異常處理,能夠在任務中對異常進行處理,除了記錄日誌,也用統一封裝好的發送釘釘消息來進行通知,實時知道任務是否有異常,能夠查看我上面的代碼。

  • 還有一種是沒捕獲的異常,怎麼通知到羣裏,能夠自定義異常處理類來實現, 經過配置job-exception-handler="com.fangjia.job.fsh.handler.CustomJobExceptionHandler"

/**
 * 自定義異常處理,在任務異常時使用釘釘發送通知
 * @author yinjihuan
 */
public class CustomJobExceptionHandler implements JobExceptionHandler {
	
	private Logger logger = LoggerFactory.getLogger(CustomJobExceptionHandler.class);
	
	@Override
	public void handleException(String jobName, Throwable cause) {
		logger.error(String.format("Job '%s' exception occur in job processing", jobName), cause);
		DingDingMessageUtil.sendTextMessage("【"+jobName+"】任務異常。" + cause.getMessage());
	}

}
複製代碼
  • 能夠經過監聽job_name\instances\job_instance_id節點是否存在來判斷做業節點是否掛掉,該節點爲臨時節點,若是做業服務器下線,該節點將刪除。固然也能夠經過其餘的工具來進行監控。

  • 任務的編寫儘可能考慮到水平擴展性,像我上面貼的那個列子其實就沒考慮到,只是一個單純的任務,由於我沒有用到shardingParameter來處理對應的片的數據,這邊其實建議你們考慮下,若是任務時間短。處理的數據少,能夠寫成我這樣。若是可以預計到將來有大量數據須要處理,並且時間很長的話最好配置下分片的規則,而且將代碼寫成按分片來處理,這樣到了後面就直接修改配置,增長下節點就好了。

更多技術分享請關注微信公衆號:猿天地

image.png
相關文章
相關標籤/搜索