Apache DolphinScheduler 是一個分佈式去中心化,易擴展的可視化 DAG 工做流任務調度系統。致力於解決數據處理流程中錯綜複雜的依賴關係,使調度系統在數據處理流程中開箱即用。java
近日,伯毅同窗給社區貢獻了工做流核心表結構的剖析文章,很是細緻,喜歡的夥伴請轉走python
在 dolphinscheduler 庫中建立的全部工做流定義(模板)都保存在 t_ds_process_definition 表中.mysql
該數據庫表結構以下表所示:sql
序號 | 字段 | 類型 | 描述 |
---|---|---|---|
1 | id | int(11) | 主鍵 |
2 | name | varchar(255) | 流程定義名稱 |
3 | version | int(11) | 流程定義版本 |
4 | release_state | tinyint(4) | 流程定義的發佈狀態:0 未上線 , 1已上線 |
5 | project_id | int(11) | 項目id |
6 | user_id | int(11) | 流程定義所屬用戶id |
7 | process_definition_json | longtext | 流程定義JSON |
8 | description | text | 流程定義描述 |
9 | global_params | text | 全局參數 |
10 | flag | tinyint(4) | 流程是否可用:0 不可用,1 可用 |
11 | locations | text | 節點座標信息 |
12 | connects | text | 節點連線信息 |
13 | receivers | text | 收件人 |
14 | receivers_cc | text | 抄送人 |
15 | create_time | datetime | 建立時間 |
16 | timeout | int(11) | 超時時間 |
17 | tenant_id | int(11) | 租戶id |
18 | update_time | datetime | 更新時間 |
19 | modify_by | varchar(36) | 修改用戶 |
20 | resource_ids | varchar(255) | 資源ids |
其中 process_definition_json 字段爲核心字段, 定義了 DAG 圖中的任務信息.該數據以JSON 的方式進行存儲.shell
公共的數據結構以下表:數據庫
序號 | 字段 | 類型 | 描述 |
---|---|---|---|
1 | globalParams | Array | 全局參數 |
2 | tasks | Array | 流程中的任務集合 [ 各個類型的結構請參考以下章節] |
3 | tenantId | int | 租戶id |
4 | timeout | int | 超時時間 |
數據示例:apache
{ "globalParams":[ { "prop":"golbal_bizdate", "direct":"IN", "type":"VARCHAR", "value":"${system.biz.date}" } ], "tasks":Array[1], "tenantId":0, "timeout":0 }
** Shell 節點數據結構以下:**json
序號 | 參數名 | 類型 | 描述 | 描述 | |
---|---|---|---|---|---|
1 | id | String | 任務編碼 | ||
2 | type | String | 類型 | SHELL | |
3 | name | String | 名稱 | ||
4 | params | Object | 自定義參數 | Json 格式 | |
5 | rawScript | String | Shell腳本 | ||
6 | localParams | Array | 自定義參數 | ||
7 | resourceList | Array | 資源文件 | ||
8 | description | String | 描述 | ||
9 | runFlag | String | 運行標識 | ||
10 | conditionResult | Object | 條件分支 | ||
11 | successNode | Array | 成功跳轉節點 | ||
12 | failedNode | Array | 失敗跳轉節點 | ||
13 | dependence | Object | 任務依賴 | 與params互斥 | |
14 | maxRetryTimes | String | 最大重試次數 | ||
15 | retryInterval | String | 重試間隔 | ||
16 | timeout | Object | 超時控制 | ||
17 | taskInstancePriority | String | 任務優先級 | ||
18 | workerGroup | String | Worker 分組 | ||
19 | preTasks | Array | 前置任務 |
Shell 節點數據樣例:bash
{ "type":"SHELL", "id":"tasks-80760", "name":"Shell Task", "params":{ "resourceList":[ { "id":3, "name":"run.sh", "res":"run.sh" } ], "localParams":[ ], "rawScript":"echo "This is a shell script"" }, "description":"", "runFlag":"NORMAL", "conditionResult":{ "successNode":[ "" ], "failedNode":[ "" ] }, "dependence":{ }, "maxRetryTimes":"0", "retryInterval":"1", "timeout":{ "strategy":"", "interval":null, "enable":false }, "taskInstancePriority":"MEDIUM", "workerGroup":"default", "preTasks":[ ] }
經過 SQL 對指定的數據源進行數據查詢、更新操做.微信
** SQL 節點數據結構以下:**
序號 | 參數名 | 類型 | 描述 | 描述 | |
---|---|---|---|---|---|
1 | id | String | 任務編碼 | ||
2 | type | String | 類型 | SQL | |
3 | name | String | 名稱 | ||
4 | params | Object | 自定義參數 | Json 格式 | |
5 | type | String | 數據庫類型 | ||
6 | datasource | Int | 數據源id | ||
7 | sql | String | 查詢SQL語句 | ||
8 | udfs | String | udf函數 | UDF函數id,以逗號分隔. | |
9 | sqlType | String | SQL節點類型 | 0 查詢 , 1 非查詢 | |
10 | title | String | 郵件標題 | ||
11 | receivers | String | 收件人 | ||
12 | receiversCc | String | 抄送人 | ||
13 | showType | String | 郵件顯示類型 | TABLE 表格 , ATTACHMENT附件 | |
14 | connParams | String | 鏈接參數 | ||
15 | preStatements | Array | 前置SQL | ||
16 | postStatements | Array | 後置SQL | ||
17 | localParams | Array | 自定義參數 | ||
18 | description | String | 描述 | ||
19 | runFlag | String | 運行標識 | ||
20 | conditionResult | Object | 條件分支 | ||
21 | successNode | Array | 成功跳轉節點 | ||
22 | failedNode | Array | 失敗跳轉節點 | ||
23 | dependence | Object | 任務依賴 | 與params互斥 | |
24 | maxRetryTimes | String | 最大重試次數 | ||
25 | retryInterval | String | 重試間隔 | ||
26 | timeout | Object | 超時控制 | ||
27 | taskInstancePriority | String | 任務優先級 | ||
28 | workerGroup | String | Worker 分組 | ||
29 | preTasks | Array | 前置任務 |
** SQL 節點數據樣例:**
{ "type":"SQL", "id":"tasks-95648", "name":"SqlTask-Query", "params":{ "type":"MYSQL", "datasource":1, "sql":"select id , namge , age from emp where id = ${id}", "udfs":"", "sqlType":"0", "title":"xxxx@xxx.com", "receivers":"xxxx@xxx.com", "receiversCc":"", "showType":"TABLE", "localParams":[ { "prop":"id", "direct":"IN", "type":"INTEGER", "value":"1" } ], "connParams":"", "preStatements":[ "insert into emp ( id,name ) value (1,'Li' )" ], "postStatements":[ ] }, "description":"", "runFlag":"NORMAL", "conditionResult":{ "successNode":[ "" ], "failedNode":[ "" ] }, "dependence":{ }, "maxRetryTimes":"0", "retryInterval":"1", "timeout":{ "strategy":"", "interval":null, "enable":false }, "taskInstancePriority":"MEDIUM", "workerGroup":"default", "preTasks":[ ] }
** Spark 節點數據結構以下:**
序號 | 參數名 | 類型 | 描述 | 描述 | |
---|---|---|---|---|---|
1 | id | String | 任務編碼 | ||
2 | type | String | 類型 | SPARK | |
3 | name | String | 名稱 | ||
4 | params | Object | 自定義參數 | Json 格式 | |
5 | mainClass | String | 運行主類 | ||
6 | mainArgs | String | 運行參數 | ||
7 | others | String | 其餘參數 | ||
8 | mainJar | Object | 程序 jar 包 | ||
9 | deployMode | String | 部署模式 | local,client,cluster | |
10 | driverCores | String | driver核數 | ||
11 | driverMemory | String | driver 內存數 | ||
12 | numExecutors | String | executor數量 | ||
13 | executorMemory | String | executor內存 | ||
14 | executorCores | String | executor核數 | ||
15 | programType | String | 程序類型 | JAVA,SCALA,PYTHON | |
16 | sparkVersion | String | Spark 版本 | SPARK1 , SPARK2 | |
17 | localParams | Array | 自定義參數 | ||
18 | resourceList | Array | 資源文件 | ||
19 | description | String | 描述 | ||
20 | runFlag | String | 運行標識 | ||
21 | conditionResult | Object | 條件分支 | ||
22 | successNode | Array | 成功跳轉節點 | ||
23 | failedNode | Array | 失敗跳轉節點 | ||
24 | dependence | Object | 任務依賴 | 與params互斥 | |
25 | maxRetryTimes | String | 最大重試次數 | ||
26 | retryInterval | String | 重試間隔 | ||
27 | timeout | Object | 超時控制 | ||
28 | taskInstancePriority | String | 任務優先級 | ||
29 | workerGroup | String | Worker 分組 | ||
30 | preTasks | Array | 前置任務 |
** Spark 節點數據樣例:**
{ "type":"SPARK", "id":"tasks-87430", "name":"SparkTask", "params":{ "mainClass":"org.apache.spark.examples.SparkPi", "mainJar":{ "id":4 }, "deployMode":"cluster", "resourceList":[ { "id":3, "name":"run.sh", "res":"run.sh" } ], "localParams":[ ], "driverCores":1, "driverMemory":"512M", "numExecutors":2, "executorMemory":"2G", "executorCores":2, "mainArgs":"10", "others":"", "programType":"SCALA", "sparkVersion":"SPARK2" }, "description":"", "runFlag":"NORMAL", "conditionResult":{ "successNode":[ "" ], "failedNode":[ "" ] }, "dependence":{ }, "maxRetryTimes":"0", "retryInterval":"1", "timeout":{ "strategy":"", "interval":null, "enable":false }, "taskInstancePriority":"MEDIUM", "workerGroup":"default", "preTasks":[ ] }
** MapReduce(MR) 節點數據結構以下:**
序號 | 參數名 | 類型 | 描述 | 描述 | |
---|---|---|---|---|---|
1 | id | String | 任務編碼 | ||
2 | type | String | 類型 | MR | |
3 | name | String | 名稱 | ||
4 | params | Object | 自定義參數 | Json 格式 | |
5 | mainClass | String | 運行主類 | ||
6 | mainArgs | String | 運行參數 | ||
7 | others | String | 其餘參數 | ||
8 | mainJar | Object | 程序 jar 包 | ||
9 | programType | String | 程序類型 | JAVA,PYTHON | |
10 | localParams | Array | 自定義參數 | ||
11 | resourceList | Array | 資源文件 | ||
12 | description | String | 描述 | ||
13 | runFlag | String | 運行標識 | ||
14 | conditionResult | Object | 條件分支 | ||
15 | successNode | Array | 成功跳轉節點 | ||
16 | failedNode | Array | 失敗跳轉節點 | ||
17 | dependence | Object | 任務依賴 | 與params互斥 | |
18 | maxRetryTimes | String | 最大重試次數 | ||
19 | retryInterval | String | 重試間隔 | ||
20 | timeout | Object | 超時控制 | ||
21 | taskInstancePriority | String | 任務優先級 | ||
22 | workerGroup | String | Worker 分組 | ||
23 | preTasks | Array | 前置任務 |
** MapReduce(MR) 節點數據樣例:**
{ "type":"MR", "id":"tasks-28997", "name":"MRTask", "params":{ "mainClass":"wordcount", "mainJar":{ "id":5 }, "resourceList":[ { "id":3, "name":"run.sh", "res":"run.sh" } ], "localParams":[ ], "mainArgs":"/tmp/wordcount/input /tmp/wordcount/output/", "others":"", "programType":"JAVA" }, "description":"", "runFlag":"NORMAL", "conditionResult":{ "successNode":[ "" ], "failedNode":[ "" ] }, "dependence":{ }, "maxRetryTimes":"0", "retryInterval":"1", "timeout":{ "strategy":"", "interval":null, "enable":false }, "taskInstancePriority":"MEDIUM", "workerGroup":"default", "preTasks":[ ] }
** Python 節點數據結構以下:**
序號 | 參數名 | 類型 | 描述 | 描述 | |
---|---|---|---|---|---|
1 | id | String | 任務編碼 | ||
2 | type | String | 類型 | PYTHON | |
3 | name | String | 名稱 | ||
4 | params | Object | 自定義參數 | Json 格式 | |
5 | rawScript | String | Python腳本 | ||
6 | localParams | Array | 自定義參數 | ||
7 | resourceList | Array | 資源文件 | ||
8 | description | String | 描述 | ||
9 | runFlag | String | 運行標識 | ||
10 | conditionResult | Object | 條件分支 | ||
11 | successNode | Array | 成功跳轉節點 | ||
12 | failedNode | Array | 失敗跳轉節點 | ||
13 | dependence | Object | 任務依賴 | 與params互斥 | |
14 | maxRetryTimes | String | 最大重試次數 | ||
15 | retryInterval | String | 重試間隔 | ||
16 | timeout | Object | 超時控制 | ||
17 | taskInstancePriority | String | 任務優先級 | ||
18 | workerGroup | String | Worker 分組 | ||
19 | preTasks | Array | 前置任務 |
Python節點數據樣例:
{ "type":"PYTHON", "id":"tasks-5463", "name":"Python Task", "params":{ "resourceList":[ { "id":3, "name":"run.sh", "res":"run.sh" } ], "localParams":[ ], "rawScript":"print("This is a python script")" }, "description":"", "runFlag":"NORMAL", "conditionResult":{ "successNode":[ "" ], "failedNode":[ "" ] }, "dependence":{ }, "maxRetryTimes":"0", "retryInterval":"1", "timeout":{ "strategy":"", "interval":null, "enable":false }, "taskInstancePriority":"MEDIUM", "workerGroup":"default", "preTasks":[ ] }
Flink 節點數據結構以下:
序號 | 參數名 | 類型 | 描述 | 描述 | |
---|---|---|---|---|---|
1 | id | String | 任務編碼 | ||
2 | type | String | 類型 | FLINK | |
3 | name | String | 名稱 | ||
4 | params | Object | 自定義參數 | Json 格式 | |
5 | mainClass | String | 運行主類 | ||
6 | mainArgs | String | 運行參數 | ||
7 | others | String | 其餘參數 | ||
8 | mainJar | Object | 程序 jar 包 | ||
9 | deployMode | String | 部署模式 | local,client,cluster | |
10 | slot | String | slot數量 | ||
11 | taskManager | String | taskManage數量 | ||
12 | taskManagerMemory | String | taskManager內存數 | ||
13 | jobManagerMemory | String | jobManager內存數 | ||
14 | programType | String | 程序類型 | JAVA,SCALA,PYTHON | |
15 | localParams | Array | 自定義參數 | ||
16 | resourceList | Array | 資源文件 | ||
17 | description | String | 描述 | ||
18 | runFlag | String | 運行標識 | ||
19 | conditionResult | Object | 條件分支 | ||
20 | successNode | Array | 成功跳轉節點 | ||
21 | failedNode | Array | 失敗跳轉節點 | ||
22 | dependence | Object | 任務依賴 | 與params互斥 | |
23 | maxRetryTimes | String | 最大重試次數 | ||
24 | retryInterval | String | 重試間隔 | ||
25 | timeout | Object | 超時控制 | ||
26 | taskInstancePriority | String | 任務優先級 | ||
27 | workerGroup | String | Worker 分組 | ||
38 | preTasks | Array | 前置任務 |
** Flink 節點數據樣例:**
{ "type":"FLINK", "id":"tasks-17135", "name":"FlinkTask", "params":{ "mainClass":"com.flink.demo", "mainJar":{ "id":6 }, "deployMode":"cluster", "resourceList":[ { "id":3, "name":"run.sh", "res":"run.sh" } ], "localParams":[ ], "slot":1, "taskManager":"2", "jobManagerMemory":"1G", "taskManagerMemory":"2G", "executorCores":2, "mainArgs":"100", "others":"", "programType":"SCALA" }, "description":"", "runFlag":"NORMAL", "conditionResult":{ "successNode":[ "" ], "failedNode":[ "" ] }, "dependence":{ }, "maxRetryTimes":"0", "retryInterval":"1", "timeout":{ "strategy":"", "interval":null, "enable":false }, "taskInstancePriority":"MEDIUM", "workerGroup":"default", "preTasks":[ ] }
Http 節點數據結構以下:
序號 | 參數名 | 類型 | 描述 | 描述 | |
---|---|---|---|---|---|
1 | id | String | 任務編碼 | ||
2 | type | String | 類型 | HTTP | |
3 | name | String | 名稱 | ||
4 | params | Object | 自定義參數 | Json 格式 | |
5 | url | String | 請求地址 | ||
6 | httpMethod | String | 請求方式 | GET,POST,HEAD,PUT,DELETE | |
7 | httpParams | Array | 請求參數 | ||
8 | httpCheckCondition | String | 校驗條件 | 默認響應碼200 | |
9 | condition | String | 校驗內容 | ||
10 | localParams | Array | 自定義參數 | ||
11 | description | String | 描述 | ||
12 | runFlag | String | 運行標識 | ||
13 | conditionResult | Object | 條件分支 | ||
14 | successNode | Array | 成功跳轉節點 | ||
15 | failedNode | Array | 失敗跳轉節點 | ||
16 | dependence | Object | 任務依賴 | 與params互斥 | |
17 | maxRetryTimes | String | 最大重試次數 | ||
18 | retryInterval | String | 重試間隔 | ||
19 | timeout | Object | 超時控制 | ||
20 | taskInstancePriority | String | 任務優先級 | ||
21 | workerGroup | String | Worker 分組 | ||
22 | preTasks | Array | 前置任務 |
** Http 節點數據樣例:**
{ "type":"HTTP", "id":"tasks-60499", "name":"HttpTask", "params":{ "localParams":[ ], "httpParams":[ { "prop":"id", "httpParametersType":"PARAMETER", "value":"1" }, { "prop":"name", "httpParametersType":"PARAMETER", "value":"Bo" } ], "url":"https://www.xxxxx.com:9012", "httpMethod":"POST", "httpCheckCondition":"STATUS_CODE_DEFAULT", "condition":"" }, "description":"", "runFlag":"NORMAL", "conditionResult":{ "successNode":[ "" ], "failedNode":[ "" ] }, "dependence":{ }, "maxRetryTimes":"0", "retryInterval":"1", "timeout":{ "strategy":"", "interval":null, "enable":false }, "taskInstancePriority":"MEDIUM", "workerGroup":"default", "preTasks":[ ] }
** DataX 節點數據結構以下:**
序號 | 參數名 | 類型 | 描述 | 描述 | |
---|---|---|---|---|---|
1 | id | String | 任務編碼 | ||
2 | type | String | 類型 | DATAX | |
3 | name | String | 名稱 | ||
4 | params | Object | 自定義參數 | Json 格式 | |
5 | customConfig | Int | 自定義類型 | 0定製 , 1自定義 | |
6 | dsType | String | 源數據庫類型 | ||
7 | dataSource | Int | 源數據庫ID | ||
8 | dtType | String | 目標數據庫類型 | ||
9 | dataTarget | Int | 目標數據庫ID | ||
10 | sql | String | SQL語句 | ||
11 | targetTable | String | 目標表 | ||
12 | jobSpeedByte | Int | 限流(字節數) | ||
13 | jobSpeedRecord | Int | 限流(記錄數) | ||
14 | preStatements | Array | 前置SQL | ||
15 | postStatements | Array | 後置SQL | ||
16 | json | String | 自定義配置 | customConfig=1時生效 | |
17 | localParams | Array | 自定義參數 | customConfig=1時生效 | |
18 | description | String | 描述 | ||
19 | runFlag | String | 運行標識 | ||
20 | conditionResult | Object | 條件分支 | ||
21 | successNode | Array | 成功跳轉節點 | ||
22 | failedNode | Array | 失敗跳轉節點 | ||
23 | dependence | Object | 任務依賴 | 與params互斥 | |
24 | maxRetryTimes | String | 最大重試次數 | ||
25 | retryInterval | String | 重試間隔 | ||
26 | timeout | Object | 超時控制 | ||
27 | taskInstancePriority | String | 任務優先級 | ||
28 | workerGroup | String | Worker 分組 | ||
29 | preTasks | Array | 前置任務 |
DataX 節點數據樣例:
{ "type":"DATAX", "id":"tasks-91196", "name":"DataxTask-DB", "params":{ "customConfig":0, "dsType":"MYSQL", "dataSource":1, "dtType":"MYSQL", "dataTarget":1, "sql":"select id, name ,age from user ", "targetTable":"emp", "jobSpeedByte":524288, "jobSpeedRecord":500, "preStatements":[ "truncate table emp " ], "postStatements":[ "truncate table user" ] }, "description":"", "runFlag":"NORMAL", "conditionResult":{ "successNode":[ "" ], "failedNode":[ "" ] }, "dependence":{ }, "maxRetryTimes":"0", "retryInterval":"1", "timeout":{ "strategy":"", "interval":null, "enable":false }, "taskInstancePriority":"MEDIUM", "workerGroup":"default", "preTasks":[ ] }
Sqoop 節點數據結構以下:
序號 | 參數名 | 類型 | 描述 | 描述 | |
---|---|---|---|---|---|
1 | id | String | 任務編碼 | ||
2 | type | String | 類型 | SQOOP | |
3 | name | String | 名稱 | ||
4 | params | Object | 自定義參數 | JSON 格式 | |
5 | concurrency | Int | 併發度 | ||
6 | modelType | String | 流向 | import,export | |
7 | sourceType | String | 數據源類型 | ||
8 | sourceParams | String | 數據源參數 | JSON格式 | |
9 | targetType | String | 目標數據類型 | ||
10 | targetParams | String | 目標數據參數 | JSON格式 | |
11 | localParams | Array | 自定義參數 | ||
12 | description | String | 描述 | ||
13 | runFlag | String | 運行標識 | ||
14 | conditionResult | Object | 條件分支 | ||
15 | successNode | Array | 成功跳轉節點 | ||
16 | failedNode | Array | 失敗跳轉節點 | ||
17 | dependence | Object | 任務依賴 | 與params互斥 | |
18 | maxRetryTimes | String | 最大重試次數 | ||
19 | retryInterval | String | 重試間隔 | ||
20 | timeout | Object | 超時控制 | ||
21 | taskInstancePriority | String | 任務優先級 | ||
22 | workerGroup | String | Worker 分組 | ||
23 | preTasks | Array | 前置任務 |
Sqoop 節點數據樣例:
{ "type":"SQOOP", "id":"tasks-82041", "name":"Sqoop Task", "params":{ "concurrency":1, "modelType":"import", "sourceType":"MYSQL", "targetType":"HDFS", "sourceParams":"{"srcType":"MYSQL","srcDatasource":1,"srcTable":"","srcQueryType":"1","srcQuerySql":"selec id , name from user","srcColumnType":"0","srcColumns":"","srcConditionList":[],"mapColumnHive":[{"prop":"hivetype-key","direct":"IN","type":"VARCHAR","value":"hivetype-value"}],"mapColumnJava":[{"prop":"javatype-key","direct":"IN","type":"VARCHAR","value":"javatype-value"}]}", "targetParams":"{"targetPath":"/user/hive/warehouse/ods.db/user","deleteTargetDir":false,"fileType":"--as-avrodatafile","compressionCodec":"snappy","fieldsTerminated":",","linesTerminated":"@"}", "localParams":[ ] }, "description":"", "runFlag":"NORMAL", "conditionResult":{ "successNode":[ "" ], "failedNode":[ "" ] }, "dependence":{ }, "maxRetryTimes":"0", "retryInterval":"1", "timeout":{ "strategy":"", "interval":null, "enable":false }, "taskInstancePriority":"MEDIUM", "workerGroup":"default", "preTasks":[ ] }
條件分支節點數據結構以下:
序號 | 參數名 | 類型 | 描述 | 描述 | |
---|---|---|---|---|---|
1 | id | String | 任務編碼 | ||
2 | type | String | 類型 | SHELL | |
3 | name | String | 名稱 | ||
4 | params | Object | 自定義參數 | null | |
5 | description | String | 描述 | ||
6 | runFlag | String | 運行標識 | ||
7 | conditionResult | Object | 條件分支 | ||
8 | successNode | Array | 成功跳轉節點 | ||
9 | failedNode | Array | 失敗跳轉節點 | ||
10 | dependence | Object | 任務依賴 | 與params互斥 | |
11 | maxRetryTimes | String | 最大重試次數 | ||
12 | retryInterval | String | 重試間隔 | ||
13 | timeout | Object | 超時控制 | ||
14 | taskInstancePriority | String | 任務優先級 | ||
15 | workerGroup | String | Worker 分組 | ||
16 | preTasks | Array | 前置任務 |
條件分支節點數據樣例:
{ "type":"CONDITIONS", "id":"tasks-96189", "name":"條件", "params":{ }, "description":"", "runFlag":"NORMAL", "conditionResult":{ "successNode":[ "test04" ], "failedNode":[ "test05" ] }, "dependence":{ "relation":"AND", "dependTaskList":[ ] }, "maxRetryTimes":"0", "retryInterval":"1", "timeout":{ "strategy":"", "interval":null, "enable":false }, "taskInstancePriority":"MEDIUM", "workerGroup":"default", "preTasks":[ "test01", "test02" ] }
子流程節點數據結構以下:
序號 | 參數名 | 類型 | 描述 | 描述 | |
---|---|---|---|---|---|
1 | id | String | 任務編碼 | ||
2 | type | String | 類型 | SHELL | |
3 | name | String | 名稱 | ||
4 | params | Object | 自定義參數 | Json 格式 | |
5 | processDefinitionId | Int | 流程定義id | ||
6 | description | String | 描述 | ||
7 | runFlag | String | 運行標識 | ||
8 | conditionResult | Object | 條件分支 | ||
9 | successNode | Array | 成功跳轉節點 | ||
10 | failedNode | Array | 失敗跳轉節點 | ||
11 | dependence | Object | 任務依賴 | 與params互斥 | |
12 | maxRetryTimes | String | 最大重試次數 | ||
13 | retryInterval | String | 重試間隔 | ||
14 | timeout | Object | 超時控制 | ||
15 | taskInstancePriority | String | 任務優先級 | ||
16 | workerGroup | String | Worker 分組 | ||
17 | preTasks | Array | 前置任務 |
子流程節點數據樣例:
{ "type":"SUB_PROCESS", "id":"tasks-14806", "name":"SubProcessTask", "params":{ "processDefinitionId":2 }, "description":"", "runFlag":"NORMAL", "conditionResult":{ "successNode":[ "" ], "failedNode":[ "" ] }, "dependence":{ }, "timeout":{ "strategy":"", "interval":null, "enable":false }, "taskInstancePriority":"MEDIUM", "workerGroup":"default", "preTasks":[ ] }
依賴(DEPENDENT)節點數據結構以下:
序號 | 參數名 | 類型 | 描述 | 描述 | |
---|---|---|---|---|---|
1 | id | String | 任務編碼 | ||
2 | type | String | 類型 | DEPENDENT | |
3 | name | String | 名稱 | ||
4 | params | Object | 自定義參數 | Json 格式 | |
5 | rawScript | String | Shell腳本 | ||
6 | localParams | Array | 自定義參數 | ||
7 | resourceList | Array | 資源文件 | ||
8 | description | String | 描述 | ||
9 | runFlag | String | 運行標識 | ||
10 | conditionResult | Object | 條件分支 | ||
11 | successNode | Array | 成功跳轉節點 | ||
12 | failedNode | Array | 失敗跳轉節點 | ||
13 | dependence | Object | 任務依賴 | 與params互斥 | |
14 | relation | String | 關係 | AND,OR | |
15 | dependTaskList | Array | 依賴任務清單 | ||
16 | maxRetryTimes | String | 最大重試次數 | ||
17 | retryInterval | String | 重試間隔 | ||
18 | timeout | Object | 超時控制 | ||
19 | taskInstancePriority | String | 任務優先級 | ||
20 | workerGroup | String | Worker 分組 | ||
21 | preTasks | Array | 前置任務 |
依賴(DEPENDENT)節點數據樣例:
{ "type":"DEPENDENT", "id":"tasks-57057", "name":"DenpendentTask", "params":{ }, "description":"", "runFlag":"NORMAL", "conditionResult":{ "successNode":[ "" ], "failedNode":[ "" ] }, "dependence":{ "relation":"AND", "dependTaskList":[ { "relation":"AND", "dependItemList":[ { "projectId":1, "definitionId":7, "definitionList":[ { "value":8, "label":"MRTask" }, { "value":7, "label":"FlinkTask" }, { "value":6, "label":"SparkTask" }, { "value":5, "label":"SqlTask-Update" }, { "value":4, "label":"SqlTask-Query" }, { "value":3, "label":"SubProcessTask" }, { "value":2, "label":"Python Task" }, { "value":1, "label":"Shell Task" } ], "depTasks":"ALL", "cycle":"day", "dateValue":"today" } ] }, { "relation":"AND", "dependItemList":[ { "projectId":1, "definitionId":5, "definitionList":[ { "value":8, "label":"MRTask" }, { "value":7, "label":"FlinkTask" }, { "value":6, "label":"SparkTask" }, { "value":5, "label":"SqlTask-Update" }, { "value":4, "label":"SqlTask-Query" }, { "value":3, "label":"SubProcessTask" }, { "value":2, "label":"Python Task" }, { "value":1, "label":"Shell Task" } ], "depTasks":"SqlTask-Update", "cycle":"day", "dateValue":"today" } ] } ] }, "maxRetryTimes":"0", "retryInterval":"1", "timeout":{ "strategy":"", "interval":null, "enable":false }, "taskInstancePriority":"MEDIUM", "workerGroup":"default", "preTasks":[ ] }
Apache DolphinScheduler 是一個很是多樣化的社區,至今貢獻者已近100名, 他們分別來自 30 多家不一樣的公司。 微信羣用戶3000人。
已經有300多家企業和科研機構在使用DolphinScheduler,來處理各種調度和定時任務,另有500多家公司開通了海豚調度的試用:
以DAG圖的方式將Task按照任務的依賴關係關聯起來,可實時可視化監控任務的運行狀態
支持豐富的任務類型:Shell、MR、Spark、Flink、SQL(mysql、postgresql、hive、sparksql)、Python、Http、Sub_Process、Procedure等
支持工做流定時調度、依賴調度、手動調度、手動暫停/中止/恢復,同時支持失敗重試/告警、從指定節點恢復失敗、Kill任務等操做
支持工做流優先級、任務優先級及任務的故障轉移及任務超時告警/失敗
支持工做流全局參數及節點自定義參數設置
支持資源文件的在線上傳/下載,管理等,支持在線文件建立、編輯
支持任務日誌在線查看及滾動、在線下載日誌等
實現集羣HA,經過Zookeeper實現Master集羣和Worker集羣去中心化
支持對Master/Worker cpu load,memory,cpu在線查看
支持工做流運行歷史樹形/甘特圖展現、支持任務狀態統計、流程狀態統計
支持補數
支持多租戶
支持國際化
* Worker實現重構,提高Worker性能 * Master和Worker引入Netty通訊 * 去zookeeper任務隊列 * Worker節點的三種選擇:隨機、循環和CPU和內存的線性加權負載平衡 * Worker去數據庫操做 * 資源中心支持多目錄 * 添加 if/else 條件任務 * 添加 sqoop/datax 任務 * 支持 k8s 部署 * 添加DAG流程圖一鍵格式化 * 流程圖美化 * 支持 ambari 插件安裝 * 批量導出和導入工做流 * 流程定義支持複製 * 大幅簡化配置項,簡化部署
在使用 DolphinScheduler 的過程當中,若是您有任何問題或者想法、建議,均可以經過Apache 郵件列表參與到 DolphinScheduler 的社區建設中來。
歡迎加入貢獻的隊伍,加入開源社區從提交第一個 PR開始,
本文由博客羣發一文多發等運營工具平臺 OpenWrite 發佈