This article is based on knative/eventing v0.23.0.
mt-broker-ingress
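mt-broker-ingress is the entry point for eventing requests. Its main job is to listen on a port, receive events, and forward them to the Channel. The main method is in pkg/broker/ingress/ingress_handler.go: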
func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request)
First, the request is validated:
// validate request method
if request.Method != http.MethodPost {
	h.Logger.Warn("unexpected request method", zap.String("method", request.Method))
	writer.WriteHeader(http.StatusMethodNotAllowed)
	return
}
// validate request URI
if request.RequestURI == "/" {
	writer.WriteHeader(http.StatusNotFound)
	return
}
Next, the Broker information is extracted from the request URL:
nsBrokerName := strings.Split(request.RequestURI, "/")
if len(nsBrokerName) != 3 {
	h.Logger.Info("Malformed uri", zap.String("URI", request.RequestURI))
	writer.WriteHeader(http.StatusBadRequest)
	return
}
The request is converted to a message, and the message is then converted to an event in CloudEvents format and validated:
ctx := request.Context()
message := cehttp.NewMessageFromHttpRequest(request)
defer message.Finish(nil)
event, err := binding.ToEvent(ctx, message)
if err != nil {
	h.Logger.Warn("failed to extract event from request", zap.Error(err))
	writer.WriteHeader(http.StatusBadRequest)
	return
}
// run validation for the extracted event
validationErr := event.Validate()
if validationErr != nil {
	h.Logger.Warn("failed to validate extracted event", zap.Error(validationErr))
	writer.WriteHeader(http.StatusBadRequest)
	return
}
A trace span is started and the trace attributes are recorded:
ctx, span := trace.StartSpan(ctx, tracing.BrokerMessagingDestination(brokerNamespacedName))
defer span.End()
if span.IsRecordingEvents() {
	span.AddAttributes(
		tracing.MessagingSystemAttribute,
		tracing.MessagingProtocolHTTP,
		tracing.BrokerMessagingDestinationAttribute(brokerNamespacedName),
		tracing.MessagingMessageIDAttribute(event.ID()),
	)
	span.AddAttributes(opencensusclient.EventTraceAttributes(event)...)
}
Finally, the Channel address is read from the knative.dev/channelAddress entry in the Broker's status.annotations, and the event is sent to the Channel:
statusCode, dispatchTime := h.receive(ctx, request.Header, event, brokerNamespace, brokerName)