This article takes a look at the EventBus in eventhorizon.
eventhorizon/eventbus.go
    type EventBus interface {
        EventHandler

        // AddHandler adds a handler for an event. Returns an error if either the
        // matcher or handler is nil, the handler is already added or there was some
        // other problem adding the handler (for networked handlers for example).
        AddHandler(context.Context, EventMatcher, EventHandler) error

        // Errors returns an error channel where async handling errors are sent.
        Errors() <-chan EventBusError

        // Wait wait for all handlers to be cancelled by their context.
        Wait()
    }

    type EventHandler interface {
        // HandlerType is the type of the handler.
        HandlerType() EventHandlerType

        // HandleEvent handles an event.
        HandleEvent(context.Context, Event) error
    }

    type EventMatcher interface {
        // Match returns true if the matcher matches an event.
        Match(Event) bool
    }
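The EventBus interface embeds the EventHandler interface and adds the AddHandler, Errors, and Wait methods. Because of the embedding, every EventBus is itself an EventHandler: publishing an event to the bus means calling its HandleEvent method.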
eventhorizon/eventbus/local/eventbus.go
    type EventBus struct {
        group        *Group
        registered   map[eh.EventHandlerType]struct{}
        registeredMu sync.RWMutex
        errCh        chan eh.EventBusError
        wg           sync.WaitGroup
        codec        eh.EventCodec
    }

    // HandleEvent implements the HandleEvent method of the eventhorizon.EventHandler interface.
    func (b *EventBus) HandleEvent(ctx context.Context, event eh.Event) error {
        data, err := b.codec.MarshalEvent(ctx, event)
        if err != nil {
            return fmt.Errorf("could not marshal event: %w", err)
        }

        return b.group.publish(ctx, data)
    }
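The local EventBus struct defines the fields group, registered, registeredMu, errCh, wg, and codec. Its HandleEvent method first serializes the event with codec.MarshalEvent and then publishes the resulting bytes through group.publish, so handlers only ever receive encoded data, much as they would on a networked bus.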
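The marshal-then-publish step can be illustrated with a small round trip. This is only a sketch: the quoted source does not show which codec the bus uses, so `encoding/json` is an assumption here, and the `event` struct and the `marshalEvent`/`unmarshalEvent` helpers are hypothetical names that do not carry the context the way the real codec methods do.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// event is a hypothetical, simplified event payload.
type event struct {
	Type string `json:"type"`
	Data string `json:"data"`
}

// marshalEvent turns an event into bytes, as a bus would before publishing.
func marshalEvent(e event) ([]byte, error) {
	b, err := json.Marshal(e)
	if err != nil {
		return nil, fmt.Errorf("could not marshal event: %w", err)
	}
	return b, nil
}

// unmarshalEvent rebuilds an event from bytes on the receiving side.
func unmarshalEvent(b []byte) (event, error) {
	var e event
	if err := json.Unmarshal(b, &e); err != nil {
		return event{}, fmt.Errorf("could not unmarshal event: %w", err)
	}
	return e, nil
}

func main() {
	data, err := marshalEvent(event{Type: "created", Data: "order-1"})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(string(data)) // prints {"type":"created","data":"order-1"}

	e, err := unmarshalEvent(data)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(e.Type, e.Data) // prints created order-1
}
```

Passing bytes rather than the event value means the receiver always works on its own decoded copy, never on an object shared with the publisher.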
eventhorizon/eventbus/local/eventbus.go
    type Group struct {
        bus   map[string]chan []byte
        busMu sync.RWMutex
    }

    // NewGroup creates a Group.
    func NewGroup() *Group {
        return &Group{
            bus: map[string]chan []byte{},
        }
    }

    func (g *Group) publish(ctx context.Context, b []byte) error {
        g.busMu.RLock()
        defer g.busMu.RUnlock()

        for _, ch := range g.bus {
            // Marshal and unmarshal the context to both simulate only sending data
            // that would be sent over a network bus and also break any relationship
            // with the old context.
            select {
            case ch <- b:
            default:
                log.Printf("eventhorizon: publish queue full in local event bus")
            }
        }

        return nil
    }

    // Handles all events coming in on the channel.
    func (b *EventBus) handle(ctx context.Context, m eh.EventMatcher, h eh.EventHandler, ch <-chan []byte) {
        defer b.wg.Done()

        for {
            select {
            case data := <-ch:
                // Artificial delay to simulate network.
                time.Sleep(10 * time.Millisecond)

                event, ctx, err := b.codec.UnmarshalEvent(ctx, data)
                if err != nil {
                    err = fmt.Errorf("could not unmarshal event: %w", err)
                    select {
                    case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx}:
                    default:
                        log.Printf("eventhorizon: missed error in local event bus: %s", err)
                    }
                    return
                }

                // Ignore non-matching events.
                if !m.Match(event) {
                    continue
                }

                // Handle the event if it did match.
                if err := h.HandleEvent(ctx, event); err != nil {
                    err = fmt.Errorf("could not handle event (%s): %s", h.HandlerType(), err.Error())
                    select {
                    case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx, Event: event}:
                    default:
                        log.Printf("eventhorizon: missed error in local event bus: %s", err)
                    }
                }
            case <-ctx.Done():
                return
            }
        }
    }
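Group's publish method iterates over the channels in bus and writes the encoded event to each one through a non-blocking select; if a channel's queue is full, the event is dropped for that channel and a message is logged. The handle method loops on a select that reads events from the channel until the context is cancelled. It decodes each event, uses m.Match(event) to check whether the event matches, and if so calls h.HandleEvent. An unmarshal error is sent to errCh and ends the loop, while a handler error is sent to errCh and the loop continues.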
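The non-blocking fan-out and the consuming loop can be tried in isolation with the following minimal sketch. It is not the library's code: the lowercase `group` type, the `channel` helper with its buffer-size parameter, the drop counter returned by `publish`, and the `consume` function are all assumptions made for illustration (the quoted `publish` only logs a dropped event and always returns nil).

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// group is a simplified stand-in for the quoted Group type.
type group struct {
	mu  sync.RWMutex
	bus map[string]chan []byte
}

func newGroup() *group {
	return &group{bus: map[string]chan []byte{}}
}

// channel returns the buffered channel for id, creating it if needed.
// This helper and its size parameter are assumptions for this sketch.
func (g *group) channel(id string, size int) <-chan []byte {
	g.mu.Lock()
	defer g.mu.Unlock()
	if ch, ok := g.bus[id]; ok {
		return ch
	}
	ch := make(chan []byte, size)
	g.bus[id] = ch
	return ch
}

// publish offers b to every channel without blocking and reports how many
// channels were full. Returning the count is a hypothetical addition.
func (g *group) publish(b []byte) (dropped int) {
	g.mu.RLock()
	defer g.mu.RUnlock()
	for _, ch := range g.bus {
		select {
		case ch <- b:
		default:
			dropped++ // queue full: the event is lost for this subscriber
		}
	}
	return dropped
}

// consume reads from ch until ctx is cancelled and passes matching
// messages to handle, mirroring the structure of the quoted handle loop.
func consume(ctx context.Context, ch <-chan []byte, match func([]byte) bool, handle func([]byte)) {
	for {
		select {
		case data := <-ch:
			if !match(data) {
				continue // ignore non-matching events
			}
			handle(data)
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	g := newGroup()
	ch := g.channel("handler-a", 1)

	fmt.Println(g.publish([]byte("created"))) // prints 0
	fmt.Println(g.publish([]byte("deleted"))) // prints 1, the buffer is full

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	done := make(chan struct{})
	go func() {
		defer close(done)
		consume(ctx, ch,
			func(b []byte) bool { return string(b) == "created" },
			func(b []byte) {
				fmt.Println("handled", string(b)) // prints handled created
				cancel()                          // stop the loop after the first match
			})
	}()
	<-done
}
```

The trade-off of the `default` branch is that a slow subscriber never stalls the publisher, at the cost of silently losing events once that subscriber's queue fills up.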
eventhorizon's EventBus interface embeds the EventHandler interface and defines the AddHandler, Errors, and Wait methods.