Kafka內部爲了協調內部的consumer和kafka connector的工做實現了一個複製協議, 主要工做分爲兩個步驟:算法
從功能實現上兩級調度,一級調度負責將Leader選舉,二級調度則是worker節點完成每一個成員的任務的分配bash
主要是學習這種架構設計思想,雖然這種方案場景很是有限數據結構
一級協調器設計:一級協調器主要是指的Coordinator部分,經過記錄成員的元數據信息,來進行Leader選舉,好比根據offset的大小來決定誰是Leader 二級協調器設計:二級協調器主要是指的Leader任務分配部分, worker節點獲取到全部的任務和節點信息,就能夠根據合適的算法來進行任務的分配,最終廣播到消息隊列架構
值得咱們學習的地方, 一般在kafka這種場景下,若是要針對不一樣的業務實現統一調度,仍是蠻麻煩的, 因此好比將具體任務的分配工做從架構中遷移出去, 在broker端只負責通用層的Leader選舉便可, 將具體業務的分配工做,從主業務架構分離出去,由具體業務去實現併發
根據設計,咱們抽象出: MemoryQueue、Worker、 Coordinator、GroupRequest、GroupResponse、Task、Assignment集合核心組件app
MemoryQueue: 模擬消息隊列實現消息的分發,充當kafka broker角色 Worker: 任務執行和具體業務二級協調算法 Coordinator: 位於消息隊列內部的一個協調器,用於Leader/Follower選舉 Task: 任務 Assignment: Coordnator根據任務信息和節點信息構建的任務分配結果 GroupRequest: 加入集羣請求 GroupResponse: 響應信息負載均衡
// MemoryQueue 內存消息隊列 type MemoryQueue struct { done chan struct{} queue chan interface{} wg sync.WaitGroup coordinator map[string]*Coordinator worker map[string]*Worker }
其中coordinator用於標識每一個Group組的協調器,爲每一個組都創建一個分配器分佈式
MemoryQueue 接收事件類型,而後根據事件類型進行分發,若是是GroupRequest事件,則分發給handleGroupRequest進行處理 handleGroupRequest內部先獲取對應group的coordinator,而後根據當前信息buildGroupResponse發回消息隊列ide
func (mq *MemoryQueue) handleEvent(event interface{}) { switch event.(type) { case GroupRequest: request := event.(GroupRequest) mq.handleGroupRequest(&request) case Task: task := event.(Task) mq.handleTask(&task) default: mq.Notify(event) } mq.wg.Done() }
其中Coordnator會調用本身的getLeaderID方法,來根據當前組內的各成員的信息來選舉一個Leader節點學習
// getGroupCoordinator 獲取指定組的協調器 func (mq *MemoryQueue) getGroupCoordinator(group string) *Coordinator { coordinator, ok := mq.coordinator[group] if ok { return coordinator } coordinator = NewCoordinator(group) mq.coordinator[group] = coordinator return coordinator } func (mq *MemoryQueue) handleGroupRequest(request *GroupRequest) { coordinator := mq.getGroupCoordinator(request.Group) exist := coordinator.addMember(request.ID, &request.Metadata) // 若是worker以前已經加入該組, 就不作任何操做 if exist { return } // 從新構建請求信息 groupResponse := mq.buildGroupResponse(coordinator) mq.send(groupResponse) } func (mq *MemoryQueue) buildGroupResponse(coordinator *Coordinator) GroupResponse { return GroupResponse{ Tasks: coordinator.Tasks, Group: coordinator.Group, Members: coordinator.AllMembers(), LeaderID: coordinator.getLeaderID(), Generation: coordinator.Generation, Coordinator: coordinator, } }
// Coordinator 協調器 type Coordinator struct { Group string Generation int Members map[string]*Metadata Tasks []string Heartbeats map[string]int64 }
Coordinator內部經過Members信息,來存儲各個worker節點的元數據信息, 而後Tasks存儲當前group的全部任務, Heartbeats存儲workerd額心跳信息, Generation是一個分代計數器,每次節點變化都會遞增
經過存儲的worker的metadata信息,來進行主節點的選舉
// getLeaderID 根據當前信息獲取leader節點 func (c *Coordinator) getLeaderID() string { leaderID, maxOffset := "", 0 // 這裏是經過offset大小來斷定,offset大的就是leader, 實際上可能會更加複雜一些 for wid, metadata := range c.Members { if leaderID == "" || metadata.offset() > maxOffset { leaderID = wid maxOffset = metadata.offset() } } return leaderID }
// Worker 工做者 type Worker struct { ID string Group string Tasks string done chan struct{} queue *MemoryQueue Coordinator *Coordinator }
worker節點會包含一個coordinator信息,用於後續向該節點進行心跳信息的發送
worker接收到不一樣的事件類型,根據類型來進行處理, 其中handleGroupResponse負責接收到服務端Coordinator響應的信息,裏面會包含leader節點和任務信息,由worker 來進行二級分配, handleAssign則是處理分配完後的任務信息
// Execute 接收到分配的任務進行請求執行 func (w *Worker) Execute(event interface{}) { switch event.(type) { case GroupResponse: response := event.(GroupResponse) w.handleGroupResponse(&response) case Assignment: assign := event.(Assignment) w.handleAssign(&assign) } }
GroupResponse會將節點分割爲兩種:Leader和Follower, Leader節點接收到GroupResponse後須要繼續進行分配任務,而Follower則只須要監聽事件和發送心跳
func (w *Worker) handleGroupResponse(response *GroupResponse) { if w.isLeader(response.LeaderID) { w.onLeaderJoin(response) } else { w.onFollowerJoin(response) } }
Follower節點進行心跳發送
// onFollowerJoin 當前角色是follower func (w *Worker) onFollowerJoin(response *GroupResponse) { w.Coordinator = response.Coordinator go w.heartbeat() } // heartbeat 發送心跳 func (w *Worker) heartbeat() { // timer := time.NewTimer(time.Second) // for { // select { // case <-timer.C: // w.Coordinator.heartbeat(w.ID, time.Now().Unix()) // timer.Reset(time.Second) // case <-w.done: // return // } // } }
Leader節點這個地方我將調度分配分爲兩個步驟: 1)經過節點數和任務數將任務進行分片 2)將分片後的任務分配給各個節點,最終發送回隊列
// onLeaderJoin 當前角色是leader, 執行任務分配併發送mq func (w *Worker) onLeaderJoin(response *GroupResponse) { fmt.Printf("Generation [%d] leaderID [%s]\n", response.Generation, w.ID) w.Coordinator = response.Coordinator go w.heartbeat() // 進行任務分片 taskSlice := w.performAssign(response) // 將任務分配給各個worker memerTasks, index := make(map[string][]string), 0 for _, name := range response.Members { memerTasks[name] = taskSlice[index] index++ } // 分發請求 assign := Assignment{LeaderID: w.ID, Generation: response.Generation, result: memerTasks} w.queue.send(assign) } // performAssign 根據當前成員和任務數 func (w *Worker) performAssign(response *GroupResponse) [][]string { perWorker := len(response.Tasks) / len(response.Members) leftOver := len(response.Tasks) - len(response.Members)*perWorker result := make([][]string, len(response.Members)) taskIndex, memberTaskCount := 0, 0 for index := range result { if index < leftOver { memberTaskCount = perWorker + 1 } else { memberTaskCount = perWorker } for i := 0; i < memberTaskCount; i++ { result[index] = append(result[index], response.Tasks[taskIndex]) taskIndex++ } }
啓動一個隊列,而後加入任務和worker,觀察分配結果
// 構建隊列 queue := NewMemoryQueue(10) queue.Start() // 發送任務 queue.send(Task{Name: "test1", Group: "test"}) queue.send(Task{Name: "test2", Group: "test"}) queue.send(Task{Name: "test3", Group: "test"}) queue.send(Task{Name: "test4", Group: "test"}) queue.send(Task{Name: "test5", Group: "test"}) // 啓動worker, 爲每一個worker分配不一樣的offset觀察是否能將leader正常分配 workerOne := NewWorker("test-1", "test", queue) workerOne.start(1) queue.addWorker(workerOne.ID, workerOne) workerTwo := NewWorker("test-2", "test", queue) workerTwo.start(2) queue.addWorker(workerTwo.ID, workerTwo) workerThree := NewWorker("test-3", "test", queue) workerThree.start(3) queue.addWorker(workerThree.ID, workerThree) time.Sleep(time.Second) workerThree.stop() time.Sleep(time.Second) workerTwo.stop() time.Sleep(time.Second) workerOne.stop() queue.Stop()
運行結果: 首先根據offset, 最終test-3位Leader, 而後查看任務分配結果, 有兩個節點2個任務,一個節點一個任務, 而後隨着worker的退出,又會進行任務的從新分配
Generation [1] leaderID [test-1] Generation [2] leaderID [test-2] Generation [3] leaderID [test-3] Generation [1] worker [test-1] run tasks: [test1||test2||test3||test4||test5] Generation [1] worker [test-2] run tasks: [] Generation [1] worker [test-3] run tasks: [] Generation [2] worker [test-1] run tasks: [test1||test2||test3] Generation [2] worker [test-2] run tasks: [test4||test5] Generation [2] worker [test-3] run tasks: [] Generation [3] worker [test-1] run tasks: [test1||test2] Generation [3] worker [test-2] run tasks: [test3||test4] Generation [3] worker [test-3] run tasks: [test5] Generation [4] leaderID [test-2] Generation [4] worker [test-1] run tasks: [test1||test2||test3] Generation [4] worker [test-2] run tasks: [test4||test5] Generation [5] leaderID [test-1] Generation [5] worker [test-1] run tasks: [test1||test2||test3||test4||test5]
其實在分佈式場景中,這種Leader/Follower選舉,其實更多的是會選擇基於AP模型的consul、etcd、zk等, 本文的這種設計,與kafka自身的業務場景由很大的關係, 後續有時間,仍是繼續看看別的設計, 從kafka connet借鑑的設計,就到這了
未完待續 關注公共號: 布衣碼農
更多精彩內容能夠查看www.sreguide.com