基於任務量進行k8s集羣的靈活調度處理

前言

最近公司內有個需求是爲了進一步控制某個項目的k8s集羣的資源,避免資源浪費。linux

目前項目須要的資源佔用率很高,須要3核CPU、2G內存。在一開始的時候是沒有作靈活調度處理的。會讓Pod一直處於運行狀態,即便沒有任務的時候也會一直運行,雖說能夠經過k8sResourcesRequestsLimits減小一點資源,可是仍是會照成必定資源的浪費。git

介紹

在正文開始前,須要把流程介紹一下,方便後文的理解。github

首先別的部門會往數據庫裏插入一條數據,而後在由調度器去按期的掃數據庫,掃到一個新數據,則由調度器去調用k8s的api去建立一個Job資源,在Job裏有一個Pod,由Pod去作一些任務。而後結束。docker

看起來比較簡單,可是有幾個須要注意的地方:數據庫

  1. 因爲Pod是須要環境變量的,而Pod是由調度器去建立的。那麼這個時候就須要把變量一步步傳進去
  2. 調度器不能去更改任何的數據,只能從數據庫裏拿,這是爲了更好的解耦。不能讓調度器去關心任何的業務邏輯及數據
  3. 調度器的自己不能存有任何的狀態,由於一旦涉及到狀態,就要去有個地方去存儲它。由於要考慮到調度器自己重啓。這樣作只會帶來更大的負擔。
  4. 須要考慮當前的集羣是否有資源再啓動Pod

開始

調度器使用了GoLang進行開發,因此後文都將使用Go作爲主力語言。api

建立一個可調試的k8s環境

目前由於使用的是Go進行開發,因此使用了k8s官方的client-go這個庫。而這個庫自己就提供了一些建立clientset的方法(能夠把clientset理解成一個能夠和當前集羣master通訊的管道)閉包

package main

import (
  "fmt"

  "k8s.io/client-go/kubernetes"
  "k8s.io/client-go/rest"
  "k8s.io/client-go/tools/clientcmd"
)

func main() {
  // 這個方法裏包含了k8s自身對Cluster的配置操做
  kubeConfig, err := rest.InClusterConfig()

  if err != nil {
    // 若是是開發環境,則使用當前minikube。須要配置KUBECONFIG變量
    // 若是是minikube,KUBECONFIG變量能夠指向$HOME/.kube/config
    kubeConfig, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
      clientcmd.NewDefaultClientConfigLoadingRules(),
      &clientcmd.ConfigOverrides{}).ClientConfig()

    // 若是沒有配置KUBECONFIG變量,且當前也沒有在集羣裏運行
    if err != nil {
      panic("get k8s config fail: " + err.Error())
    }
  }

  // 建立clientset失敗
  clientset, err := kubernetes.NewForConfig(kubeConfig)
  if err != nil {
    panic("failed to create k8s clientset: " + err.Error())
  }

  // 建立成功
  fmt.Println(clientset)
}
複製代碼

其中rest.InClusterConfig()代碼也十分簡單,就是去當前機器下的/var/run/secrets/kubernetes.io/serviceaccount/讀取tokenca。以及讀取KUBERNETES_SERVICE_HOSTKUBERNETES_SERVICE_PORT環境變量,再把他們拼在一塊兒,感興趣的同窗能夠去看下源碼app

根據上文能夠知道rest.InClusterConfig()是針對以及身在集羣中的機器而言的。在本地開發環境是確定不行的。因此咱們須要另外一個方法去解決這個問題。ide

能夠看到上面我已經作了處理,當發現InClusterConfig失敗後,會轉而執行下面的代碼:函數

kubeConfig, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
  clientcmd.NewDefaultClientConfigLoadingRules(),
  &clientcmd.ConfigOverrides{}).ClientConfig()
複製代碼

這段代碼其實也比較簡單,就是去讀取當前環境下的KUBECONFIG獲取本地k8s的配置路徑。若是沒有這個變量,再去獲取當前用戶目錄下的.kube/config文件。最終根據文件改形成所須要的配置。主要源碼可見: NewDefaultClientConfigLoadingRulesClientConfig

如今只要保證你本機有minikube環境就能夠正常調試、開發了。

以上的方法參考rook的寫法

建立Job及Pod

數據庫查詢的這裏就再也不闡述了,能夠根據自身的業務進行適配、開發。這裏只是起到一個拋磚引玉的效果。不止是數據庫,其餘任何東西均可以,主要仍是要看自身的業務適合什麼。

咱們先假設,這裏從數據庫裏拿到了一條數據,咱們須要把數據庫的值傳給Pod。避免Pod裏再作一次查詢。如今咱們須要先把Job定義好:

import (
  batchv1 "k8s.io/api/batch/v1"
  apiv1 "k8s.io/api/core/v1"
  "k8s.io/apimachinery/pkg/api/resource"
  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// job所需配置
type JobsSpec struct {
  Namespace string
  Image     string
  Prefix    string
}

// 返回指定的cpu、memory資源值
// 寫法參考k8s見:https://github.com/kubernetes/kubernetes/blob/b3875556b0edf3b5eaea32c69678edcf4117d316/pkg/kubelet/cm/helpers_linux_test.go#L36-L53
func getResourceList(cpu, memory string) apiv1.ResourceList {
  res := apiv1.ResourceList{}
  if cpu != "" {
    res[apiv1.ResourceCPU] = resource.MustParse(cpu)
  }
  if memory != "" {
    res[apiv1.ResourceMemory] = resource.MustParse(memory)
  }
  return res
}

// 返回ResourceRequirements對象,詳細見getResourceList函數註釋
func getResourceRequirements(requests, limits apiv1.ResourceList) apiv1.ResourceRequirements {
  res := apiv1.ResourceRequirements{}
  res.Requests = requests
  res.Limits = limits
  return res
}

// 轉爲指針
func newInt64(i int64) *int64 {
  return &i
}

// 建立job的配置
// 返回指定的cpu、memory資源值
// 寫法參考k8s見:https://github.com/kubernetes/kubernetes/blob/b3875556b0edf3b5eaea32c69678edcf4117d316/pkg/kubelet/cm/helpers_linux_test.go#L36-L53
func getResourceList(cpu, memory string) apiv1.ResourceList {
  res := apiv1.ResourceList{}
  if cpu != "" {
    res[apiv1.ResourceCPU] = resource.MustParse(cpu)
  }
  if memory != "" {
    res[apiv1.ResourceMemory] = resource.MustParse(memory)
  }
  return res
}

// 返回ResourceRequirements對象,詳細見getResourceList函數註釋
func getResourceRequirements(requests, limits apiv1.ResourceList) apiv1.ResourceRequirements {
  res := apiv1.ResourceRequirements{}
  res.Requests = requests
  res.Limits = limits
  return res
}

// job所需配置
type jobsSpec struct {
  Namespace string
  Image     string
  Prefix    string
}

// 建立job的配置
func (j *jobsSpec) Create(envMap map[string]string) *batchv1.Job {
  u2 := uuid.NewV4().String()[:8]
  name := fmt.Sprint(j.Prefix, "-", u2)
  
  return &batchv1.Job{
    ObjectMeta: metav1.ObjectMeta{
      Name:      name,
      Namespace: j.Namespace,
    },
    Spec: batchv1.JobSpec{
      Template: apiv1.PodTemplateSpec{
        Spec: apiv1.PodSpec{
          RestartPolicy: "Never",
          Containers: []apiv1.Container{
            {
              Name:            name,
              Image:           j.Image,
              Env:             EnvToVars(envMap),
              ImagePullPolicy: "Always",
              Resources:       getResourceRequirements(getResourceList("2500m", "2048Mi"), getResourceList("3000m", "2048Mi")),
            },
          },
        },
      },
    },
  }
}
複製代碼

這裏沒什麼好說的,基本就是資源定義,以及上門還有註釋。

上面的代碼其實少了一部分,這部分是把變量注入進去的。也就是EnvToVars,核心代碼以下:

// 把對象轉化成k8s所能接受的環境變量格式
func EnvToVars(envMap map[string]string) []v1.EnvVar {
  var envVars []v1.EnvVar
  for k, v := range envMap {
    envVar := v1.EnvVar{
      Name:  k,
      Value: v,
    }
    envVars = append(envVars, envVar)
  }
  return envVars
}

// 獲取當前系統中全部的變量,並轉成map方式
func GetAllEnvToMap() map[string]string {
  item := make(map[string]string)
  for _, k := range os.Environ() {
    splits := strings.Split(k, "=")
    item[splits[0]] = splits[1]
  }

  return item
}

// 合併兩個map,爲了更好的性能,使用閉包的方式,這樣sourceMap只須要調用一次便可
func MergeMap(sourceMap map[string]string) func(insertMap map[string]string) map[string]string {
  return func(insertMap map[string]string) map[string]string {
    for k, v := range insertMap {
      sourceMap[k] = v
    }

    return sourceMap
  }
}
複製代碼

而後在使用的時候就是這樣了:

job := jobsSpec{
  Prefix:    "project-" + "dev" + "-job",
  Image:     "docker image name",
  Namespace: "default",
}

willMergeMap := MergeMap(GetAllEnvToMap())

// dbData是從數據庫裏拿到的數據,格式大體以下
// [ { id: 1, url: 'xxx' }, { id: 2, url: 'yyy' } ]
for _, data := range dbData {
  currentEnvMap := willMergeMap(data)

  // 建立Job
  _, err = api.CreateJob(currentEnvMap)
  
  if err != nil {
    panic("create job fail", err.Error())
  }
}
複製代碼

這樣一來,就實現了把當前環境變量及數據經過變量的方式傳給Pod。這樣的話,只須要保證當前的調度器裏存在一些Pod可能會用到的變量就好了,如:S3 TokenDB Host等。經過這種方式,Pod基本上什麼都不用關係,它所須要的變量,會由調度器傳給它,分工明確。

優化

其實以上其實就已經完成了最核心的東西,自己也不是特別的難。很簡單的邏輯。只不過光有這些是不夠的,還有不少地方須要考慮。

資源判斷

這裏在說以前有個前提,以前說過這個調度器是不能去更改任何數據的,更改數據只能由Pod裏的容器去更改。

那麼這個時候就有問題了。

集羣若是資源不夠分配的話,那Pod將會一直處於Pending狀態,根據上文,變量已經注入到Pod裏了,並且因爲裏面的容器沒有啓動。那就會致使數據沒有更改,而沒有更改的數據,調度器就會一直認爲他的新的。致使會爲這條數據再啓動一個Job,一直循環到當集羣資源足夠後其中的一個Pod去更改了數據。

舉個例子,假設數據庫裏有一個status的字段,當值爲wating時,調度器就認爲這是一條新數據,會把這個數據轉變成環境變量注入到Pod裏,再由Pod去把waiting更改爲process。調度器每3分鐘去掃一次數據,因此Pod必須在3分鐘內把數據更改完畢。

而這時因爲資源不夠,k8s建立了這個Pod,可是裏面的代碼沒有運行,致使沒有去更改數據,就會致使調度器一直去爲同一條數據建立Pod。

解決方案也比較簡單,只要去判斷下Pod的狀態是否爲Pending,若是是,則再也不建立Pod。下面是核心代碼:

func HavePendingPod() (bool, error) {
  // 獲取當前namespace下全部的pod
  pods, err := clientset.CoreV1().Pods(Namespace).List(metaV1.ListOptions{})
  if err != nil {
    return false, err
  }

  // 循環pod,判斷每一個pod是否符合當前的前綴,若是符合,則說明當前的環境已經存在Pending狀態了
  for _, v := range pods.Items {
    phase := v.Status.Phase
    if phase == "Pending" {
      if strings.HasPrefix(v.Name, Prefix) {
        return true, nil
      }
    }
  }

  return false, nil
}
複製代碼

當爲true時,就再也不建立Job

Job數量最大值

集羣的資源也不是無限的,雖然咱們對Pending狀況作了處理,可是這只是一種防護手段。咱們仍是要對數量進行一個管控,當Job數量等於某個值時,不在建立Job了。代碼也很簡單,我這裏就把獲取當前環境下Job數量的代碼放出來:

// 獲取當前namespace下同環境的job Item實例
func GetJobListByNS() ([]v1.Job, error) {
  var jobList, err = clientset.BatchV1().Jobs(Namespace).List(metaV1.ListOptions{})
    if err != nil {
    return nil, err
  }

  // 過濾非同前綴的Job
  var item []v1.Job
  for _, v := range jobList.Items {
    if strings.HasPrefix(v.Name, Prefix) {
      item = append(item, v)
    }
  }

  return item, nil
}

func GetJobLenByNS() (int, error) {
  jobItem, err := api.GetJobListByNS()
  if err != nil {
      return 最大值, err
  }
  
  return len(jobItem), nil
}
複製代碼

刪除已完成和失敗的

上面的代碼實際上是有問題的,k8s的Job資源類型是有一個特性是,當完成或者失敗的時候,並不會刪除自身,也就說即便他完成了,它的數據還會一直停留在那。因此上面的代碼會把一些已經完成或者失敗的Job也統計進去。到最後會出現一直沒法建立Job的窘迫。

解決方案有兩個,第一個是在聲明Job資源時,添加spec.ttlSecondsAfterFinished屬性來作到k8s自動回收完成、失敗的Job。惋惜的是這是高版本纔有的屬性,我司很遺憾是低版本的。那就只能用第二種方法了,就是在每次獲取數量前,調用api把完成、失敗的Job刪除:

func DeleteCompleteJob() error {
  jobItem, err := GetJobListByNS()
  if err != nil {
    return err
  }

  // 若是不指定此屬性,刪除job時,不會刪除pod
  propagationPolicy := metaV1.DeletePropagationForeground
  for _, v := range jobItem {
    // 只刪除已經結束的job
    if v.Status.Failed == 1 || v.Status.Succeeded == 1 {
      err := clientset.BatchV1().Jobs(Namespace).Delete(v.Name, &metaV1.DeleteOptions{
        PropagationPolicy: &propagationPolicy,
      })

      if err != nil {
        return err
      }
    }
  }

  return nil
}
複製代碼

結尾

整個調度器的代碼比較簡單,沒有必要專門抽成一個庫來作。只要知道大概的思路,就能夠根據本身的項目需求作出適合本身項目組的調度器。

感謝我司大佬@qqshfox提供的思路

相關文章
相關標籤/搜索