Event: 用於標識事件,用戶將業務數據封裝成事件存入到MemoryQueue中 EventListener: 事件回調接口,用於MemoryQueue接收到數據後的回調 事件在發送的時候,須要經過一個前綴來進行事件類型標識,這裏有三種TaskPrefix、CommitTaskPrefix、ClearTaskPrefixbash
const (
// TaskPrefix 任務key前綴
TaskPrefix string = "task-"
// CommitTaskPrefix 提交任務key前綴
CommitTaskPrefix string = "commit-"
// ClearTaskPrefix 清除任務
ClearTaskPrefix string = "clear-"
)
// Event 事件類型
type Event struct {
Key string
Name string
Value interface{}
}
// EventListener 用於接收消息回調
type EventListener interface {
onEvent(event *Event)
}
複製代碼
MemoryQueue內存消息隊列,經過Push接口接收用戶數據,經過AddListener來註冊EventListener, 同時內部經過poll來從chan event取出數據分發給全部的Listener架構
// MemoryQueue 內存消息隊列
type MemoryQueue struct {
done chan struct{}
queue chan Event
listeners []EventListener
wg sync.WaitGroup
}
// Push 添加數據
func (mq *MemoryQueue) Push(eventType, name string, value interface{}) {
mq.queue <- Event{Key: eventType + name, Name: name, Value: value}
mq.wg.Add(1)
}
// AddListener 添加監聽器
func (mq *MemoryQueue) AddListener(listener EventListener) bool {
for _, item := range mq.listeners {
if item == listener {
return false
}
}
mq.listeners = append(mq.listeners, listener)
return true
}
// Notify 分發消息
func (mq *MemoryQueue) Notify(event *Event) {
defer mq.wg.Done()
for _, listener := range mq.listeners {
listener.onEvent(event)
}
}
func (mq *MemoryQueue) poll() {
for {
select {
case <-mq.done:
break
case event := <-mq.queue:
mq.Notify(&event)
}
}
}
// Start 啓動內存隊列
func (mq *MemoryQueue) Start() {
go mq.poll()
}
// Stop 中止內存隊列
func (mq *MemoryQueue) Stop() {
mq.wg.Wait()
close(mq.done)
}
複製代碼
Worker接收MemoryQueue裏面的數據,而後在本地根據不一樣類型來進行對應事件事件類型處理, 主要是經過事件的前綴來進行對應事件回調函數的選擇app
// Worker 工做進程
type Worker struct {
name string
deferredTaskUpdates map[string][]Task
onCommit ConfigUpdateCallback
}
func (w *Worker) onEvent(event *Event) {
switch {
// 獲取任務事件
case strings.Contains(event.Key, TaskPrefix):
w.onTaskEvent(event)
// 清除本地延遲隊列裏面的任務
case strings.Contains(event.Key, ClearTaskPrefix):
w.onTaskClear(event)
// 獲取commit事件
case strings.Contains(event.Key, CommitTaskPrefix):
w.onTaskCommit(event)
}
}
複製代碼
事件處理任務主要分爲:onTaskClear(從本地清楚該數據)、onTaskEvent(數據存儲本地延遲存儲進行暫存)、onTaskCommit(事務提交)分佈式
func (w *Worker) onTaskClear(event *Event) {
task, err := event.Value.(Task)
if !err {
// log
return
}
_, found := w.deferredTaskUpdates[task.Group]
if !found {
return
}
delete(w.deferredTaskUpdates, task.Group)
// 還能夠繼續中止本地已經啓動的任務
}
// onTaskCommit 接收任務提交, 從延遲隊列中取出數據而後進行業務邏輯處理
func (w *Worker) onTaskCommit(event *Event) {
// 獲取以前本地接收的全部任務
tasks, found := w.deferredTaskUpdates[event.Name]
if !found {
return
}
// 獲取配置
config := w.getTasksConfig(tasks)
if w.onCommit != nil {
w.onCommit(config)
}
delete(w.deferredTaskUpdates, event.Name)
}
// onTaskEvent 接收任務數據,此時須要丟到本地暫存不能進行應用
func (w *Worker) onTaskEvent(event *Event) {
task, err := event.Value.(Task)
if !err {
// log
return
}
// 保存任務到延遲更新map
configs, found := w.deferredTaskUpdates[task.Group]
if !found {
configs = make([]Task, 0)
}
configs = append(configs, task)
w.deferredTaskUpdates[task.Group] = configs
}
// getTasksConfig 獲取task任務列表
func (w *Worker) getTasksConfig(tasks []Task) map[string]string {
config := make(map[string]string)
for _, t := range tasks {
config = t.updateConfig(config)
}
return config
}
複製代碼
unc main() {
// 生成一個內存隊列啓動
queue := NewMemoryQueue(10)
queue.Start()
// 生成一個worker
name := "test"
worker := NewWorker(name, func(data map[string]string) {
for key, value := range data {
println("worker get task key: " + key + " value: " + value)
}
})
// 註冊到隊列中
queue.AddListener(worker)
taskName := "test"
// events 發送的任務事件
configs := []map[string]string{
map[string]string{"task1": "SendEmail", "params1": "Hello world"},
map[string]string{"task2": "SendMQ", "params2": "Hello world"},
}
// 分發任務
queue.Push(ClearTaskPrefix, taskName, nil)
for _, conf := range configs {
queue.Push(TaskPrefix, taskName, Task{Name: taskName, Group: taskName, Config: conf})
}
queue.Push(CommitTaskPrefix, taskName, nil)
// 中止隊列
queue.Stop()
}
複製代碼
輸出ide
# go run main.go
worker get task key: params1 value: Hello world
worker get task key: task1 value: SendEmail
worker get task key: params2 value: Hello world
worker get task key: task2 value: SendMQ
複製代碼
在分佈式環境中,不少時候並不須要使用CP模型,更多時候是知足最終一致性便可函數
基於2PC和延遲隊列的這種設計,主要是依賴於事件驅動的架構ui
在kafka connect中, 每次節點變化都會觸發一次任務的重分配,因此延遲存儲直接用的就是內存中的HashMap, 由於即便分配消息的主節點掛了,那就再觸發一次事件,直接將HashMap裏面的數據清掉,進行下一次事務便可,並不須要保證延遲存儲裏面的數據不丟,spa
因此方案因環境、需求不一樣,能夠作一些取捨,不必什麼東西都去加一個CP模型的中間件進來,固然其實那樣更簡單設計
未完待續!3d
更多文章能夠訪問www.sreguide.com/