Schedulerx2.0分佈式計算原理&最佳實踐

1. 前言

Schedulerx2.0的客戶端提供分佈式執行、多種任務類型、統一日誌等框架,用戶只要依賴schedulerx-worker這個jar包,經過schedulerx2.0提供的編程模型,簡單幾行代碼就能實現一套高可靠可運維的分佈式執行引擎。docker

這篇文章重點是介紹基於schedulerx2.0的分佈式執行引擎原理和最佳實踐,相信看完這篇文章,你們都能寫出高效率的分佈式做業,說不定速度能提高好幾倍:)數據庫

2. 可擴展的執行引擎

Worker整體架構參考Yarn的架構,分爲TaskMaster, Container, Processor三層:編程

  • TaskMaster:相似於yarn的AppMaster,支持可擴展的分佈式執行框架,進行整個jobInstance的生命週期管理、container的資源管理,同時還有failover等能力。默認實現StandaloneTaskMaster(單機執行),BroadcastTaskMaster(廣播執行),MapTaskMaster(並行計算、內存網格、網格計算),MapReduceTaskMaster(並行計算、內存網格、網格計算)。
  • Container:執行業務邏輯的容器框架,支持線程/進程/docker/actor等。
  • Processor:業務邏輯框架,不一樣的processor表示不一樣的任務類型。

以MapTaskMaster爲例,大概的原理以下圖所示:架構

3. 分佈式編程模型之Map模型

Schedulerx2.0提供了多種分佈式編程模型,這篇文章主要介紹Map模型(以後的文章還會介紹MapReduce模型,適用更多的業務場景),簡單幾行代碼就能夠將海量數據分佈式到多臺機器上進行分佈式跑批,很是簡單易用。框架

針對不一樣的跑批場景,map模型做業還提供了並行計算、內存網格、網格計算三種執行方式:運維

  • 並行計算:子任務300如下,有子任務列表。
  • 內存網格:子任務5W如下,無子任務列表,速度快。
  • 網格計算:子任務100W如下,無子任務列表。

4. 並行計算原理

由於並行任務具備子任務列表:分佈式

如上圖,子任務列表能夠看到每一個子任務的狀態、機器,還有重跑、查看日誌等操做。ide

由於並行計算要作到子任務級別的可視化,而且worker掛了、重啓還能支持手動重跑,就須要把task持久化到server端:post

如上圖所示:性能

  1. server觸發jobInstance到某個worker,選中爲master。
  2. MapTaskMaster選擇某個worker執行root任務,當執行map方法時,會回調MapTaskMaster。
  3. MapTaskMaster收到map方法,會把task持久化到server端。
  4. 同時,MapTaskMaster還有個pull線程,不停拉取INIT狀態的task,並派發給其餘worker執行。

5. 網格計算原理

網格計算要支持百萬級別的task,若是全部任務都往server回寫,server確定扛不住,因此網格計算的存儲其實是分佈式在用戶本身的機器上的:

如上圖所示:

  1. server觸發jobInstance到某個worker,選中爲master。
  2. MapTaskMaster選擇某個worker執行root任務,當執行map方法時,會回調MapTaskMaster。
  3. MapTaskMaster收到map方法,會把task持久化到本地h2數據庫。
  4. 同時,MapTaskMaster還有個pull線程,不停拉取INIT狀態的task,並派發給其餘worker執行。

6. 最佳實踐

6.1 需求

舉個例子:

  1. 讀取A表中status=0的數據。
  2. 處理這些數據,插入B表。
  3. 把A表中處理過的數據的修改status=1。
  4. 數據量有4億+,但願縮短期。

6.2 反面案例

咱們先看下以下代碼是否有問題?

public class ScanSingleTableProcessor extends MapJobProcessor {
    private static int pageSize = 1000;

    @Override
    public ProcessResult process(JobContext context) {
        String taskName = context.getTaskName();
        Object task = context.getTask();

        if (WorkerConstants.MAP_TASK_ROOT_NAME.equals(taskName)) {
            int recordCount = queryRecordCount();
            int pageAmount = recordCount / pageSize;//計算分頁數量
            for(int i = 0 ; i < pageAmount ; i ++) {
                List<Record> recordList = queryRecord(i);//根據分頁查詢一頁數據
                map(recordList, "record記錄");//把子任務分發出去並行處理
            }
            return new ProcessResult(true);//true表示執行成功,false表示失敗
        } else if ("record記錄".equals(taskName)) {
            //TODO
            return new ProcessResult(true);
        }
        return new ProcessResult(false);
    }
}

如上面的代碼所示,在root任務中,會把數據庫全部記錄讀取出來,每一行就是一個Record,而後分發出去,分佈式到不一樣的worker上去執行。邏輯是沒有問題的,可是實際上性能很是的差。結合網格計算原理,咱們把上面的代碼繪製成下面這幅圖:

如上圖所示,root任務一開始會全量的讀取A表的數據,而後會全量的存到h2中,pull線程還會全量的從h2讀取一次全部的task,還會分發給全部客戶端。因此實際上對A表中的數據:

  • 全量讀2次
  • 全量寫一次
  • 全量傳輸一次

這個效率是很是低的。

6.3 正面案例

下面給出正面案例的代碼:

public class ScanSingleTableJobProcessor extends MapJobProcessor {
    private static final int pageSize = 100;

    static class PageTask {
        private int startId;
        private int endId;
        public PageTask(int startId, int endId) {
             this.startId = startId;
             this.endId = endId;
        }
        public int getStartId() {
              return startId;
        }
        public int getEndId() {
              return endId;
        }
    }

    @Override
    public ProcessResult process(JobContext context) {
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (taskName.equals(WorkerConstants.MAP_TASK_ROOT_NAME)) {
            System.out.println("start root task");
            Pair<Integer, Integer> idPair = queryMinAndMaxId();
            int minId = idPair.getFirst();
            int maxId = idPair.getSecond();
            List<PageTask> taskList = Lists.newArrayList();
            int step = (int) ((maxId - minId) / pageSize); //計算分頁數量
            for (int i = minId; i < maxId; i+=step) {
                taskList.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));
            }
            return map(taskList, "Level1Dispatch");
        } else if (taskName.equals("Level1Dispatch")) {
            PageTask record = (PageTask)task;
            long startId = record.getStartId();
            long endId = record.getEndId();
            //TODO
            return new ProcessResult(true);
        }
        return new ProcessResult(true);
    }

    @Override
    public void postProcess(JobContext context) {
        //TODO
        System.out.println("all tasks is finished.");
    }

    private Pair<Integer, Integer> queryMinAndMaxId() {
        //TODO select min(id),max(id) from xxx
        return null;
    }
}

如上面的代碼所示,

  • 每一個task不是整行記錄的record,而是PageTask,裏面就2個字段,startId和endId。
  • root任務,沒有全量的讀取A表,而是讀一下整張表的minId和maxId,而後構造PageTask進行分頁。好比task1表示PageTask[1,1000],task2表示PageTask[1001,2000]。每一個task處理A表不一樣的數據。
  • 在下一級task中,若是拿到的是PageTask,再根據id區間去A表處理數據。

根據上面的代碼和網格計算原理,得出下面這幅圖:

如上圖所示,

  • A表只須要全量讀取一次。
  • 子任務數量比反面案例少了上千、上萬倍。
  • 子任務的body很是小,若是recod中有大字段,也少了上千、上萬倍。

綜上,對A表訪問次數少了好幾倍,對h2存儲壓力少了上萬倍,不但執行速度能夠快不少,還保證不會把本身本地的h2數據庫搞掛。



本文做者:黃曉萌

閱讀原文

本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索