在不少運維場景下,咱們都會執行一些長時間的任務,好比裝機、部署環境、打包鏡像等長時間任務, 而一般咱們的任務節點數量一般是有限的(排除基於k8s的hpa、或者knative等自動伸縮場景)。算法
那麼當咱們有一個任務如何根據當前的worker和corrdinator和任務來進行合理的分配,分配其實也比較複雜,往復雜裏面作,能夠根據當前系統的負載、每一個任務的執行資源消耗、當前集羣的任務數量等, 這裏咱們就搞一個最簡單的,基於任務和當前worker的RR算法bash
在worker和任務隊列之間,添加一層協調調度層Coordinator, 由它來根據當前集羣任務的狀態來進行任務的分配,同時感知當前集羣worker和task的狀態,協調整個集羣任務的執行、終止等操做架構
members: 表示當前集羣中全部的worker tasks: 就是當前的任務 Coordinator: 就是咱們的協調者, 負責根據members和tasks進行任務的分配 result: 就是分配的結果app
CircularIterator就是咱們的環狀對立迭代器, 擁有兩個方法, 一個是add添加member, 一個Next返回基於rr的下一個member運維
// CircularIterator 環狀迭代器 type CircularIterator struct { list []interface{} // 保存全部的成員變量 next int } // Next 返回下一個元素 func (c *CircularIterator) Next() interface{} { item := c.list[c.next] c.next = (c.next + 1) % len(c.list) return item } // Add 添加任務 func (c *CircularIterator) Add(v interface{}) bool { for _, item := range c.list { if v == item { return false } } c.list = append(c.list, v) return true }
Member就是負責執行任務的worker, 有一個AddTask方法和Execute方法負責任務的執行和添加任務 Task標識一個任務分佈式
// Member 任務組成員 type Member struct { id int tasks []*Task } // ID 返回當前memberID func (m *Member) ID() int { return m.id } // AddTask 爲member添加任務 func (m *Member) AddTask(t *Task) bool { for _, task := range m.tasks { if task == t { return false } } m.tasks = append(m.tasks, t) return true } // Execute 執行任務 func (m *Member) Execute() { for _, task := range m.tasks { fmt.Printf("Member %d run task %s\n", m.ID(), task.Execute()) } } // Task 任務 type Task struct { name string } // Execute 執行task返回結果 func (t *Task) Execute() string { return "Task " + t.name + " run success" }
Coordinator是協調器,負責根據 Member和task進行集羣任務的協調調度ide
// Task 任務 type Task struct { name string } // Execute 執行task返回結果 func (t *Task) Execute() string { return "Task " + t.name + " run success" } // Coordinator 協調者 type Coordinator struct { members []*Member tasks []*Task } // TaskAssignments 爲member分配任務 func (c *Coordinator) TaskAssignments() map[int]*Member { taskAssignments := make(map[int]*Member) // 構建迭代器 memberIt := c.getMemberIterator() for _, task := range c.tasks { member := memberIt.Next().(*Member) _, err := taskAssignments[member.ID()] if err == false { taskAssignments[member.ID()] = member } member.AddTask(task) } return taskAssignments } func (c *Coordinator) getMemberIterator() *CircularIterator { // 經過當前成員, 構形成員隊列 members := make([]interface{}, len(c.members)) for index, member := range c.members { members[index] = member } return NewCircularIterftor(members) } // AddMember 添加member組成員 func (c *Coordinator) AddMember(m *Member) bool { for _, member := range c.members { if member == m { return false } } c.members = append(c.members, m) return true } // AddTask 添加任務 func (c *Coordinator) AddTask(t *Task) bool { for _, task := range c.tasks { if task == t { return false } } c.tasks = append(c.tasks, t) return true }
咱們首先建立一堆member和task, 而後調用coordinator進行任務分配,執行任務結果測試
coordinator := NewCoordinator() for i := 0; i < 10; i++ { m := &Member{id: i} coordinator.AddMember(m) } for i := 0; i < 30; i++ { t := &Task{name: fmt.Sprintf("task %d", i)} coordinator.AddTask(t) } result := coordinator.TaskAssignments() for _, member := range result { member.Execute() }
能夠看到每一個worker均勻的獲得任務分配ui
Member 6 run task Task task 6 run success Member 6 run task Task task 16 run success Member 6 run task Task task 26 run success Member 8 run task Task task 8 run success Member 8 run task Task task 18 run success Member 8 run task Task task 28 run success Member 0 run task Task task 0 run success Member 0 run task Task task 10 run success Member 0 run task Task task 20 run success Member 3 run task Task task 3 run success Member 3 run task Task task 13 run success Member 3 run task Task task 23 run success Member 4 run task Task task 4 run success Member 4 run task Task task 14 run success Member 4 run task Task task 24 run success Member 7 run task Task task 7 run success Member 7 run task Task task 17 run success Member 7 run task Task task 27 run success Member 9 run task Task task 9 run success Member 9 run task Task task 19 run success Member 9 run task Task task 29 run success Member 1 run task Task task 1 run success Member 1 run task Task task 11 run success Member 1 run task Task task 21 run success Member 2 run task Task task 2 run success Member 2 run task Task task 12 run success Member 2 run task Task task 22 run success Member 5 run task Task task 5 run success Member 5 run task Task task 15 run success Member 5 run task Task task 25 run success
package main import "fmt" // CircularIterator 環狀迭代器 type CircularIterator struct { list []interface{} next int } // Next 返回下一個元素 func (c *CircularIterator) Next() interface{} { item := c.list[c.next] c.next = (c.next + 1) % len(c.list) return item } // Add 添加任務 func (c *CircularIterator) Add(v interface{}) bool { for _, item := range c.list { if v == item { return false } } c.list = append(c.list, v) return true } // Member 任務組成員 type Member struct { id int tasks []*Task } // ID 返回當前memberID func (m *Member) ID() int { return m.id } // AddTask 爲member添加任務 func (m *Member) AddTask(t *Task) bool { for _, task := range m.tasks { if task == t { return false } } m.tasks = append(m.tasks, t) return true } // Execute 執行任務 func (m *Member) Execute() { for _, task := range m.tasks { fmt.Printf("Member %d run task %s\n", m.ID(), task.Execute()) } } // Task 任務 type Task struct { name string } // Execute 執行task返回結果 func (t *Task) Execute() string { return "Task " + t.name + " run success" } // Coordinator 協調者 type Coordinator struct { members []*Member tasks []*Task } // TaskAssignments 爲member分配任務 func (c *Coordinator) TaskAssignments() map[int]*Member { taskAssignments := make(map[int]*Member) // 構建迭代器 memberIt := c.getMemberIterator() for _, task := range c.tasks { member := memberIt.Next().(*Member) _, err := taskAssignments[member.ID()] if err == false { taskAssignments[member.ID()] = member } member.AddTask(task) } return taskAssignments } func (c *Coordinator) getMemberIterator() *CircularIterator { // 經過當前成員, 構形成員隊列 members := make([]interface{}, len(c.members)) for index, member := range c.members { members[index] = member } return NewCircularIterftor(members) } // AddMember 添加member組成員 func (c *Coordinator) AddMember(m *Member) bool { for _, member := range c.members { if member == m { return false } } c.members = append(c.members, m) return true } // AddTask 添加任務 func (c *Coordinator) AddTask(t *Task) bool { for _, task := range c.tasks { if task == t { return false } } c.tasks = append(c.tasks, t) return true } // NewCircularIterftor 返回迭代器 func NewCircularIterftor(list []interface{}) *CircularIterator { iterator := CircularIterator{} for _, item := range list { iterator.Add(item) } return &iterator } // NewCoordinator 返回協調器 func NewCoordinator() *Coordinator { c := Coordinator{} return &c } func main() { coordinator := NewCoordinator() for i := 0; i < 10; i++ { m := &Member{id: i} coordinator.AddMember(m) } for i := 0; i < 30; i++ { t := &Task{name: fmt.Sprintf("task %d", i)} coordinator.AddTask(t) } result := coordinator.TaskAssignments() for _, member := range result { member.Execute() } }
任務協調是一個很是複雜的事情, 內部的任務平臺,雖然實現了基於任務的組合和app化,可是任務調度分配着一塊,仍然沒有去作,只是簡單的根據樹形任務去簡單的作一些分支任務的執行,將來有時間再作吧,要繼續研究下一個模塊了設計
這個調度思想來源於kafka connect的DistributedHerder裏面的WorkerCoordinator,感興趣的能夠看看,未完待續
更多文章能夠訪問http://www.sreguide.com/