最近公司內有個需求是爲了進一步控制某個項目的k8s集羣的資源,避免資源浪費。linux
目前項目須要的資源佔用率很高,須要3核CPU、2G內存。在一開始的時候是沒有作靈活調度處理的。會讓Pod
一直處於運行狀態,即便沒有任務的時候也會一直運行,雖說能夠經過k8s
下Resources
的Requests
和Limits
減小一點資源,可是仍是會照成必定資源的浪費。git
在正文開始前,須要把流程介紹一下,方便後文的理解。github
首先別的部門會往數據庫裏插入一條數據,而後在由調度器去按期的掃數據庫,掃到一個新數據,則由調度器去調用k8s的api去建立一個Job
資源,在Job裏有一個Pod
,由Pod
去作一些任務。而後結束。docker
看起來比較簡單,可是有幾個須要注意的地方:數據庫
Pod
是須要環境變量的,而Pod
是由調度器去建立的。那麼這個時候就須要把變量一步步傳進去Pod
了調度器使用了GoLang
進行開發,因此後文都將使用Go
作爲主力語言。api
目前由於使用的是Go
進行開發,因此使用了k8s官方的client-go
這個庫。而這個庫自己就提供了一些建立clientset
的方法(能夠把clientset
理解成一個能夠和當前集羣master通訊的管道)閉包
package main
import (
"fmt"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
// 這個方法裏包含了k8s自身對Cluster的配置操做
kubeConfig, err := rest.InClusterConfig()
if err != nil {
// 若是是開發環境,則使用當前minikube。須要配置KUBECONFIG變量
// 若是是minikube,KUBECONFIG變量能夠指向$HOME/.kube/config
kubeConfig, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
clientcmd.NewDefaultClientConfigLoadingRules(),
&clientcmd.ConfigOverrides{}).ClientConfig()
// 若是沒有配置KUBECONFIG變量,且當前也沒有在集羣裏運行
if err != nil {
panic("get k8s config fail: " + err.Error())
}
}
// 建立clientset失敗
clientset, err := kubernetes.NewForConfig(kubeConfig)
if err != nil {
panic("failed to create k8s clientset: " + err.Error())
}
// 建立成功
fmt.Println(clientset)
}
複製代碼
其中rest.InClusterConfig()
代碼也十分簡單,就是去當前機器下的/var/run/secrets/kubernetes.io/serviceaccount/
讀取token
和ca
。以及讀取KUBERNETES_SERVICE_HOST
和KUBERNETES_SERVICE_PORT
環境變量,再把他們拼在一塊兒,感興趣的同窗能夠去看下源碼。app
根據上文能夠知道rest.InClusterConfig()
是針對以及身在集羣中的機器而言的。在本地開發環境是確定不行的。因此咱們須要另外一個方法去解決這個問題。ide
能夠看到上面我已經作了處理,當發現InClusterConfig
失敗後,會轉而執行下面的代碼:函數
kubeConfig, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
clientcmd.NewDefaultClientConfigLoadingRules(),
&clientcmd.ConfigOverrides{}).ClientConfig()
複製代碼
這段代碼其實也比較簡單,就是去讀取當前環境下的KUBECONFIG
獲取本地k8s的配置路徑。若是沒有這個變量,再去獲取當前用戶目錄下的.kube/config
文件。最終根據文件改形成所須要的配置。主要源碼可見: NewDefaultClientConfigLoadingRules、ClientConfig
如今只要保證你本機有minikube
環境就能夠正常調試、開發了。
以上的方法參考rook的寫法
數據庫查詢的這裏就再也不闡述了,能夠根據自身的業務進行適配、開發。這裏只是起到一個拋磚引玉的效果。不止是數據庫,其餘任何東西均可以,主要仍是要看自身的業務適合什麼。
咱們先假設,這裏從數據庫裏拿到了一條數據,咱們須要把數據庫的值傳給Pod。避免Pod裏再作一次查詢。如今咱們須要先把Job定義好:
import (
batchv1 "k8s.io/api/batch/v1"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// job所需配置
type JobsSpec struct {
Namespace string
Image string
Prefix string
}
// 返回指定的cpu、memory資源值
// 寫法參考k8s見:https://github.com/kubernetes/kubernetes/blob/b3875556b0edf3b5eaea32c69678edcf4117d316/pkg/kubelet/cm/helpers_linux_test.go#L36-L53
func getResourceList(cpu, memory string) apiv1.ResourceList {
res := apiv1.ResourceList{}
if cpu != "" {
res[apiv1.ResourceCPU] = resource.MustParse(cpu)
}
if memory != "" {
res[apiv1.ResourceMemory] = resource.MustParse(memory)
}
return res
}
// 返回ResourceRequirements對象,詳細見getResourceList函數註釋
func getResourceRequirements(requests, limits apiv1.ResourceList) apiv1.ResourceRequirements {
res := apiv1.ResourceRequirements{}
res.Requests = requests
res.Limits = limits
return res
}
// 轉爲指針
func newInt64(i int64) *int64 {
return &i
}
// 建立job的配置
// 返回指定的cpu、memory資源值
// 寫法參考k8s見:https://github.com/kubernetes/kubernetes/blob/b3875556b0edf3b5eaea32c69678edcf4117d316/pkg/kubelet/cm/helpers_linux_test.go#L36-L53
func getResourceList(cpu, memory string) apiv1.ResourceList {
res := apiv1.ResourceList{}
if cpu != "" {
res[apiv1.ResourceCPU] = resource.MustParse(cpu)
}
if memory != "" {
res[apiv1.ResourceMemory] = resource.MustParse(memory)
}
return res
}
// 返回ResourceRequirements對象,詳細見getResourceList函數註釋
func getResourceRequirements(requests, limits apiv1.ResourceList) apiv1.ResourceRequirements {
res := apiv1.ResourceRequirements{}
res.Requests = requests
res.Limits = limits
return res
}
// job所需配置
type jobsSpec struct {
Namespace string
Image string
Prefix string
}
// 建立job的配置
func (j *jobsSpec) Create(envMap map[string]string) *batchv1.Job {
u2 := uuid.NewV4().String()[:8]
name := fmt.Sprint(j.Prefix, "-", u2)
return &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: j.Namespace,
},
Spec: batchv1.JobSpec{
Template: apiv1.PodTemplateSpec{
Spec: apiv1.PodSpec{
RestartPolicy: "Never",
Containers: []apiv1.Container{
{
Name: name,
Image: j.Image,
Env: EnvToVars(envMap),
ImagePullPolicy: "Always",
Resources: getResourceRequirements(getResourceList("2500m", "2048Mi"), getResourceList("3000m", "2048Mi")),
},
},
},
},
},
}
}
複製代碼
這裏沒什麼好說的,基本就是資源定義,以及上門還有註釋。
上面的代碼其實少了一部分,這部分是把變量注入進去的。也就是EnvToVars
,核心代碼以下:
// 把對象轉化成k8s所能接受的環境變量格式
func EnvToVars(envMap map[string]string) []v1.EnvVar {
var envVars []v1.EnvVar
for k, v := range envMap {
envVar := v1.EnvVar{
Name: k,
Value: v,
}
envVars = append(envVars, envVar)
}
return envVars
}
// 獲取當前系統中全部的變量,並轉成map方式
func GetAllEnvToMap() map[string]string {
item := make(map[string]string)
for _, k := range os.Environ() {
splits := strings.Split(k, "=")
item[splits[0]] = splits[1]
}
return item
}
// 合併兩個map,爲了更好的性能,使用閉包的方式,這樣sourceMap只須要調用一次便可
func MergeMap(sourceMap map[string]string) func(insertMap map[string]string) map[string]string {
return func(insertMap map[string]string) map[string]string {
for k, v := range insertMap {
sourceMap[k] = v
}
return sourceMap
}
}
複製代碼
而後在使用的時候就是這樣了:
job := jobsSpec{
Prefix: "project-" + "dev" + "-job",
Image: "docker image name",
Namespace: "default",
}
willMergeMap := MergeMap(GetAllEnvToMap())
// dbData是從數據庫裏拿到的數據,格式大體以下
// [ { id: 1, url: 'xxx' }, { id: 2, url: 'yyy' } ]
for _, data := range dbData {
currentEnvMap := willMergeMap(data)
// 建立Job
_, err = api.CreateJob(currentEnvMap)
if err != nil {
panic("create job fail", err.Error())
}
}
複製代碼
這樣一來,就實現了把當前環境變量及數據經過變量的方式傳給Pod
。這樣的話,只須要保證當前的調度器裏存在一些Pod
可能會用到的變量就好了,如:S3 Token
、DB Host
等。經過這種方式,Pod
基本上什麼都不用關係,它所須要的變量,會由調度器傳給它,分工明確。
其實以上其實就已經完成了最核心的東西,自己也不是特別的難。很簡單的邏輯。只不過光有這些是不夠的,還有不少地方須要考慮。
這裏在說以前有個前提,以前說過這個調度器是不能去更改任何數據的,更改數據只能由Pod
裏的容器去更改。
那麼這個時候就有問題了。
集羣若是資源不夠分配的話,那Pod
將會一直處於Pending
狀態,根據上文,變量已經注入到Pod裏了,並且因爲裏面的容器沒有啓動。那就會致使數據沒有更改,而沒有更改的數據,調度器就會一直認爲他的新的。致使會爲這條數據再啓動一個Job,一直循環到當集羣資源足夠後其中的一個Pod去更改了數據。
舉個例子,假設數據庫裏有一個status
的字段,當值爲wating
時,調度器就認爲這是一條新數據,會把這個數據轉變成環境變量注入到Pod裏,再由Pod去把waiting
更改爲process
。調度器每3分鐘去掃一次數據,因此Pod必須在3分鐘內把數據更改完畢。
而這時因爲資源不夠,k8s建立了這個Pod,可是裏面的代碼沒有運行,致使沒有去更改數據,就會致使調度器一直去爲同一條數據建立Pod。
解決方案也比較簡單,只要去判斷下Pod的狀態是否爲Pending
,若是是,則再也不建立Pod。下面是核心代碼:
func HavePendingPod() (bool, error) {
// 獲取當前namespace下全部的pod
pods, err := clientset.CoreV1().Pods(Namespace).List(metaV1.ListOptions{})
if err != nil {
return false, err
}
// 循環pod,判斷每一個pod是否符合當前的前綴,若是符合,則說明當前的環境已經存在Pending狀態了
for _, v := range pods.Items {
phase := v.Status.Phase
if phase == "Pending" {
if strings.HasPrefix(v.Name, Prefix) {
return true, nil
}
}
}
return false, nil
}
複製代碼
當爲true
時,就再也不建立Job
集羣的資源也不是無限的,雖然咱們對Pending
狀況作了處理,可是這只是一種防護手段。咱們仍是要對數量進行一個管控,當Job數量等於某個值時,不在建立Job了。代碼也很簡單,我這裏就把獲取當前環境下Job數量的代碼放出來:
// 獲取當前namespace下同環境的job Item實例
func GetJobListByNS() ([]v1.Job, error) {
var jobList, err = clientset.BatchV1().Jobs(Namespace).List(metaV1.ListOptions{})
if err != nil {
return nil, err
}
// 過濾非同前綴的Job
var item []v1.Job
for _, v := range jobList.Items {
if strings.HasPrefix(v.Name, Prefix) {
item = append(item, v)
}
}
return item, nil
}
func GetJobLenByNS() (int, error) {
jobItem, err := api.GetJobListByNS()
if err != nil {
return 最大值, err
}
return len(jobItem), nil
}
複製代碼
上面的代碼實際上是有問題的,k8s的Job
資源類型是有一個特性是,當完成或者失敗的時候,並不會刪除自身,也就說即便他完成了,它的數據還會一直停留在那。因此上面的代碼會把一些已經完成或者失敗的Job也統計進去。到最後會出現一直沒法建立Job的窘迫。
解決方案有兩個,第一個是在聲明Job
資源時,添加spec.ttlSecondsAfterFinished
屬性來作到k8s自動回收完成、失敗的Job。惋惜的是這是高版本纔有的屬性,我司很遺憾是低版本的。那就只能用第二種方法了,就是在每次獲取數量前,調用api把完成、失敗的Job刪除:
func DeleteCompleteJob() error {
jobItem, err := GetJobListByNS()
if err != nil {
return err
}
// 若是不指定此屬性,刪除job時,不會刪除pod
propagationPolicy := metaV1.DeletePropagationForeground
for _, v := range jobItem {
// 只刪除已經結束的job
if v.Status.Failed == 1 || v.Status.Succeeded == 1 {
err := clientset.BatchV1().Jobs(Namespace).Delete(v.Name, &metaV1.DeleteOptions{
PropagationPolicy: &propagationPolicy,
})
if err != nil {
return err
}
}
}
return nil
}
複製代碼
整個調度器的代碼比較簡單,沒有必要專門抽成一個庫來作。只要知道大概的思路,就能夠根據本身的項目需求作出適合本身項目組的調度器。
感謝我司大佬@qqshfox提供的思路