10人將獲贈CNCF商店$100美圓禮券!python
你填了嗎?git
問卷連接(https://www.wjx.cn/jq/9714648...)github
做者:Alex Collins編程
Python 是用戶在 Kubernetes 上編寫機器學習工做流的流行編程語言。segmentfault
開箱即用時,Argo 並無爲 Python 提供一流的支持。相反,咱們提供Java、Golang 和 Python API 客戶端。api
但這對大多數用戶來講還不夠。許多用戶須要一個抽象層來添加組件和特定於用例的特性。服務器
今天你有兩個選擇。微信
Argo 工做流被用做執行 Kubeflow 流水線的引擎。你能夠定義一個 Kubeflow 流水線,並在 Python 中將其直接編譯到 Argo 工做流中。dom
而後你能夠使用Argo Python 客戶端向 Argo 服務器 API 提交工做流。機器學習
這種方法容許你利用現有的 Kubeflow 組件。
安裝:
pip3 install kfp pip3 install argo-workflows
例子:
import kfp as kfp def flip_coin(): return kfp.dsl.ContainerOp( name='Flip a coin', image='python:alpine3.6', command=['python', '-c', """ import random res = "heads" if random.randint(0, 1) == 0 else "tails" with open('/output', 'w') as f: f.write(res) """], file_outputs={'output': '/output'} ) def heads(): return kfp.dsl.ContainerOp(name='Heads', image="alpine:3.6", command=["sh", "-c", 'echo "it was heads"']) def tails(): return kfp.dsl.ContainerOp(name='Tails', image="alpine:3.6", command=["sh", "-c", 'echo "it was tails"']) @kfp.dsl.pipeline(name='Coin-flip', description='Flip a coin') def coin_flip_pipeline(): flip = flip_coin() with kfp.dsl.Condition(flip.output == 'heads'): heads() with kfp.dsl.Condition(flip.output == 'tails'): tails() def main(): kfp.compiler.Compiler().compile(coin_flip_pipeline, __file__ + ".yaml") if __name__ == '__main__': main()
運行這個來建立你的工做流:
apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: generateName: coin-flip- annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.3.0, pipelines.kubeflow.org/pipeline_compilation_time: '2021-01-21T17:17:54.299235', pipelines.kubeflow.org/pipeline_spec: '{"description": "Flip a coin", "name": "Coin-flip"}'} labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.3.0} spec: entrypoint: coin-flip templates: - name: coin-flip dag: tasks: - name: condition-1 template: condition-1 when: '"{{tasks.flip-a-coin.outputs.parameters.flip-a-coin-output}}" == "heads"' dependencies: [flip-a-coin] - name: condition-2 template: condition-2 when: '"{{tasks.flip-a-coin.outputs.parameters.flip-a-coin-output}}" == "tails"' dependencies: [flip-a-coin] - {name: flip-a-coin, template: flip-a-coin} - name: condition-1 dag: tasks: - {name: heads, template: heads} - name: condition-2 dag: tasks: - {name: tails, template: tails} - name: flip-a-coin container: command: - python - -c - "\nimport random\nres = \"heads\" if random.randint(0, 1) == 0 else \"tails\"\ \nwith open('/output', 'w') as f:\n f.write(res) \n " image: python:alpine3.6 outputs: parameters: - name: flip-a-coin-output valueFrom: {path: /output} artifacts: - {name: flip-a-coin-output, path: /output} - name: heads container: command: [sh, -c, echo "it was heads"] image: alpine:3.6 - name: tails container: command: [sh, -c, echo "it was tails"] image: alpine:3.6 arguments: parameters: [] serviceAccountName: pipeline-runner
注意,Kubeflow 不支持這種方法。
你能夠使用客戶端提交上述工做流程以下:
import yaml from argo.workflows.client import (ApiClient, WorkflowServiceApi, Configuration, V1alpha1WorkflowCreateRequest) def main(): config = Configuration(host="http://localhost:2746") client = ApiClient(configuration=config) service = WorkflowServiceApi(api_client=client) with open("coin-flip.py.yaml") as f: manifest: dict = yaml.safe_load(f) del manifest['spec']['serviceAccountName'] service.create_workflow('argo', V1alpha1WorkflowCreateRequest(workflow=manifest)) if __name__ == '__main__': main()
Couler是一個流行的項目,它容許你以一種平臺無感的方式指定工做流,但它主要支持 Argo 工做流(計劃在將來支持 Kubeflow 和 AirFlow):
安裝:
pip3 install git+https://github.com/couler-proj/couler
例子:
import couler.argo as couler from couler.argo_submitter import ArgoSubmitter def random_code(): import random res = "heads" if random.randint(0, 1) == 0 else "tails" print(res) def flip_coin(): return couler.run_script(image="python:alpine3.6", source=random_code) def heads(): return couler.run_container( image="alpine:3.6", command=["sh", "-c", 'echo "it was heads"'] ) def tails(): return couler.run_container( image="alpine:3.6", command=["sh", "-c", 'echo "it was tails"'] ) result = flip_coin() couler.when(couler.equal(result, "heads"), lambda: heads()) couler.when(couler.equal(result, "tails"), lambda: tails()) submitter = ArgoSubmitter() couler.run(submitter=submitter)
這會建立如下工做流程:
apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: generateName: couler-example- spec: templates: - name: couler-example steps: - - name: flip-coin-29 template: flip-coin - - name: heads-31 template: heads when: '{{steps.flip-coin-29.outputs.result}} == heads' - name: tails-32 template: tails when: '{{steps.flip-coin-29.outputs.result}} == tails' - name: flip-coin script: name: '' image: 'python:alpine3.6' command: - python source: | import random res = "heads" if random.randint(0, 1) == 0 else "tails" print(res) - name: heads container: image: 'alpine:3.6' command: - sh - '-c' - echo "it was heads" - name: tails container: image: 'alpine:3.6' command: - sh - '-c' - echo "it was tails" entrypoint: couler-example ttlStrategy: secondsAfterCompletion: 600 activeDeadlineSeconds: 300
CNCF (Cloud Native Computing Foundation)成立於2015年12月,隸屬於Linux Foundation,是非營利性組織。
CNCF(雲原生計算基金會)致力於培育和維護一個廠商中立的開源生態系統,來推廣雲原生技術。咱們經過將最前沿的模式民主化,讓這些創新爲大衆所用。掃描二維碼關注CNCF微信公衆號。