基於DAG實現的任務編排框架&平臺

最近在作的工做比較須要一個支持任務編排工做流的框架或者平臺,這裏記錄下實現上的一些思路。前端

任務編排工做流

任務編排是什麼意思呢,顧名思義就是能夠把"任務"這個原子單位按照本身的方式進行編排,任務之間可能互相依賴。複雜一點的編排以後就能造成一個 workflow 工做流了。咱們但願這個工做流按照咱們編排的方式去執行每一個原子 task 任務。以下圖所示,咱們但願先併發運行 Task A 和 Task C,Task A 執行完後串行運行 Task B,在併發等待 Task B 和 C 都結束後運行 Task D,這樣就完成了一個典型的任務編排工做流。node

DAG 有向無環圖

首先咱們瞭解圖這個數據結構,每一個元素稱爲頂點 vertex,頂點之間的連線稱爲邊 edge。像咱們畫的這種帶箭頭關係的稱爲有向圖,箭頭關係之間能造成一個環的成爲有環圖,反之稱爲無環圖。顯然運用在咱們任務編排工做流上,最合適的是 DAG 有向無環圖。數據庫

咱們在代碼裏怎麼存儲圖呢,有兩種數據結構:鄰接矩陣和鄰接表。後端

下圖表示一個有向圖的鄰接矩陣,例如 x->y 的邊,只需將 Array[x][y]標識爲 1 便可。數據結構

此外咱們也能夠使用鄰接表來存儲,這種存儲方式較好地彌補了鄰接矩陣浪費空間的缺點,但相對來講鄰接矩陣能更快地判斷連通性。併發

通常在代碼實現上,咱們會選擇鄰接矩陣,這樣咱們在判斷兩點之間是否有邊更方便點。框架

一個任務編排框架

瞭解了 DAG 的基本知識後咱們能夠來簡單實現一下。首先是存儲結構,咱們的 Dag 表示一整個圖,Node 表示各個頂點,每一個頂點有其 parents 和 children:ide

//Dagpublic final class DefaultDag<T, R> implements Dag<T, R> {	private Map<T, Node<T, R>> nodes = new HashMap<T, Node<T, R>>();
    ...
}//Nodepublic final class Node<T, R> {	/**
	 * incoming dependencies for this node
	 */private Set<Node<T, R>> parents = new LinkedHashSet<Node<T, R>>();/**
     * outgoing dependencies for this node
     */private Set<Node<T, R>> children = new LinkedHashSet<Node<T, R>>();
    ...
}複製代碼

畫兩個頂點,以及爲這兩個頂點連邊操做以下:this

public void addDependency(final T evalFirstNode, final T evalLaterNode) {
		Node<T, R> firstNode = createNode(evalFirstNode);
		Node<T, R> afterNode = createNode(evalLaterNode);

		addEdges(firstNode, afterNode);
	}   private Node<T, R> createNode(final T value) {
		Node<T, R> node = new Node<T, R>(value);		return node;
	}	private void addEdges(final Node<T, R> firstNode, final Node<T, R> afterNode) {		if (!firstNode.equals(afterNode)) {
			firstNode.getChildren().add(afterNode);
			afterNode.getParents().add(firstNode);
		}
	}複製代碼

到如今咱們其實已經把基礎數據結構寫好了,但咱們做爲一個任務編排框架最終是須要線程去執行的,咱們把它和線程池一塊兒給包裝一下。線程

//任務編排線程池public class DefaultDexecutor <T, R> {//執行線程,和2種重試線程private final ExecutorService<T, R> executionEngine;	private final ExecutorService immediatelyRetryExecutor;	private final ScheduledExecutorService scheduledRetryExecutor;//執行狀態private final ExecutorState<T, R> state;
    ...
}//執行狀態public class DefaultExecutorState<T, R> {//底層圖數據結構private final Dag<T, R> graph;//已完成private final Collection<Node<T, R>> processedNodes;//未完成private final Collection<Node<T, R>> unProcessedNodes;//錯誤taskprivate final Collection<ExecutionResult<T, R>> erroredTasks;//執行結果private final Collection<ExecutionResult<T, R>> executionResults;
}複製代碼

能夠看到咱們的線程包括執行線程池,2 種重試線程池。咱們使用 ExecutorState 來保存一些整個任務工做流執行過程當中的一些狀態記錄,包括已完成和未完成的 task,每一個 task 執行的結果等。同時它也依賴咱們底層的圖數據結構 DAG。

接下來咱們要作的事其實很簡單,就是 BFS 這整個 DAG 數據結構,而後提交到線程池中去執行就能夠了,過程當中注意一些節點狀態的保持,結果的保存便可。

仍是以上圖爲例,值得說的一點是在 Task D 這個點須要有一個併發等待的操做,即 Task D 須要依賴 Task B 和 Task C 執行結束後再往下執行。這裏有不少辦法,我選擇了共享變量的方式來完成併發等待。遍歷工做流中被遞歸的方法的僞代碼以下:

private void doProcessNodes(final Set<Node<T, R>> nodes) {		for (Node<T, R> node : nodes) {//共享變量 併發等待if (!processedNodes.contains(node) && processedNodes.containsAll(node.getParents())) {
            Task<T, R> task = newTask(node);this.executionEngine.submit(task);
            ...
            ExecutionResult<T, R> executionResult = this.executionEngine.proce***esult();if (executionResult.isSuccess()) {
			    state.markProcessingDone(processedNode);
		    }//繼續執行孩子節點doExecute(processedNode.getChildren());
            ...
        }
    }
}複製代碼

這樣咱們基本完成了這個任務編排框架的工做,如今咱們能夠以下來進行示例圖中的任務編排以及執行:

DefaultExecutor<String, String> executor = newTaskExecutor();
executor.addDependency("A", "B");
executor.addDependency("B", "D");
executor.addDependency("C", "D");
executor.execute();複製代碼

任務編排平臺化

好了如今咱們已經有一款任務編排框架了,但不少時候咱們想要可視化、平臺化,讓使用者更加無腦。

框架與平臺最大的區別在哪裏?是可拖拽的可視化輸入麼?我以爲這個的複雜度更多在前端。而對於後端平臺來說,與框架最大的區別是數據的持久化。

對於 DAG 的頂點來講,咱們須要將每一個節點 Task 的信息給持久化到關係數據庫中,包括 Task 的狀態、輸出結果等。而對於 DAG 的邊來講,咱們也得用數據庫來存儲各 Task 之間的方向關係。此外,在遍歷執行 DAG 的整個過程當中的中間狀態數據,咱們也得搬運到數據庫中。

首先咱們能夠設計一個 workflow 表,來表示一個工做流。接着咱們設計一個 task 表,來表示一個執行單元。task 表主要字段以下,這裏主要是 task_parents 的設計,它是一個 string,存儲 parents 的 taskId,多個由分隔符分隔。

task_id
workflow_id
task_name
task_status
result
task_parents複製代碼

依賴是上圖這個例子,對比框架來講,咱們首先得將其存儲到數據庫中去,最終可能獲得以下數據:

task_id  workflow_id  task_name  task_status  result  task_parents
  1          1           A           0                    -1
  2          1           B           0                    1
  3          1           C           0                    -1
  4          1           D           0                    2,3複製代碼

能夠看到,這樣也能很好地存儲 DAG 數據,和框架中代碼的輸入方式差異並非很大。

接下來咱們要作的是遍歷執行整個 workflow,這邊和框架的差異也不大。首先咱們能夠利用select * from task where workflow_id = 1 and task_parents = -1來獲取初始化節點 Task A 和 Task C,將其提交到咱們的線程池中。

接着對應框架代碼中的doExecute(processedNode.getChildren());,咱們使用select * from task where task_parents like %3%,就能夠獲得 Task C 的孩子節點 Task D,這裏使用了模糊查詢是由於咱們的 task_parents 多是由多個父親的 taskId 與分隔號組合而成的字符串。查詢到孩子節點後,繼續提交到線程池便可。

別忘了咱們在 Task D 這邊還有一個併發等待的操做,對應框架代碼中的if (!processedNodes.contains(node) && processedNodes.containsAll(node.getParents()))。這邊咱們只要判斷select count(1) from task where task_id in (2,3) and status != 1的個數爲 0 便可,即保證 parents task 所有成功。

另外值得注意的是 task 的重試。在框架中,失敗 task 的重試能夠是當即使用當前線程重試或者放到一個定時線程池中去重試。而在平臺上,咱們的重試基本上來自於用戶在界面上的點擊,即主線程。

至此,咱們已經將任務編排框架的功能基本平臺化了。做爲一個任務編排平臺,可拖拽編排的可視化輸入、整個工做流狀態的可視化展現、任務的可人工重試都是其優勢。

相關文章
相關標籤/搜索