在消息隊列使用場景中,有時須要同時下發多條消息,但如今的消息隊列好比kafka只支持單條消息的事務保證,不能保證多條消息,今天說的這個方案就時kafka內部的一個子項目中基於2PC和延遲更新來實現分佈式事務bash
2PC俗稱兩階段提交,經過將一個操做分爲兩個階段:準備階段和提交階段來儘量保證操做的原子執行(實際上不可能,你們有個概念先)架構
延遲更新實際上是一個很經常使用的技術手段,簡單來講,當某個操做條件不知足時,經過必定手段將數據暫存,等條件知足時在進行執行app
實現也蠻簡單的, 在原來的業務消息以後再添加一條事務消息(事務消息能夠經過相似惟一ID來關聯到以前提交的消息), worker未消費到事物提交的消息,就會一直將消息放在本地延遲存儲中,只有當接收到事物提交消息,纔會進行業務邏輯處理分佈式
MemoryQuue: 用於模擬消息隊列,接收事件分發事件 Worker: 模擬具體業務服務,接收消息,存入本地延遲更新存儲,或者提交事務觸發業務回調ide
Event: 用於標識事件,用戶將業務數據封裝成事件存入到MemoryQueue中 EventListener: 事件回調接口,用於MemoryQueue接收到數據後的回調 事件在發送的時候,須要經過一個前綴來進行事件類型標識,這裏有三種TaskPrefix、CommitTaskPrefix、ClearTaskPrefix函數
const ( // TaskPrefix 任務key前綴 TaskPrefix string = "task-" // CommitTaskPrefix 提交任務key前綴 CommitTaskPrefix string = "commit-" // ClearTaskPrefix 清除任務 ClearTaskPrefix string = "clear-" ) // Event 事件類型 type Event struct { Key string Name string Value interface{} } // EventListener 用於接收消息回調 type EventListener interface { onEvent(event *Event) }
MemoryQueue內存消息隊列,經過Push接口接收用戶數據,經過AddListener來註冊EventListener, 同時內部經過poll來從chan event取出數據分發給全部的Listenerui
// MemoryQueue 內存消息隊列 type MemoryQueue struct { done chan struct{} queue chan Event listeners []EventListener wg sync.WaitGroup } // Push 添加數據 func (mq *MemoryQueue) Push(eventType, name string, value interface{}) { mq.queue <- Event{Key: eventType + name, Name: name, Value: value} mq.wg.Add(1) } // AddListener 添加監聽器 func (mq *MemoryQueue) AddListener(listener EventListener) bool { for _, item := range mq.listeners { if item == listener { return false } } mq.listeners = append(mq.listeners, listener) return true } // Notify 分發消息 func (mq *MemoryQueue) Notify(event *Event) { defer mq.wg.Done() for _, listener := range mq.listeners { listener.onEvent(event) } } func (mq *MemoryQueue) poll() { for { select { case <-mq.done: break case event := <-mq.queue: mq.Notify(&event) } } } // Start 啓動內存隊列 func (mq *MemoryQueue) Start() { go mq.poll() } // Stop 中止內存隊列 func (mq *MemoryQueue) Stop() { mq.wg.Wait() close(mq.done) }
Worker接收MemoryQueue裏面的數據,而後在本地根據不一樣類型來進行對應事件事件類型處理, 主要是經過事件的前綴來進行對應事件回調函數的選擇設計
// Worker 工做進程 type Worker struct { name string deferredTaskUpdates map[string][]Task onCommit ConfigUpdateCallback } func (w *Worker) onEvent(event *Event) { switch { // 獲取任務事件 case strings.Contains(event.Key, TaskPrefix): w.onTaskEvent(event) // 清除本地延遲隊列裏面的任務 case strings.Contains(event.Key, ClearTaskPrefix): w.onTaskClear(event) // 獲取commit事件 case strings.Contains(event.Key, CommitTaskPrefix): w.onTaskCommit(event) } }
事件處理任務主要分爲:onTaskClear(從本地清楚該數據)、onTaskEvent(數據存儲本地延遲存儲進行暫存)、onTaskCommit(事務提交)3d
func (w *Worker) onTaskClear(event *Event) { task, err := event.Value.(Task) if !err { // log return } _, found := w.deferredTaskUpdates[task.Group] if !found { return } delete(w.deferredTaskUpdates, task.Group) // 還能夠繼續中止本地已經啓動的任務 } // onTaskCommit 接收任務提交, 從延遲隊列中取出數據而後進行業務邏輯處理 func (w *Worker) onTaskCommit(event *Event) { // 獲取以前本地接收的全部任務 tasks, found := w.deferredTaskUpdates[event.Name] if !found { return } // 獲取配置 config := w.getTasksConfig(tasks) if w.onCommit != nil { w.onCommit(config) } delete(w.deferredTaskUpdates, event.Name) } // onTaskEvent 接收任務數據,此時須要丟到本地暫存不能進行應用 func (w *Worker) onTaskEvent(event *Event) { task, err := event.Value.(Task) if !err { // log return } // 保存任務到延遲更新map configs, found := w.deferredTaskUpdates[task.Group] if !found { configs = make([]Task, 0) } configs = append(configs, task) w.deferredTaskUpdates[task.Group] = configs } // getTasksConfig 獲取task任務列表 func (w *Worker) getTasksConfig(tasks []Task) map[string]string { config := make(map[string]string) for _, t := range tasks { config = t.updateConfig(config) } return config }
unc main() { // 生成一個內存隊列啓動 queue := NewMemoryQueue(10) queue.Start() // 生成一個worker name := "test" worker := NewWorker(name, func(data map[string]string) { for key, value := range data { println("worker get task key: " + key + " value: " + value) } }) // 註冊到隊列中 queue.AddListener(worker) taskName := "test" // events 發送的任務事件 configs := []map[string]string{ map[string]string{"task1": "SendEmail", "params1": "Hello world"}, map[string]string{"task2": "SendMQ", "params2": "Hello world"}, } // 分發任務 queue.Push(ClearTaskPrefix, taskName, nil) for _, conf := range configs { queue.Push(TaskPrefix, taskName, Task{Name: taskName, Group: taskName, Config: conf}) } queue.Push(CommitTaskPrefix, taskName, nil) // 中止隊列 queue.Stop() }
輸出code
# go run main.go worker get task key: params1 value: Hello world worker get task key: task1 value: SendEmail worker get task key: params2 value: Hello world worker get task key: task2 value: SendMQ
在分佈式環境中,不少時候並不須要使用CP模型,更多時候是知足最終一致性便可
基於2PC和延遲隊列的這種設計,主要是依賴於事件驅動的架構
在kafka connect中, 每次節點變化都會觸發一次任務的重分配,因此延遲存儲直接用的就是內存中的HashMap, 由於即便分配消息的主節點掛了,那就再觸發一次事件,直接將HashMap裏面的數據清掉,進行下一次事務便可,並不須要保證延遲存儲裏面的數據不丟,
因此方案因環境、需求不一樣,能夠作一些取捨,不必什麼東西都去加一個CP模型的中間件進來,固然其實那樣更簡單
未完待續!
更多文章能夠訪問http://www.sreguide.com/