In-memory Channel是當前Knative Eventing中默認的Channel, 也是通常剛接觸Knative Eventing首先了解到的Channel。本文經過分析 In-memory Channel 來進一步瞭解 Knative Eventing 中Broker/Trigger事件處理機制。異步
咱們先總體看一下Knative Eventing 工做機制示意圖:性能
經過 namespace 建立默認 Broker 若是不指定Channel,會使用默認的 Inmemory Channel。spa
下面咱們從數據平面開始分析Event事件是如何經過In-memory Channel分發到Knative Service設計
Ingress是事件進入Channel前的第一級過濾,但目前的功能僅僅是接收事件而後轉發到Channel。過濾功能處理TODO狀態。3d
func (h *handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error { tctx := cloudevents.HTTPTransportContextFrom(ctx) if tctx.Method != http.MethodPost { resp.Status = http.StatusMethodNotAllowed return nil } // tctx.URI is actually the path... if tctx.URI != "/" { resp.Status = http.StatusNotFound return nil } ctx, _ = tag.New(ctx, tag.Insert(TagBroker, h.brokerName)) defer func() { stats.Record(ctx, MeasureEventsTotal.M(1)) }() send := h.decrementTTL(&event) if !send { ctx, _ = tag.New(ctx, tag.Insert(TagResult, "droppedDueToTTL")) return nil } // TODO Filter. ctx, _ = tag.New(ctx, tag.Insert(TagResult, "dispatched")) return h.sendEvent(ctx, tctx, event) }
Broker 字面意思爲代理者,那麼它代理的是誰呢?是Channel。爲何要代理Channel呢,而不直接發給訪問Channel。這個其實涉及到Broker/Trigger設計的初衷:對事件過濾處理。咱們知道Channel(消息通道)負責事件傳遞,Subscription(訂閱)負責訂閱事件,一般這兩者的模型以下:代理
這裏就涉及到消息隊列和訂閱分發的實現。那麼在In-memory Channel中如何實現的呢?
其實 In-memory 的核心處理在Fanout Handler中,它負責將接收到的事件分發到不一樣的 Subscription。
In-memory Channel處理示意圖:code
事件接收並分發核心代碼以下:blog
func createReceiverFunction(f *Handler) func(provisioners.ChannelReference, *provisioners.Message) error { return func(_ provisioners.ChannelReference, m *provisioners.Message) error { if f.config.AsyncHandler { go func() { // Any returned error is already logged in f.dispatch(). _ = f.dispatch(m) }() return nil } return f.dispatch(m) } }
當前分發機制默認是異步機制(可經過AsyncHandler參數控制分發機制)。隊列
消息分發機制:事件
// dispatch takes the request, fans it out to each subscription in f.config. If all the fanned out // requests return successfully, then return nil. Else, return an error. func (f *Handler) dispatch(msg *provisioners.Message) error { errorCh := make(chan error, len(f.config.Subscriptions)) for _, sub := range f.config.Subscriptions { go func(s eventingduck.SubscriberSpec) { errorCh <- f.makeFanoutRequest(*msg, s) }(sub) } for range f.config.Subscriptions { select { case err := <-errorCh: if err != nil { f.logger.Error("Fanout had an error", zap.Error(err)) return err } case <-time.After(f.timeout): f.logger.Error("Fanout timed out") return errors.New("fanout timed out") } } // All Subscriptions returned err = nil. return nil }
經過這裏的代碼,咱們能夠看到分發處理超時機制。默認爲60s。也就是說若是分發的請求響應超過60s,那麼In-memory會報錯:Fanout timed out。
通常的消息分發會將消息發送給訂閱的服務,但在 Broker/Trigger 模型中須要對事件進行過濾處理,這個處理的地方就是在Filter 中。
其中過濾規則處理代碼以下:
func (r *Receiver) shouldSendMessage(ctx context.Context, ts *eventingv1alpha1.TriggerSpec, event *cloudevents.Event) bool { if ts.Filter == nil || ts.Filter.SourceAndType == nil { r.logger.Error("No filter specified") ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "empty-fail")) return false } // Record event count and filtering time startTS := time.Now() defer func() { filterTimeMS := int64(time.Now().Sub(startTS) / time.Millisecond) stats.Record(ctx, MeasureTriggerFilterTime.M(filterTimeMS)) }() filterType := ts.Filter.SourceAndType.Type if filterType != eventingv1alpha1.TriggerAnyFilter && filterType != event.Type() { r.logger.Debug("Wrong type", zap.String("trigger.spec.filter.sourceAndType.type", filterType), zap.String("event.Type()", event.Type())) ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "fail")) return false } filterSource := ts.Filter.SourceAndType.Source s := event.Context.AsV01().Source actualSource := s.String() if filterSource != eventingv1alpha1.TriggerAnyFilter && filterSource != actualSource { r.logger.Debug("Wrong source", zap.String("trigger.spec.filter.sourceAndType.source", filterSource), zap.String("message.source", actualSource)) ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "fail")) return false } ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "pass")) return true }
當前的機制是全部的訂閱事件都會經過 Filter 集中進行事件過濾,若是一個Broker有大量的訂閱Trigger,是否會給Filter帶來性能上的壓力? 這個在實際場景 Broker/Trigger 的運用中須要考慮到這個問題。
做爲內置的默認Channel實現,In-memory 能夠說很好的完成了事件接收並轉發的使命,而且 Knative Eventing 在接下來的迭代中會支持部署時指定設置默認的Channel。有興趣的同窗能夠持續關注一下。
原文連接 本文爲雲棲社區原創內容,未經容許不得轉載。