Kubernetes部署彈性Airflow集羣

在本文中,我將演示如何構建一個彈性Airflow集羣,當負載低於閾值時,該集羣能夠在高負載下安全地橫向擴展。git

經過水平Pod自動縮放器支持Kubernetes中的自動縮放。使用HPA,能夠直接進行橫向擴展,HPA能夠爲部署增長副本,並能夠建立其餘worker來共享工做負載。可是,伸縮是問題所在,伸縮過程會根據Pod在節點上的位置對它們進行排序,從而選擇要終止的Pod。所以,若是有一個Pod仍在進行某些處理,則沒法保證它不會被終止。github

在彈性Airflow 集羣中,爲了擴大規模,咱們須要確保進行某些處理的worker不會被終止。只有閒置的worker才應考慮終止。redis

爲了實現這一點,我建立了兩個CRD和兩個控制器 - ElasticWorkerElasticWorkerAutoscaler,在本文後面將對它們進行介紹。數據庫

對於此問題,還有其餘解決方案,例如,能夠建立一個Kubernetes做業,該做業能夠完成一組任務。隨着負載的增長,將建立更多的做業。可是,這種方法不是通用解決方案,它很是適合具備相似自動縮放要求的其餘用例。此處描述的方法是一種通用實現,能夠用做全面生產部署的起點。api

彈性 Airflow Cluster 架構

el01.png

組件說明以下,安全

  • Airflow scheduler解析DAG並將必要的任務添加到RabbitMQ隊列。
  • PostgresDB保存有關任務,DAG,變量,鏈接等狀態的信息。
  • RabbitMQ 將要執行的命令存儲在隊列中。
  • Airflow Worker 從RabbitMQ檢索命令並執行它們。
  • Flower 是用於監視和管理Celery worker的基於Web的工具。在咱們的設置中,flower還包含其餘腳本,以獲取每一個airflow worker的指標並將其放入redis db中。
  • Redis DB 存儲每一個airflow worker Pod的負載度量以及彙總的集羣總負載。它還存儲了咱們的自定義指標APIServer適配器的全部已註冊指標。
  • Custom Metric APIServer適配器 是一個基本的自定義Metric API服務器適配器,它分別爲給定的pod和airflow集羣資源服務的load和total_cluster_load度量請求。它從Redis數據庫檢索這些指標。
  • ElasticWorker Controller 監視類型爲ElasticWorker(CRD)的對象,並將集羣狀態與相應的Elasticworker對象中的規範協調。在較高級別,如下是該控制者的職責。bash

    • 建立等於minReplica的worker Pod
    • 若是變量scale> 0,則建立其餘工做容器。可是要確保總的Pod數量不超過maxReplicas
    • scale <0時,刪除worker Pod。Pod的刪除由定義的「按比例縮放」策略控制。當前有三個策略-ScaleInImmediatelyScaleInBySelectorScaleInDisabled,咱們將在此處使用ScaleInBySelector策略。這樣能夠確保控制器僅刪除已定義標籤集的Pod。它還能夠確保不管是否設置標籤,Pod數量都不會低於minReplicas

al02.jpg

  • ElasticWorkerAutoscalerController ElasticWorkerAutoscaler控制器監視類型爲ElasticWorkerAutoscaler(CRD)的對象。下面是該控制器的職責,服務器

    • 檢索名稱爲引用ElasticWorker對象的資源的指標total_cluster_load
    • 若是total_cluster_load> targetValue,則向外擴展。計算新的工做單元數,以減小目標值的負擔。計算方法與HPA相同。設置引用的ElasticWorker對象的scale屬性。
    • 若是total_cluster_load <0.70 * targetValue,則縮容。若是負載低於閾值,則不會當即開始Scale-In,可是scaleInBackOff週期開始計時。默認狀況下,它設置爲30秒,若是僅在此期間完成,則執行縮容。若是平均時間total_cluster_load增長,則ScaleInBackOff週期無效。週期結束後,控制器將選擇那些具備metricload = 0的worker Pod。而後,它使用請求中的這些pod調用shutdownHttpHook。hook是對此實現定製的,但能夠推廣。接下來,控制器用終止標籤標記容器,最後用適當的值更新比例,以使ElasticWorker控制器更改集羣狀態。
    • load = 0,ShutdownHttpHook和TerminationLabelit確保僅終止那些不作任何事情的airflow worker。 HttpShutdown hook很重要,由於它能夠確保標記爲終止的airflow worker在ElasticWorker控制器終止它時不會從RabbitMQ接任任何任務。

al01.jpg

安裝

ElasticWorker和ElasticWorkerAutoscaler控制器代碼位於-elastic-worker-autoscaler架構

自定義指標APIServer適配器代碼位於-elastic-worker-custommetrics-adapter併發

請遵循此處的安裝說明-elastic-airflow-cluster-k8s-setup-manifests

al03.png

此外,咱們可使用如下命令啓動Kubernetes儀表板並進行驗證,

minikube dashboard

咱們在minikube上的設置以下:

兩個命名空間

  • elastic-worker-autoscaler-system
  • elasticworker-custommetrics

命名空間:elasticworker-custommetrics包含與自定義指標相關的pod。
命名空間:elastic-worker-autoscaler-system包含用於ElasticWorker和ElasticWorkerAutoscaler的控制器容器
其他組件在默認名稱空間中建立。

咱們可使用如下命令檢索ElasticWorker對象,
kubectl get elasticworkers

咱們可使用如下命令檢索ElasticWorkerAutoscaler對象,
kubectl get elasticworkerautoscalers

測試

安裝時,咱們已經使用dag_1測試了airflow cluster。若是沒有,請轉到此處使用此DAG進行測試。咱們將在此處使用相同的DAG進行測試。

爲了進行測試,我在ElasticWorkerAutoscaler對象中將targetValue設置爲60。這意味着,一旦集羣總負載超過60,則將開始向外擴展,若是負載低於〜30,則將開始向內擴展。

咱們將經過登陸調度程序Pod來觸發DAG。

咱們將從測試橫向擴展方案開始。

在咱們的安裝中,每一個airflow worker的併發設置爲2,這意味着咱們總共有2個(併發)* 2(工做人員數量)= 4個可用插槽。所以,觸發4個DAG將使羣集負載達到100%。

在此測試案例中,咱們將同時觸發10個以上的DAG(即,咱們須要> 10個插槽)。這將致使airflow worker羣集擴展到maxReplica(即5個副本)。即便負載保持在100%,ElasticWorker控制器也將確保工人數不會超過maxReplica。

屏幕截圖下方是開始測試以前的airflow worker集羣。目前,咱們有2個worker,全部插槽都空着。

al04.png

al05.png

讓咱們經過登陸調度程序Pod來觸發DAG。若是還沒有完成,請記住取消暫停DAG。

#Login to Scheduler POD
kubectl exec -it airflow-scheduler-76d5df7b9b-mjp25 bash
cd dags
#If you have not unpaused dag_1 already
airflow unpause dag_1
export COUNT=0
while [[ $COUNT -lt 12 ]];
do
airflow trigger_dag dag_1
COUNT=`expr $COUNT + 1`
done;

隨着負載的增長,咱們看到建立了額外的airflow worker來處理負載。

al06.png

al07.png

當咱們觸發12個DAG時,建立的其餘airflow worker應該不止3個,可是因爲將maxReplicas設置爲5,ElasticWorker控制器不會建立5個以上的airflow worker。

咱們的橫向擴展方案可行!!

接下來咱們測試一下縮容。

基本的擴展方案已經過先前的測試驗證。若是咱們等待一兩分鐘,而後檢查集羣狀態,咱們能夠看到工做線程數已縮減至minReplicas。這是由於負載降至30如下。

al08.png

al09.png

咱們還想驗證它是不是安全的橫向擴展,即觸發橫向擴展時,控制器將終止負載爲0的worker,而不是仍在進行某些工做的worker。

爲了測試這種狀況,咱們將使用dag_2,它的任務將休眠30秒,而後將消息HI記錄到文件/home/airflow/logs/count_hi.txt中。咱們將觸發DAG 12次,每觸發4次,咱們將等待40+秒,而後再次觸發。

咱們在二者之間等待以觸發縮容。 Scale-In的默認退避時間爲30秒,這是爲了不抖動。

爲了最終驗證全部任務是否運行正常,而且實際進行處理的全部worker均未終止,咱們僅在輸出文件中計算消息HI。若是它等於咱們觸發的DAG數量(12),則咱們的測試用例將經過。

dag_2以下:

# Lets copy the dag_2.py file into minikube VM  
**minikube ssh  
cd dags/  
cat>dag_2.py  
....PASTE CONTENT FROM SAMPLE DAG....  
ctrl-d  
logout

從調度程序Pod觸發DAG。

#Login to Scheduler POD
kubectl exec -it airflow-scheduler-76d5df7b9b-mjp25 bash
cd dags
#If you have not unpaused dag_2 already
airflow unpause dag_2
export COUNT=0
while [[ $COUNT -lt 4 ]];
do
airflow trigger_dag dag_2
COUNT=`expr $COUNT + 1`
done;
sleep 40
export COUNT=0
while [[ $COUNT -lt 4 ]];
do
airflow trigger_dag dag_2
COUNT=`expr $COUNT + 1`
done;
sleep 45
export COUNT=0
while [[ $COUNT -lt 4 ]];
do
airflow trigger_dag dag_2
COUNT=`expr $COUNT + 1`
done;

處理完全部任務後,咱們將對消息HI計數。咱們將使用調度程序Pod檢查輸出文件。

al10.png

從上面的屏幕截圖能夠看出,消息HI被打印12次,與任務數相同。

結論

在本文中,咱們看到了如何構建一個彈性airflow集羣,該集羣能夠在負載增長到特定閾值以上時橫向擴展,並在負載低於特定閾值時安全地橫向擴展。

咱們使用了兩個新的CRD-ElasticWorker和ElasticWorkerAutoscaler以及它們各自的控制器來實現此目的。 ElasticWorker Controller管理airflow工做程序副本,並確保它在minReplica和maxReplica之間。 ElasticWorkerAutoscaler控制器輪詢度量的集羣總負載,並計算將集羣負載達到指定targetValue所需的副本。而後,它將引用的ElasticWorker對象更新爲按比例放大或按比例縮小。

相關文章
相關標籤/搜索