手擼golang GO與微服務 ES-CQRS模式之1git
最近閱讀 [Go微服務實戰] (劉金亮, 2021.1)
本系列筆記擬採用golang練習之
git地址: https://gitee.com/ioly/learning.gooopgolang
ES(Event Sourcing)事件溯源很是好理解, 指的是將每次的事件都記錄下來, 而不是去記錄對象的狀態。 好比新建、修改等都會做爲事件記錄下來, 當須要最新的狀態時,經過事件的堆疊來計算最新的狀態。 按照事件溯源的模式進行架構設計, 就是事件驅動架構(Event DrivenArchitecture, EDA)。 命令查詢職責分離(CQRS)最先來自Betrand Meyer寫的 Object-OrientedSoftware Construction一書, 指的是命令查詢分離(Command Query Separation,CQS)。 其基本思想是任何一個對象的方法均可以分爲如下兩大類: ▪ 命令(Command):不返回任何結果(void),但會改變對象的狀態。 ▪ 查詢(Query):返回結果,可是不會改變對象的狀態,對系統沒有反作用。 CQRS的核心出發點就是把系統分爲讀和寫兩部分,從而方便分別進行優化。
待辦事宜數值對象json
package todo_app type TodoDTO struct { NO int Title string Content string }
todo事項建立事件架構
package todo_app type TodoCreatedEvent struct { Data *TodoDTO }
todo事項修改事件app
package todo_app type TodoUpdatedEvent struct { Data *TodoDTO }
todo事項刪除事件微服務
package todo_app type TodoRemovedEvent struct { NO int }
事件總線接口oop
package todo_app type EventHandleFunc func(e string, args interface{}) type EventHandler struct { ID string Handler EventHandleFunc } type IEventBus interface { Pub(e string, args interface{}) Sub(e string, id string, handleFunc EventHandleFunc) Unsub(e string, id string) } const EventTodoCreated = "todo.created" const EventTodoUpdated = "todo.updated" const EventTodoRemoved = "todo.removed"
事件序列化到JSON數據的接口優化
package todo_app type iTodoEventSerializer interface { SerializeCreatedEvent(it *TodoCreatedEvent) *tJSONData SerializeUpdatedEvent(it *TodoUpdatedEvent) *tJSONData SerializeRemovedEvent(it *TodoRemovedEvent) *tJSONData }
todo讀取接口架構設計
package todo_app type iTodoReader interface { All() []*TodoDTO }
todo寫入接口設計
package todo_app type iTodoWriter interface { HandleCreated(e *TodoCreatedEvent) HandleUpdated(e *TodoUpdatedEvent) HandleRemoved(e *TodoRemovedEvent) }
json文件讀寫接口
package todo_app type iJSONStore interface { Load() Append(it *tJSONData) }
事件總線的實現
package todo_app import ( "learning/gooop/saga/mqs/logger" "sync" ) type tEventBus struct { rwmutex *sync.RWMutex items map[string][]*EventHandler } func newEventHandler(id string, handleFunc EventHandleFunc) *EventHandler { return &EventHandler{ id, handleFunc, } } func newEventBus() IEventBus { it := new(tEventBus) it.init() return it } func (me *tEventBus) init() { me.rwmutex = new(sync.RWMutex) me.items = make(map[string][]*EventHandler) } func (me *tEventBus) Pub(e string, args interface{}) { me.rwmutex.RLock() defer me.rwmutex.RUnlock() handlers,ok := me.items[e] if ok { for _,it := range handlers { logger.Logf("eventbus.Pub, event=%s, handler=%s", e, it.ID) go it.Handler(e, args) } } } func (me *tEventBus) Sub(e string, id string, handleFunc EventHandleFunc) { me.rwmutex.Lock() defer me.rwmutex.Unlock() handler := newEventHandler(id, handleFunc) handlers,ok := me.items[e] if ok { me.items[e] = append(handlers, handler) } else { me.items[e] = []*EventHandler{handler } } } func (me *tEventBus) Unsub(e string, id string) { me.rwmutex.Lock() defer me.rwmutex.Unlock() handlers,ok := me.items[e] if ok { for i,it := range handlers { if it.ID == id { lastI := len(handlers) - 1 if i != lastI { handlers[i], handlers[lastI] = handlers[lastI], handlers[i] } me.items[e] = handlers[:lastI] } } } } var GlobalEventBus = newEventBus()
事件序列化到JSON的實現
package todo_app type tTodoEventSerializer struct { } func newEventSeiralizer() iTodoEventSerializer { it := new(tTodoEventSerializer) return it } func (me *tTodoEventSerializer) serializeWithTag(tag int, v interface{}) *tJSONData { it := new(tJSONData) err := it.Set(TagCreated, v) if err != nil { return nil } return it } func (me *tTodoEventSerializer) SerializeCreatedEvent(e *TodoCreatedEvent) *tJSONData { return me.serializeWithTag(TagCreated, e) } func (me *tTodoEventSerializer) SerializeUpdatedEvent(e *TodoUpdatedEvent) *tJSONData { return me.serializeWithTag(TagUpdated, e) } func (me *tTodoEventSerializer) SerializeRemovedEvent(e *TodoRemovedEvent) *tJSONData { return me.serializeWithTag(TagRemoved, e) } const TagCreated = 1 const TagUpdated = 2 const TagRemoved = 3 var gDefaultEventSerializer = newEventSeiralizer()
事件寫入器的實現
package todo_app type tTodoWriter struct { } func newTodoWriter() iTodoWriter { it := new(tTodoWriter) it.init() return it } func (me *tTodoWriter) init() { GlobalEventBus.Sub("todo.created", "", me.handleEvent) } func (me *tTodoWriter) handleEvent(e string, args interface{}) { switch e { case EventTodoCreated: if it,ok := args.(*TodoCreatedEvent);ok { me.HandleCreated(it) } break case EventTodoUpdated: if it,ok := args.(*TodoUpdatedEvent);ok { me.HandleUpdated(it) } break case EventTodoRemoved: if it,ok := args.(*TodoRemovedEvent);ok { me.HandleRemoved(it) } break } } func (me *tTodoWriter) HandleCreated(e *TodoCreatedEvent) { j := gDefaultEventSerializer.SerializeCreatedEvent(e) if j != nil { MockJSONStore.Append(j) } } func (me *tTodoWriter) HandleUpdated(e *TodoUpdatedEvent) { j := gDefaultEventSerializer.SerializeUpdatedEvent(e) if j != nil { MockJSONStore.Append(j) } } func (me *tTodoWriter) HandleRemoved(e *TodoRemovedEvent) { j := gDefaultEventSerializer.SerializeRemovedEvent(e) if j != nil { MockJSONStore.Append(j) } }
虛擬的JSON文件讀寫實現
package todo_app import "sync" type tMockJSONStore struct { rwmutex *sync.RWMutex once sync.Once items []*tJSONData } func newMockJSONStore() iJSONStore { it := new(tMockJSONStore) it.init() return it } func (me *tMockJSONStore) init() { me.rwmutex = new(sync.RWMutex) me.items = []*tJSONData{} } func (me *tMockJSONStore) Load() { me.once.Do(func() { me.rwmutex.RLock() defer me.rwmutex.RUnlock() for _,it := range me.items { switch it.Tag { case TagCreated: v := new(TodoCreatedEvent) e := it.Get(v) if e == nil { GlobalEventBus.Pub(EventTodoCreated, e) } break case TagUpdated: v := new(TodoUpdatedEvent) e := it.Get(v) if e == nil { GlobalEventBus.Pub(EventTodoUpdated, e) } break case TagRemoved: v := new(TodoRemovedEvent) e := it.Get(v) if e == nil { GlobalEventBus.Pub(EventTodoRemoved, e) } break } } }) } func (me *tMockJSONStore) Append(it *tJSONData) { me.rwmutex.Lock() defer me.rwmutex.Unlock() me.items = append(me.items, it) } var MockJSONStore = newMockJSONStore()
(未完待續)