在本文中,我將演示如何構建一個彈性Airflow集羣,當負載低於閾值時,該集羣能夠在高負載下安全地橫向擴展。git
經過水平Pod自動縮放器支持Kubernetes中的自動縮放。使用HPA,能夠直接進行橫向擴展,HPA能夠爲部署增長副本,並能夠建立其餘worker來共享工做負載。可是,伸縮是問題所在,伸縮過程會根據Pod在節點上的位置對它們進行排序,從而選擇要終止的Pod。所以,若是有一個Pod仍在進行某些處理,則沒法保證它不會被終止。github
在彈性Airflow 集羣中,爲了擴大規模,咱們須要確保進行某些處理的worker不會被終止。只有閒置的worker才應考慮終止。redis
爲了實現這一點,我建立了兩個CRD和兩個控制器 - ElasticWorker
和ElasticWorkerAutoscaler
,在本文後面將對它們進行介紹。數據庫
對於此問題,還有其餘解決方案,例如,能夠建立一個Kubernetes做業,該做業能夠完成一組任務。隨着負載的增長,將建立更多的做業。可是,這種方法不是通用解決方案,它很是適合具備相似自動縮放要求的其餘用例。此處描述的方法是一種通用實現,能夠用做全面生產部署的起點。api
組件說明以下,安全
ElasticWorker Controller 監視類型爲ElasticWorker(CRD)
的對象,並將集羣狀態與相應的Elasticworker
對象中的規範協調。在較高級別,如下是該控制者的職責。bash
minReplica
的worker Podscale> 0
,則建立其餘工做容器。可是要確保總的Pod數量不超過maxReplicas
scale <0
時,刪除worker Pod。Pod的刪除由定義的「按比例縮放」策略控制。當前有三個策略-ScaleInImmediately,ScaleInBySelector,ScaleInDisabled,咱們將在此處使用ScaleInBySelector策略。這樣能夠確保控制器僅刪除已定義標籤集的Pod。它還能夠確保不管是否設置標籤,Pod數量都不會低於minReplicas
。
ElasticWorkerAutoscalerController ElasticWorkerAutoscaler控制器監視類型爲ElasticWorkerAutoscaler(CRD)的對象。下面是該控制器的職責,服務器
total_cluster_load
。total_cluster_load> targetValue
,則向外擴展。計算新的工做單元數,以減小目標值的負擔。計算方法與HPA相同。設置引用的ElasticWorker對象的scale
屬性。total_cluster_load <0.70 * targetValue
,則縮容。若是負載低於閾值,則不會當即開始Scale-In,可是scaleInBackOff
週期開始計時。默認狀況下,它設置爲30秒,若是僅在此期間完成,則執行縮容。若是平均時間total_cluster_load
增長,則ScaleInBackOff
週期無效。週期結束後,控制器將選擇那些具備metricload = 0
的worker Pod。而後,它使用請求中的這些pod調用shutdownHttpHook
。hook是對此實現定製的,但能夠推廣。接下來,控制器用終止標籤標記容器,最後用適當的值更新比例,以使ElasticWorker控制器更改集羣狀態。load = 0
,ShutdownHttpHook和TerminationLabelit確保僅終止那些不作任何事情的airflow worker。 HttpShutdown hook很重要,由於它能夠確保標記爲終止的airflow worker在ElasticWorker控制器終止它時不會從RabbitMQ接任任何任務。
ElasticWorker和ElasticWorkerAutoscaler控制器代碼位於-elastic-worker-autoscaler。 架構
自定義指標APIServer適配器代碼位於-elastic-worker-custommetrics-adapter。 併發
請遵循此處的安裝說明-elastic-airflow-cluster-k8s-setup-manifests。
此外,咱們可使用如下命令啓動Kubernetes儀表板並進行驗證,
minikube dashboard
咱們在minikube上的設置以下:
兩個命名空間
命名空間:elasticworker-custommetrics包含與自定義指標相關的pod。
命名空間:elastic-worker-autoscaler-system包含用於ElasticWorker和ElasticWorkerAutoscaler的控制器容器
其他組件在默認名稱空間中建立。
咱們可使用如下命令檢索ElasticWorker對象,kubectl get elasticworkers
咱們可使用如下命令檢索ElasticWorkerAutoscaler對象,kubectl get elasticworkerautoscalers
安裝時,咱們已經使用dag_1測試了airflow cluster。若是沒有,請轉到此處使用此DAG進行測試。咱們將在此處使用相同的DAG進行測試。
爲了進行測試,我在ElasticWorkerAutoscaler對象中將targetValue設置爲60。這意味着,一旦集羣總負載超過60,則將開始向外擴展,若是負載低於〜30,則將開始向內擴展。
咱們將經過登陸調度程序Pod來觸發DAG。
咱們將從測試橫向擴展方案開始。
在咱們的安裝中,每一個airflow worker的併發設置爲2,這意味着咱們總共有2個(併發)* 2(工做人員數量)= 4個可用插槽。所以,觸發4個DAG將使羣集負載達到100%。
在此測試案例中,咱們將同時觸發10個以上的DAG(即,咱們須要> 10個插槽)。這將致使airflow worker羣集擴展到maxReplica(即5個副本)。即便負載保持在100%,ElasticWorker控制器也將確保工人數不會超過maxReplica。
屏幕截圖下方是開始測試以前的airflow worker集羣。目前,咱們有2個worker,全部插槽都空着。
讓咱們經過登陸調度程序Pod來觸發DAG。若是還沒有完成,請記住取消暫停DAG。
#Login to Scheduler POD kubectl exec -it airflow-scheduler-76d5df7b9b-mjp25 bash cd dags #If you have not unpaused dag_1 already airflow unpause dag_1 export COUNT=0 while [[ $COUNT -lt 12 ]]; do airflow trigger_dag dag_1 COUNT=`expr $COUNT + 1` done;
隨着負載的增長,咱們看到建立了額外的airflow worker來處理負載。
當咱們觸發12個DAG時,建立的其餘airflow worker應該不止3個,可是因爲將maxReplicas設置爲5,ElasticWorker控制器不會建立5個以上的airflow worker。
咱們的橫向擴展方案可行!!
接下來咱們測試一下縮容。
基本的擴展方案已經過先前的測試驗證。若是咱們等待一兩分鐘,而後檢查集羣狀態,咱們能夠看到工做線程數已縮減至minReplicas。這是由於負載降至30如下。
咱們還想驗證它是不是安全的橫向擴展,即觸發橫向擴展時,控制器將終止負載爲0的worker,而不是仍在進行某些工做的worker。
爲了測試這種狀況,咱們將使用dag_2,它的任務將休眠30秒,而後將消息HI記錄到文件/home/airflow/logs/count_hi.txt中。咱們將觸發DAG 12次,每觸發4次,咱們將等待40+秒,而後再次觸發。
咱們在二者之間等待以觸發縮容。 Scale-In的默認退避時間爲30秒,這是爲了不抖動。
爲了最終驗證全部任務是否運行正常,而且實際進行處理的全部worker均未終止,咱們僅在輸出文件中計算消息HI。若是它等於咱們觸發的DAG數量(12),則咱們的測試用例將經過。
dag_2以下:
# Lets copy the dag_2.py file into minikube VM **minikube ssh cd dags/ cat>dag_2.py ....PASTE CONTENT FROM SAMPLE DAG.... ctrl-d logout
從調度程序Pod觸發DAG。
#Login to Scheduler POD kubectl exec -it airflow-scheduler-76d5df7b9b-mjp25 bash cd dags #If you have not unpaused dag_2 already airflow unpause dag_2 export COUNT=0 while [[ $COUNT -lt 4 ]]; do airflow trigger_dag dag_2 COUNT=`expr $COUNT + 1` done; sleep 40 export COUNT=0 while [[ $COUNT -lt 4 ]]; do airflow trigger_dag dag_2 COUNT=`expr $COUNT + 1` done; sleep 45 export COUNT=0 while [[ $COUNT -lt 4 ]]; do airflow trigger_dag dag_2 COUNT=`expr $COUNT + 1` done;
處理完全部任務後,咱們將對消息HI計數。咱們將使用調度程序Pod檢查輸出文件。
從上面的屏幕截圖能夠看出,消息HI被打印12次,與任務數相同。
在本文中,咱們看到了如何構建一個彈性airflow集羣,該集羣能夠在負載增長到特定閾值以上時橫向擴展,並在負載低於特定閾值時安全地橫向擴展。
咱們使用了兩個新的CRD-ElasticWorker和ElasticWorkerAutoscaler以及它們各自的控制器來實現此目的。 ElasticWorker Controller管理airflow工做程序副本,並確保它在minReplica和maxReplica之間。 ElasticWorkerAutoscaler控制器輪詢度量的集羣總負載,並計算將集羣負載達到指定targetValue所需的副本。而後,它將引用的ElasticWorker對象更新爲按比例放大或按比例縮小。