Monorepo 中的任務調度機制

前言

Monorepo 中的一個項目稱爲 project,對 project 進行的具體操做稱爲任務 task,好比 buildtest,能夠狹義地理解爲 npm scripts 中註冊的操做,Monorepo 管理工具應當有能力調度這些任務。git

項目依賴圖

先看一個 🌰,如上圖所示,存在一個依賴關係較爲複雜的 Monorepo,此時須要執行某個任務,例如 build,如何同時保證任務執行順序以及任務執行效率(假設最大任務並行數爲 N)?github

接下來就是枯燥乏味的作題過程,我們先把上面那張項目依賴圖抽象成代碼。shell

問題

interface Project {
  name: string;
  actions: { name: string; fn: () => Promise<void> }[];
  dependencyProjects: Project[];
}

const sleep = (s: number): Promise<void> =>
  new Promise((r) => setTimeout(r, s));

// Monorepo 中註冊的全部項目
const projects: Project[] = [
  "@monorepo/a",
  "@monorepo/b",
  "@monorepo/c",
  "@monorepo/d",
  "@monorepo/x",
  "@monorepo/y",
  "@monorepo/z",
].map((name) => ({
  name,
  actions: [{ name: "build", fn: () => sleep(Math.random() * 1000) }],
  dependencyProjects: [],
}));

const [A, B, C, D, X, Y, Z] = projects;

A.dependencyProjects = [B];
B.dependencyProjects = [D];
C.dependencyProjects = [D, X, Y];
X.dependencyProjects = [Y, Z];

/** * 實現本方法,使得 build 行爲按照正確的順序執行,且保證執行效率 * @param projects 須要執行任務的 project 集合 * @param actionName 具體操做名稱 * @param limit 任務最大並行數 */
function run(projects: Project[], actionName: string, limit: number) {
  // todo
}

run(projects, "build", 12);
複製代碼

解題

很明顯,project 之間存在依賴關係,那麼任務之間也存在依賴關係,那麼能夠獲得如下結論:npm

  1. 當前任務做爲下游任務時,當前任務完成後,須要更新其上游任務的依賴任務,從其內移除當前任務
  2. 當前任務做爲上游任務時,只有當前任務的下游任務都被清空(完成)時,當前任務才能夠執行

因而 task 定義以下:json

interface Task {
  // 任務名 `${projectName}:{actionName}`
  name: string;
  // 當前任務依賴的任務,即當前任務的下游任務,當該 dependenciesSet 被清空,說明當前任務能夠被執行
  dependencies: Set<Task>;
  // 依賴當前任務的任務,即當前任務的上游任務,當前任務完成後,須要更新其上游任務的 dependenciesSet(從其內移除當前任務)
  dependents: Set<Task>;
  // 具體任務執行函數
  fn: () => Promise<void>;
}
複製代碼

初始化任務

根據 projects 參數,構造出項目對應的任務。bash

function run(projects: Project[], actionName: string, limit: number) {
  // 任務名與任務的映射
  const tasks = new Map<string, Task>();
  projects.forEach((project) =>
    tasks.set(getTaskName(project, actionName), {
      name: getTaskName(project, actionName),
      dependencies: new Set(),
      dependents: new Set(),
      fn: project.actions.find((a) => a.name === actionName)?.fn ?? noop,
    })
  );
}

// 獲取任務名
function getTaskName(project: Project, actionName: string) {
  return `${project.name}:${actionName}`;
}

function noop(): Promise<void> {
  return new Promise((r) => r());
}
複製代碼

補充 dependencies 與 dependents

假設存在 project1,對其進行如下操做:markdown

  1. 取到當前項目對應的任務 task1
  2. 獲取當前任務對應的下游任務名 dependencyTaskNames(基於 project1.dependencyProjects)
  3. 遍歷下游任務名 dependencyTaskName
  4. 取到下游任務(上一步初始化而來) dependencyTask
  5. 補充 task1 的 dependencies
  6. 補充 dependencyTask 的 dependents
function run(projects: Project[], actionName: string, limit: number) {
  // ...
  // project 與 project 對應 task 的下游任務名稱
  function getDependencyTaskNames(project: Project): Set<string> {
    const dependencyTaskNames: Set<string> = new Set();
    // 遍歷下游項目
    for (const dep of project.dependencyProjects) {
      // 蒐集下游任務名
      dependencyTaskNames.add(getTaskName(dep, actionName));
    }

    return dependencyTaskNames;
  }

  projects.forEach((project) => {
    // 1. 獲取當前項目對應的任務
    const task = tasks.get(getTaskName(project, actionName))!;
    // 2. 獲取當前任務對應的下游任務名
    const dependencyTaskNames = getDependencyTaskNames(project);
    // 3. 遍歷下游任務名
    for (const dependencyName of dependencyTaskNames) {
      // 4. 取到下游任務(上一步初始化而來)
      const dependency: Task = tasks.get(dependencyName)!;
      // 5. 補充當前任務的 dependencies
      task.dependencies.add(dependency);
      // 6. 補充下游任務的 dependents
      dependency.dependents.add(task);
    }
  });
}
複製代碼

任務依賴圖

並行執行任務

function run(projects: Project[], actionName: string, limit: number) {
  // ...
  const taskQueue: Task[] = [];
  for (const [, task] of tasks) {
    taskQueue.push(task);
  }
  runTasks(taskQueue, limit);
}

async function runTasks(taskQueue: Task[], limit: number) {
  let currentActiveTasks = 0;
  function getNextTask() {
    for (let i = 0; i < taskQueue.length; i++) {
      const task: Task = taskQueue[i];
      // 返回準備好執行的任務
      if (task.dependencies.size === 0) {
        return taskQueue.splice(i, 1)[0];
      }
    }
    return null;
  }

  function _run(task: Task): Promise<void> {
    return task.fn().then(() => {
      console.log("act success", task.name);
      currentActiveTasks--;
      // 當前任務執行完成,從其上游任務的 dependencies 中移除當前任務
      task.dependents.forEach((dependent: Task) => {
        dependent.dependencies.delete(task);
      });
      // 繼續執行
      start();
    });
  }

  async function start() {
    let ctask: Task | null = null;
    const taskPromises: Promise<void>[] = [];
    while (currentActiveTasks < limit && (ctask = getNextTask())) {
      currentActiveTasks++;
      const task: Task = ctask;
      taskPromises.push(_run(task));
    }

    await Promise.all(taskPromises);
  }

  start();
}
複製代碼

執行 run(projects, "build", 12),能夠按照正確順序輸出結果。app

act success @monorepo/z:build
act success @monorepo/y:build
act success @monorepo/x:build
act success @monorepo/d:build
act success @monorepo/b:build
act success @monorepo/a:build
act success @monorepo/c:build
複製代碼

關鍵路徑長度

上文中的實現使得任務能夠按照正確的順序執行,可是在實際任務執行過程當中,最長的任務鏈限制了整個任務樹的執行速度,效率不能獲得保證。dom

關鍵路徑長度:任務距離最遠的根節點的距離。async

interface Task {
  name: string;
  dependencies: Set<Task>;
  dependents: Set<Task>;
  // 關聯路徑長度
  criticalPathLength?: number;
  fn: () => Promise<void>;
}

function run(projects: Project[], actionName: string, limit: number) {
  // ...
  const taskQueue: Task[] = [];
  for (const [, task] of tasks) {
    // 計算關鍵路徑長度
    task.criticalPathLength = calculateCriticalPaths(task);
    taskQueue.push(task);
  }
  // 基於關鍵路徑長度對任務進行降序排序
  taskQueue.sort((a, b) => b.criticalPathLength! - a.criticalPathLength!);
  runTasks(taskQueue, limit);
}

// 計算關鍵路徑長度
function calculateCriticalPaths(task: Task): number {
  // 重複走到某一個任務了 直接返回值
  if (task.criticalPathLength !== undefined) {
    return task.criticalPathLength;
  }

  // 若是沒有 dependents, 說明咱們是 "root",即 app 此類不被依賴的 project
  if (task.dependents.size === 0) {
    task.criticalPathLength = 0;
    return task.criticalPathLength;
  }

  // 遞歸向上取最大值 每次 +1
  const depsLengths: number[] = [];
  task.dependents.forEach((dep) =>
    depsLengths.push(calculateCriticalPaths(dep))
  );
  task.criticalPathLength = Math.max(...depsLengths) + 1;
  return task.criticalPathLength;
}
複製代碼

criticalPathLength

Selecting subsets of projects

實際業務開發中,通常不須要構建 Monorepo 內所有的項目,在 應用級 Monorepo 優化方案 一文中介紹了使用 Monorepo 方式管理業務項目可能遇到的一些坑點以及相關解決方案,其中有這樣一個問題:

發佈速度慢

monorepo-1 若須要發佈 app1,則全部 package 均會被構建,而非僅有 app1 依賴的 package1 與 package2 的 build 腳本被執行。

最終經過 Rush 提供的 Selecting subsets of projects 能力解決了以上問題。

具體命令以下:

# 只會執行 @monorepo/app1 及其依賴 package 的 build script
$ rush build -t @monorepo/app1
複製代碼

-t PROJECT--to PROJECT,後面PROJECT參數爲這次任務的終點項目包名,若不想包含終點項目,能夠改成-T參數,即--to-except PROJECT,與之相似的可挑選出項目子集的參數還有:

  • -f PROJECT, --from PROJECT
  • -F PROJECT, --from-except PROJECT
  • -i PROJECT, --impacted-by PROJECT
  • -I PROJECT, --impacted-by-except PROJECT
  • -o PROJECT, --only PROJECT

若是不指定這些參數,那麼默認會執行全部項目(rush.json 中註冊過的項目)的對應的 npm scripts

固然,也能夠指定多個參數,最差狀況獲取到的 subsets 是 projects 自己,與不指定參數表現一致(通常不會)。

實際應用場景:在 CI 階段,會篩選出全部發生變動的項目及其會影響到的項目進行一次 build check,好比一次提交改動了 @monorepo/a 以及 @monorepo/b的源碼,CI 就會執行如下命令:

$ rush build -t @monorepo/a -f @monorepo/a -t @monorepo/b -f @monorepo/b
複製代碼

不用擔憂某些 project 的任務會被重複執行,這種任務只是圖裏的一個入度不爲零的點,挑選出 subsets 後,按照前面的任務調度機制執行任務便可,任務都是並行的,速度通常可接受。

除了內置的rush build等命令支持以上參數(默認執行 package.json 中的 build script),Rush 也將此能力開放給了自定義命令,也就是說你能夠自定義一個 rush foo 命令,用於調用指定範圍項目 package.json 中的 foo script,配合 Rush 的任務調度機制,任務能夠保證執行順序(若是須要的話),具體能夠參考 custom_commands 一節。

Selecting subsets of projects 的具體實現不在本文討論範圍內。

結語

拓撲排序與關鍵路徑,作了道題。🌚

相關文章
相關標籤/搜索