Functionally this is a two-level scheduler: the first level elects a leader, and the second level is the per-member task-assignment algorithm run on the worker nodes.
The point here is mainly to study the architectural idea; the scenarios where this exact scheme fits are quite limited.
MemoryQueue: an in-memory message queue that simulates message dispatch, playing the role of the Kafka broker
Worker: executes tasks and runs the second-level, business-specific coordination algorithm
Coordinator: a coordinator living inside the message queue, used for Leader/Follower election
Task: a task
Assignment: the task-assignment result built from the task list and member information (in this implementation it is constructed by the leader worker)
GroupRequest: a join-group request
GroupResponse: the response data structure
// MemoryQueue an in-memory message queue
type MemoryQueue struct {
	done        chan struct{}
	queue       chan interface{}
	wg          sync.WaitGroup
	coordinator map[string]*Coordinator
	worker      map[string]*Worker
}
Here the coordinator map tracks the coordinator for each group; one allocator is created per group.
// handleEvent dispatches an event by its concrete type
func (mq *MemoryQueue) handleEvent(event interface{}) {
	switch e := event.(type) {
	case GroupRequest:
		mq.handleGroupRequest(&e)
	case Task:
		mq.handleTask(&e)
	default:
		mq.Notify(event)
	}
	mq.wg.Done()
}
// getGroupCoordinator gets the coordinator for a group, creating it on first use
func (mq *MemoryQueue) getGroupCoordinator(group string) *Coordinator {
	coordinator, ok := mq.coordinator[group]
	if ok {
		return coordinator
	}
	coordinator = NewCoordinator(group)
	mq.coordinator[group] = coordinator
	return coordinator
}
func (mq *MemoryQueue) handleGroupRequest(request *GroupRequest) {
	coordinator := mq.getGroupCoordinator(request.Group)
	exist := coordinator.addMember(request.ID, &request.Metadata)
	// if the worker has already joined this group, do nothing
	if exist {
		return
	}
	// rebuild the response for the new membership
	groupResponse := mq.buildGroupResponse(coordinator)
	mq.send(groupResponse)
}
func (mq *MemoryQueue) buildGroupResponse(coordinator *Coordinator) GroupResponse {
	return GroupResponse{
		Tasks:       coordinator.Tasks,
		Group:       coordinator.Group,
		Members:     coordinator.AllMembers(),
		LeaderID:    coordinator.getLeaderID(),
		Generation:  coordinator.Generation,
		Coordinator: coordinator,
	}
}
// Coordinator the per-group coordinator
type Coordinator struct {
	Group      string
	Generation int
	Members    map[string]*Metadata
	Tasks      []string
	Heartbeats map[string]int64
}
Inside the Coordinator, Members stores the metadata of each worker node, Tasks holds all tasks of the group, Heartbeats records each worker's heartbeat timestamps, and Generation is a generational counter that is incremented on every membership change.
The leader is elected from the workers' stored metadata.
// getLeaderID picks the leader from the current members
func (c *Coordinator) getLeaderID() string {
	leaderID, maxOffset := "", 0
	// here the member with the largest offset wins; a real election would likely be more involved
	for wid, metadata := range c.Members {
		if leaderID == "" || metadata.offset() > maxOffset {
			leaderID = wid
			maxOffset = metadata.offset()
		}
	}
	return leaderID
}
// Worker a worker node
type Worker struct {
	ID          string
	Group       string
	Tasks       string
	done        chan struct{}
	queue       *MemoryQueue
	Coordinator *Coordinator
}
A worker holds a reference to its Coordinator, which it later uses to send heartbeats.
The worker dispatches incoming events by type: handleGroupResponse receives the Coordinator's response, which carries the leader ID and the task list, and lets the worker perform the second-level assignment; handleAssign processes the task information once assignment is done.
// Execute handles an event delivered to this worker
func (w *Worker) Execute(event interface{}) {
	switch e := event.(type) {
	case GroupResponse:
		w.handleGroupResponse(&e)
	case Assignment:
		w.handleAssign(&e)
	}
}
A GroupResponse splits the nodes into two roles: the leader, which must go on to assign tasks after receiving the GroupResponse, and the followers, which only listen for events and send heartbeats.
func (w *Worker) handleGroupResponse(response *GroupResponse) {
	if w.isLeader(response.LeaderID) {
		w.onLeaderJoin(response)
	} else {
		w.onFollowerJoin(response)
	}
}
A follower node just sends heartbeats.
// onFollowerJoin current role is follower: record the coordinator and start heartbeating
func (w *Worker) onFollowerJoin(response *GroupResponse) {
	w.Coordinator = response.Coordinator
	go w.heartbeat()
}

// heartbeat sends a heartbeat to the coordinator every second until the worker stops
func (w *Worker) heartbeat() {
	timer := time.NewTimer(time.Second)
	for {
		select {
		case <-timer.C:
			w.Coordinator.heartbeat(w.ID, time.Now().Unix())
			timer.Reset(time.Second)
		case <-w.done:
			return
		}
	}
}
// onLeaderJoin current role is leader: perform the assignment and dispatch it
func (w *Worker) onLeaderJoin(response *GroupResponse) {
	fmt.Printf("Generation [%d] leaderID [%s]\n", response.Generation, w.ID)
	w.Coordinator = response.Coordinator
	go w.heartbeat()
	// split the tasks into per-member slices
	taskSlice := w.performAssign(response)
	// hand each slice to a member
	memberTasks, index := make(map[string][]string), 0
	for _, name := range response.Members {
		memberTasks[name] = taskSlice[index]
		index++
	}
	// dispatch the assignment
	assign := Assignment{LeaderID: w.ID, Generation: response.Generation, result: memberTasks}
	w.queue.send(assign)
}
// performAssign splits the tasks across the current members
func (w *Worker) performAssign(response *GroupResponse) [][]string {
	perWorker := len(response.Tasks) / len(response.Members)
	leftOver := len(response.Tasks) - len(response.Members)*perWorker
	result := make([][]string, len(response.Members))
	taskIndex, memberTaskCount := 0, 0
	for index := range result {
		if index < leftOver {
			memberTaskCount = perWorker + 1
		} else {
			memberTaskCount = perWorker
		}
		for i := 0; i < memberTaskCount; i++ {
			result[index] = append(result[index], response.Tasks[taskIndex])
			taskIndex++
		}
	}
	return result
}
Start a queue, then feed it tasks and workers and observe the assignment results.
// build the queue
queue := NewMemoryQueue(10)
queue.Start()
// send tasks
queue.send(Task{Name: "test1", Group: "test"})
queue.send(Task{Name: "test2", Group: "test"})
queue.send(Task{Name: "test3", Group: "test"})
queue.send(Task{Name: "test4", Group: "test"})
queue.send(Task{Name: "test5", Group: "test"})
// start the workers, giving each a different offset to verify that the leader is elected correctly
workerOne := NewWorker("test-1", "test", queue)
workerOne.start(1)
queue.addWorker(workerOne.ID, workerOne)
workerTwo := NewWorker("test-2", "test", queue)
workerTwo.start(2)
queue.addWorker(workerTwo.ID, workerTwo)
workerThree := NewWorker("test-3", "test", queue)
workerThree.start(3)
queue.addWorker(workerThree.ID, workerThree)
time.Sleep(time.Second)
workerThree.stop()
time.Sleep(time.Second)
workerTwo.stop()
time.Sleep(time.Second)
workerOne.stop()
queue.Stop()
Run result: by offset, test-3 ends up as the leader; in the final assignment two nodes get 2 tasks each and one node gets 1; then, as workers exit one by one, the tasks are reassigned.
Generation [1] leaderID [test-1]
Generation [2] leaderID [test-2]
Generation [3] leaderID [test-3]
Generation [1] worker [test-1] run tasks: [test1||test2||test3||test4||test5]
Generation [1] worker [test-2] run tasks: []
Generation [1] worker [test-3] run tasks: []
Generation [2] worker [test-1] run tasks: [test1||test2||test3]
Generation [2] worker [test-2] run tasks: [test4||test5]
Generation [2] worker [test-3] run tasks: []
Generation [3] worker [test-1] run tasks: [test1||test2]
Generation [3] worker [test-2] run tasks: [test3||test4]
Generation [3] worker [test-3] run tasks: [test5]
Generation [4] leaderID [test-2]
Generation [4] worker [test-1] run tasks: [test1||test2||test3]
Generation [4] worker [test-2] run tasks: [test4||test5]
Generation [5] leaderID [test-1]
Generation [5] worker [test-1] run tasks: [test1||test2||test3||test4||test5]
In real distributed systems, this kind of Leader/Follower election is more often delegated to a consensus-based coordination service such as Consul, etcd, or ZooKeeper. The design in this article is closely tied to Kafka's own business requirements; when time allows I will look at other designs. That wraps up this design borrowed from Kafka Connect.
To be continued. Follow the WeChat official account: 布衣碼農