Schedulerx2.0的客戶端提供分佈式執行、多種任務類型、統一日誌等框架,用戶只要依賴schedulerx-worker這個jar包,經過schedulerx2.0提供的編程模型,簡單幾行代碼就能實現一套高可靠可運維的分佈式執行引擎。docker
這篇文章重點是介紹基於schedulerx2.0的分佈式執行引擎原理和最佳實踐,相信看完這篇文章,你們都能寫出高效率的分佈式做業,說不定速度能提高好幾倍:)數據庫
Worker整體架構參考Yarn的架構,分爲TaskMaster, Container, Processor三層:編程
以MapTaskMaster爲例,大概的原理以下圖所示:架構
Schedulerx2.0提供了多種分佈式編程模型,這篇文章主要介紹Map模型(以後的文章還會介紹MapReduce模型,適用更多的業務場景),簡單幾行代碼就能夠將海量數據分佈式到多臺機器上進行分佈式跑批,很是簡單易用。框架
針對不一樣的跑批場景,map模型做業還提供了並行計算、內存網格、網格計算三種執行方式:運維
由於並行任務具備子任務列表:分佈式
如上圖,子任務列表能夠看到每一個子任務的狀態、機器,還有重跑、查看日誌等操做。ide
由於並行計算要作到子任務級別的可視化,而且worker掛了、重啓還能支持手動重跑,就須要把task持久化到server端:post
如上圖所示:性能
網格計算要支持百萬級別的task,若是全部任務都往server回寫,server確定扛不住,因此網格計算的存儲其實是分佈式在用戶本身的機器上的:
如上圖所示:
6.1 需求
舉個例子:
6.2 反面案例
咱們先看下以下代碼是否有問題?
public class ScanSingleTableProcessor extends MapJobProcessor { private static int pageSize = 1000; @Override public ProcessResult process(JobContext context) { String taskName = context.getTaskName(); Object task = context.getTask(); if (WorkerConstants.MAP_TASK_ROOT_NAME.equals(taskName)) { int recordCount = queryRecordCount(); int pageAmount = recordCount / pageSize;//計算分頁數量 for(int i = 0 ; i < pageAmount ; i ++) { List<Record> recordList = queryRecord(i);//根據分頁查詢一頁數據 map(recordList, "record記錄");//把子任務分發出去並行處理 } return new ProcessResult(true);//true表示執行成功,false表示失敗 } else if ("record記錄".equals(taskName)) { //TODO return new ProcessResult(true); } return new ProcessResult(false); } }
如上面的代碼所示,在root任務中,會把數據庫全部記錄讀取出來,每一行就是一個Record,而後分發出去,分佈式到不一樣的worker上去執行。邏輯是沒有問題的,可是實際上性能很是的差。結合網格計算原理,咱們把上面的代碼繪製成下面這幅圖:
如上圖所示,root任務一開始會全量的讀取A表的數據,而後會全量的存到h2中,pull線程還會全量的從h2讀取一次全部的task,還會分發給全部客戶端。因此實際上對A表中的數據:
這個效率是很是低的。
6.3 正面案例
下面給出正面案例的代碼:
public class ScanSingleTableJobProcessor extends MapJobProcessor { private static final int pageSize = 100; static class PageTask { private int startId; private int endId; public PageTask(int startId, int endId) { this.startId = startId; this.endId = endId; } public int getStartId() { return startId; } public int getEndId() { return endId; } } @Override public ProcessResult process(JobContext context) { String taskName = context.getTaskName(); Object task = context.getTask(); if (taskName.equals(WorkerConstants.MAP_TASK_ROOT_NAME)) { System.out.println("start root task"); Pair<Integer, Integer> idPair = queryMinAndMaxId(); int minId = idPair.getFirst(); int maxId = idPair.getSecond(); List<PageTask> taskList = Lists.newArrayList(); int step = (int) ((maxId - minId) / pageSize); //計算分頁數量 for (int i = minId; i < maxId; i+=step) { taskList.add(new PageTask(i, (i+step > maxId ? maxId : i+step))); } return map(taskList, "Level1Dispatch"); } else if (taskName.equals("Level1Dispatch")) { PageTask record = (PageTask)task; long startId = record.getStartId(); long endId = record.getEndId(); //TODO return new ProcessResult(true); } return new ProcessResult(true); } @Override public void postProcess(JobContext context) { //TODO System.out.println("all tasks is finished."); } private Pair<Integer, Integer> queryMinAndMaxId() { //TODO select min(id),max(id) from xxx return null; } }
如上面的代碼所示,
根據上面的代碼和網格計算原理,得出下面這幅圖:
如上圖所示,
綜上,對A表訪問次數少了好幾倍,對h2存儲壓力少了上萬倍,不但執行速度能夠快不少,還保證不會把本身本地的h2數據庫搞掛。
本文爲雲棲社區原創內容,未經容許不得轉載。