Knative Source Code Reading: A Walkthrough of the eventing mt-broker-ingress Source

This article is based on knative/eventing v0.23.0.


mt-broker-ingress is the entry point for eventing requests. Its main job is to listen on a port, receive events, and forward them to the Channel. The main method is in pkg/broker/ingress/ingress_handler.go:

func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request)
  • First, the request is validated:

    // validate request method
    if request.Method != http.MethodPost {
        h.Logger.Warn("unexpected request method", zap.String("method", request.Method))
        writer.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    // validate request URI
    if request.RequestURI == "/" {
        writer.WriteHeader(http.StatusNotFound)
        return
    }
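  • Next, the Broker information is extracted from the request URL. The length check implies a path of the form /namespace/name: splitting it on "/" yields three elements, the first of which is empty: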

    nsBrokerName := strings.Split(request.RequestURI, "/")
    if len(nsBrokerName) != 3 {
        h.Logger.Info("Malformed uri", zap.String("URI", request.RequestURI))
        writer.WriteHeader(http.StatusBadRequest)
        return
    }
  • The request is converted into a message, and the message is then converted into a CloudEvents-format event and validated:

    ctx := request.Context()

    message := cehttp.NewMessageFromHttpRequest(request)
    defer message.Finish(nil)

    event, err := binding.ToEvent(ctx, message)
    if err != nil {
        h.Logger.Warn("failed to extract event from request", zap.Error(err))
        writer.WriteHeader(http.StatusBadRequest)
        return
    }

    // run validation for the extracted event
    validationErr := event.Validate()
    if validationErr != nil {
        h.Logger.Warn("failed to validate extractd event", zap.Error(validationErr))
        writer.WriteHeader(http.StatusBadRequest)
        return
    }
  • A trace span is started and, if the span is being recorded, the trace attributes are attached to it:

    ctx, span := trace.StartSpan(ctx, tracing.BrokerMessagingDestination(brokerNamespacedName))
    defer span.End()

    if span.IsRecordingEvents() {
        span.AddAttributes(
            tracing.MessagingSystemAttribute,
            tracing.MessagingProtocolHTTP,
            tracing.BrokerMessagingDestinationAttribute(brokerNamespacedName),
            tracing.MessagingMessageIDAttribute(event.ID()),
        )
        span.AddAttributes(opencensusclient.EventTraceAttributes(event)...)
    }
  • Finally, the Channel address is read from the knative.dev/channelAddress entry in the Broker's status.annotations, and the event is sent to the Channel:

    statusCode, dispatchTime := h.receive(ctx, request.Header, event, brokerNamespace, brokerName)