Using the `kubectl describe [resource]` command we can see Event output, and we often rely on events to troubleshoot problems. From events we can reconstruct the entire running history of a Pod, and they provide a data source for service observability. Clearly, events play an important role in Kubernetes.
Events are not something only kubelet has: the event-handling logic is encapsulated in the client-go/tools/record package, so we can perfectly well write our own custom events.
An event is in fact a resource object in its own right, stored in etcd via the apiserver, so we can also view event objects with the `kubectl get event` command.
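Because events are just another resource, they can also be read programmatically. The following is a minimal sketch (not from the original article) that lists the events in the `default` namespace with client-go; it assumes a kubeconfig at the default path and a client-go release matching Kubernetes 1.17.x, where `List` does not yet take a `context.Context`.

```go
package main

import (
	"fmt"
	"path/filepath"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
)

func main() {
	// Build a clientset from the local kubeconfig (assumed location).
	kubeconfig := filepath.Join(homedir.HomeDir(), ".kube", "config")
	config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(config)

	// Roughly equivalent to `kubectl get event -n default`.
	events, err := clientset.CoreV1().Events("default").List(metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	for _, e := range events.Items {
		fmt.Printf("%s\t%s\t%s/%s\t%s\n", e.Type, e.Reason, e.InvolvedObject.Kind, e.InvolvedObject.Name, e.Message)
	}
}
```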
Here is the YAML of an example event:
```yaml
apiVersion: v1
count: 1
eventTime: null
firstTimestamp: "2020-03-02T13:08:22Z"
involvedObject:
  apiVersion: v1
  kind: Pod
  name: example-foo-d75d8587c-xsf64
  namespace: default
  resourceVersion: "429837"
  uid: ce611c62-6c1a-4bd8-9029-136a1adf7de4
kind: Event
lastTimestamp: "2020-03-02T13:08:22Z"
message: Pod sandbox changed, it will be killed and re-created.
metadata:
  creationTimestamp: "2020-03-02T13:08:30Z"
  name: example-foo-d75d8587c-xsf64.15f87ea1df862b64
  namespace: default
  resourceVersion: "479466"
  selfLink: /api/v1/namespaces/default/events/example-foo-d75d8587c-xsf64.15f87ea1df862b64
  uid: 9fe6f72a-341d-4c49-960b-e185982d331a
reason: SandboxChanged
reportingComponent: ""
reportingInstance: ""
source:
  component: kubelet
  host: minikube
type: Normal
```
Key fields:

- `involvedObject`: the object this event is about
- `type`: `Normal` or `Warning`
- `reason`: a short, machine-readable reason for the event
- `message`: a human-readable description
- `source`: the component (and host) that reported the event
- `count`, `firstTimestamp`, `lastTimestamp`: how many times this event has occurred and when

The full field definitions of the Event type are here: types.go#L5078
Next, let's look at how an event actually gets written.
1. We use kubelet as the example to see how events are written.
2. The code in this article is based on Kubernetes 1.17.3.
Let's start with a diagram of the overall processing flow:
Creating the client for event operations:
kubelet/app/server.go#L461
```go
// makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise.
func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
	if kubeDeps.Recorder != nil {
		return
	}
	// event broadcaster
	eventBroadcaster := record.NewBroadcaster()
	// create the EventRecorder
	kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
	// send events to the log output
	eventBroadcaster.StartLogging(klog.V(3).Infof)
	if kubeDeps.EventClient != nil {
		klog.V(4).Infof("Sending events to api server.")
		// send events to the apiserver
		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
	} else {
		klog.Warning("No api server defined - no events will be sent to API server.")
	}
}
```
`makeEventRecorder` first creates an `EventBroadcaster` with `record.NewBroadcaster()`; this event broadcaster provides the `StartLogging` and `StartRecordingToSink` handlers, which send events to the log and to the apiserver respectively. `NewRecorder` then creates the `EventRecorder` instance, which provides the `Event`, `Eventf`, and related methods for recording events.
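The same plumbing works outside of kubelet, which is what makes custom events possible. Below is a minimal sketch under a few assumptions: a client-go release matching Kubernetes 1.17.x, a hypothetical component name `my-controller`, and a placeholder Pod as the involved object.

```go
package customevents

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/record"
	"k8s.io/klog"
)

func recordCustomEvent(clientset kubernetes.Interface) {
	// The broadcaster owns the channel between producers and consumers.
	eventBroadcaster := record.NewBroadcaster()
	// Consumer 1: write events to the log.
	eventBroadcaster.StartLogging(klog.Infof)
	// Consumer 2: write events to the apiserver.
	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
		Interface: clientset.CoreV1().Events(""),
	})
	// Producer: a recorder stamped with our component as the event source.
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "my-controller"})

	// A placeholder object for the event to reference (involvedObject).
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "example-foo", Namespace: "default"}}

	// object -> involvedObject, eventtype -> type, then reason and message.
	recorder.Event(pod, v1.EventTypeNormal, "CustomReason", "a custom event written via client-go")
	recorder.Eventf(pod, v1.EventTypeWarning, "CustomWarning", "something looked odd: %d retries", 3)
}
```

Recording is asynchronous: each call only pushes the event onto the broadcaster's channel, so a short-lived program should give the sink goroutine a moment to flush before exiting.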
Let's look at the `EventBroadcaster` interface definition: event.go#L113
```go
// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
type EventBroadcaster interface {
	StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface
	StartRecordingToSink(sink EventSink) watch.Interface
	StartLogging(logf func(format string, args ...interface{})) watch.Interface
	NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder
	Shutdown()
}
```
The concrete implementation is the `eventBroadcasterImpl` struct, which implements all of these methods. `StartLogging` and `StartRecordingToSink` are what actually consume events, while `EventRecorder` writes (produces) them; a channel sits in between, forming a producer-consumer model.
EventRecorder
Let's first look at the `EventRecorder` interface definition: event.go#L88. It provides the following four methods:
```go
// EventRecorder knows how to record events on behalf of an EventSource.
type EventRecorder interface {
	// Event constructs an event from the given information and puts it in the queue for sending.
	// 'object' is the object this event is about. Event will make a reference-- or you may also
	// pass a reference to the object directly.
	// 'type' of this event, and can be one of Normal, Warning. New types could be added in future
	// 'reason' is the reason this event is generated. 'reason' should be short and unique; it
	// should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
	// to automate handling of events, so imagine people writing switch statements to handle them.
	// You want to make that easy.
	// 'message' is intended to be human readable.
	//
	// The resulting event will be created in the same namespace as the reference object.
	Event(object runtime.Object, eventtype, reason, message string)

	// Eventf is just like Event, but with Sprintf for the message field.
	Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})

	// PastEventf is just like Eventf, but with an option to specify the event's 'timestamp' field.
	PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{})

	// AnnotatedEventf is just like eventf, but with annotations attached
	AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}
```
Key parameters:

- `object`: corresponds to the `involvedObject` field in the Event resource
- `eventtype`: corresponds to the event's `type` field; one of `Normal` or `Warning`
- `reason`: the reason the event was generated
- `message`: the event message

Now let's trace the whole process of calling `Event(object runtime.Object, eventtype, reason, message string)`.
It turns out that these methods all eventually call the `generateEvent` method: event.go#L316
```go
func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
	.....
	event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
	event.Source = recorder.source
	go func() {
		// NOTE: events should be a non-blocking operation
		defer utilruntime.HandleCrash()
		recorder.Action(watch.Added, event)
	}()
}
```
The event is ultimately handed off in a `goroutine` via `recorder.Action`, which guarantees that every call to the event-recording methods is non-blocking.
`makeEvent` mainly constructs the Event object; its name is generated from the `InvolvedObject` name plus a timestamp. Note that for events produced by non-namespaced resources, the event's namespace is `default`:
```go
func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
	t := metav1.Time{Time: recorder.clock.Now()}
	namespace := ref.Namespace
	if namespace == "" {
		namespace = metav1.NamespaceDefault
	}
	return &v1.Event{
		ObjectMeta: metav1.ObjectMeta{
			Name:        fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
			Namespace:   namespace,
			Annotations: annotations,
		},
		InvolvedObject: *ref,
		Reason:         reason,
		Message:        message,
		FirstTimestamp: t,
		LastTimestamp:  t,
		Count:          1,
		Type:           eventtype,
	}
}
```
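As a quick illustration of that name format, combining a hypothetical involvedObject name with the nanosecond timestamp rendered in hex yields names like the one in the YAML at the top of this article:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Same formatting as makeEvent: "<involvedObject name>.<unix nanoseconds in hex>".
	name := fmt.Sprintf("%v.%x", "example-foo-d75d8587c-xsf64", time.Now().UnixNano())
	fmt.Println(name) // e.g. example-foo-d75d8587c-xsf64.15f87ea1df862b64 (the hex part varies)
}
```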
Following the `Action` method a step further (apimachinery/blob/master/pkg/watch/mux.go#L188:23):
```go
// Action distributes the given event among all watchers.
func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
	m.incoming <- Event{action, obj}
}
```
It writes the event into a channel.
Note: this `Action` method comes from the apimachinery package. The `recorderImpl` struct embeds `*watch.Broadcaster` as an anonymous field, and that `Broadcaster` is assigned in `NewRecorder`; it is in fact the very same `Broadcaster` held by `eventBroadcasterImpl`.
At this point it is clear that the event ends up in the `incoming` channel of the `Broadcaster`. Next, let's see how it gets consumed.
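To make the producer-consumer model concrete, here is a small standalone sketch (illustration only, not kubelet code) that drives the same `watch.Broadcaster` type from apimachinery that `eventBroadcasterImpl` embeds; signatures are those of the apimachinery version vendored by Kubernetes 1.17:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
)

func main() {
	// A broadcaster with a 25-slot incoming queue; slow watchers get events
	// dropped instead of blocking the producer.
	broadcaster := watch.NewBroadcaster(25, watch.DropIfChannelFull)

	// Consumer side: roughly what StartEventWatcher does internally.
	watcher := broadcaster.Watch()
	done := make(chan struct{})
	go func() {
		defer close(done)
		for watchEvent := range watcher.ResultChan() {
			event := watchEvent.Object.(*v1.Event)
			fmt.Println("consumed:", event.Reason, event.Message)
		}
	}()

	// Producer side: roughly what recorder.Action(watch.Added, event) does.
	broadcaster.Action(watch.Added, &v1.Event{
		ObjectMeta: metav1.ObjectMeta{Name: "demo.1", Namespace: "default"},
		Reason:     "Demo",
		Message:    "event pushed into the incoming channel",
	})

	// Shutdown drains the incoming queue and closes every watcher's result channel.
	broadcaster.Shutdown()
	<-done
}
```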
The `StartLogging` and `StartRecordingToSink` calls made in `makeEventRecorder` are what actually consume the events.
- `StartLogging`: writes events directly to the log
- `StartRecordingToSink`: writes events to the apiserver

Both methods internally call `StartEventWatcher`, passing in an `eventHandler` function that processes each event:
```go
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
	watcher := e.Watch()
	go func() {
		defer utilruntime.HandleCrash()
		for watchEvent := range watcher.ResultChan() {
			event, ok := watchEvent.Object.(*v1.Event)
			if !ok {
				// This is all local, so there's no reason this should
				// ever happen.
				continue
			}
			eventHandler(event)
		}
	}()
	return watcher
}
```
`watcher.ResultChan()` is where the events come out: a goroutine inside the broadcaster runs `func (m *Broadcaster) loop()`, which calls `func (m *Broadcaster) distribute(event Event)` to write each event into every `broadcasterWatcher`'s `result` channel.
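The `loop`/`distribute` pair is easiest to picture with a toy fan-out broadcaster. The sketch below is an illustrative analogue in plain channels (the type and function names are made up, not the apimachinery source): one goroutine drains an incoming channel and copies each item to every watcher, dropping it for any watcher that is full, in the spirit of `DropIfChannelFull`.

```go
package main

import "fmt"

type message struct{ reason, text string }

// toyBroadcaster mirrors the loop/distribute structure described above.
type toyBroadcaster struct {
	incoming chan message
	watchers []chan message
}

func newToyBroadcaster(queueLen, watcherCount int) *toyBroadcaster {
	b := &toyBroadcaster{incoming: make(chan message, queueLen)}
	for i := 0; i < watcherCount; i++ {
		b.watchers = append(b.watchers, make(chan message, queueLen))
	}
	go b.loop()
	return b
}

// loop drains the incoming channel, then closes every watcher channel (like closeAll).
func (b *toyBroadcaster) loop() {
	for m := range b.incoming {
		b.distribute(m)
	}
	for _, w := range b.watchers {
		close(w)
	}
}

// distribute copies one message to every watcher without ever blocking the loop.
func (b *toyBroadcaster) distribute(m message) {
	for _, w := range b.watchers {
		select {
		case w <- m:
		default: // watcher is full: drop the message for it
		}
	}
}

func main() {
	b := newToyBroadcaster(25, 2)
	b.incoming <- message{"Demo", "fan-out to every watcher"} // like Action()
	close(b.incoming)
	for i, w := range b.watchers {
		for m := range w {
			fmt.Printf("watcher %d got: %s %s\n", i, m.reason, m.text)
		}
	}
}
```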
Let's focus on the `eventHandler` that `StartRecordingToSink` passes in, the `recordToSink` method:
```go
func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, sleepDuration time.Duration) {
	// Make a copy before modification, because there could be multiple listeners.
	// Events are safe to copy like this.
	eventCopy := *event
	event = &eventCopy
	result, err := eventCorrelator.EventCorrelate(event)
	if err != nil {
		utilruntime.HandleError(err)
	}
	if result.Skip {
		return
	}
	tries := 0
	for {
		if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
			break
		}
		tries++
		if tries >= maxTriesPerEvent {
			klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
			break
		}
		// Randomize the first sleep so that various clients won't all be
		// synced up if the master goes down (e.g. when the apiserver restarts,
		// not every client retries at the same instant).
		if tries == 1 {
			time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64()))
		} else {
			time.Sleep(sleepDuration)
		}
	}
}
```
The event first goes through `eventCorrelator.EventCorrelate(event)` for pre-processing, mainly to aggregate similar events (producing too many events would add pressure on etcd and the apiserver, and would also make a pod's events very hard to read).
The for loop that follows is the retry logic, with at most 12 attempts; the `recordEvent` call is what actually writes the event to the apiserver.
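For completeness, this is roughly what each `recordEvent` attempt does: a simplified sketch written against the client-go `record.EventSink` interface, not the verbatim source (the real function also updates the correlator's state and distinguishes more error cases when deciding whether to retry).

```go
package eventsink

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/client-go/tools/record"
)

// attemptRecord returns true when the event has been handled and no retry is needed.
func attemptRecord(sink record.EventSink, event *v1.Event, patch []byte, updateExisting bool) bool {
	var err error
	if updateExisting {
		// A correlated event (Count > 1) is patched onto the existing Event object.
		_, err = sink.Patch(event, patch)
	}
	if !updateExisting || errors.IsNotFound(err) {
		// Otherwise, or if the original has been deleted in the meantime, create a new one.
		event.ResourceVersion = ""
		_, err = sink.Create(event)
	}
	if err == nil {
		return true // written successfully
	}
	if errors.IsBadRequest(err) || errors.IsNotFound(err) {
		return true // client-side problem, retrying will not help
	}
	return false // server-side or transient error: let recordToSink retry
}
```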
Now let's look at the `EventCorrelate` method:
```go
// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
	if newEvent == nil {
		return nil, fmt.Errorf("event is nil")
	}
	aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
	observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
	if c.filterFunc(observedEvent) {
		return &EventCorrelateResult{Skip: true}, nil
	}
	return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}
```
It calls three methods in turn: `aggregator.EventAggregate`, `logger.eventObserve`, and `filterFunc`. Their roles are:

- `aggregator.EventAggregate`: aggregates events. If 10 similar events (events whose key fields are all identical except for the message and the timestamps) have occurred within the last 10 minutes, the aggregator sets their message to `(combined from similar events)` + `event.Message`.
- `logger.eventObserve`: for identical events, as well as the similar events merged by the `aggregator`, it records how many times the event has occurred by incrementing the `Count` field.
- `filterFunc`: implements token-bucket rate limiting; events exceeding the configured rate are dropped, which protects the apiserver (see the sketch after the `EventAggregate` discussion below).

Let's look mainly at the `aggregator.EventAggregate` method:
```go
func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
	now := metav1.NewTime(e.clock.Now())
	var record aggregateRecord
	// eventKey is the full cache key for this event
	// eventKey combines every field except the timestamps
	eventKey := getEventKey(newEvent)
	// aggregateKey is for the aggregate event, if one is needed.
	// aggregateKey combines every field except message and the timestamps; localKey is the message
	aggregateKey, localKey := e.keyFunc(newEvent)

	// Do we have a record of similar events in our cache?
	e.Lock()
	defer e.Unlock()
	// look up the aggregateKey in the cache; identical or similar events will already be there
	value, found := e.cache.Get(aggregateKey)
	if found {
		record = value.(aggregateRecord)
	}

	// if the last event is more than 10 minutes old, start a fresh localKeys set (the set stores messages)
	maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
	interval := now.Time.Sub(record.lastTimestamp.Time)
	if interval > maxInterval {
		record = aggregateRecord{localKeys: sets.NewString()}
	}

	// Write the new event into the aggregation record and put it on the cache
	// insert localKey (i.e. the message) into the set; identical messages simply overwrite
	record.localKeys.Insert(localKey)
	record.lastTimestamp = now
	e.cache.Add(aggregateKey, record)

	// If we are not yet over the threshold for unique events, don't correlate them
	// check whether the localKeys set already holds 10 similar events
	if uint(record.localKeys.Len()) < e.maxEvents {
		return newEvent, eventKey
	}

	// do not grow our local key set any larger than max
	record.localKeys.PopAny()

	// create a new aggregate event, and return the aggregateKey as the cache key
	// (so that it can be overwritten.)
	eventCopy := &v1.Event{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
			Namespace: newEvent.Namespace,
		},
		Count:          1,
		FirstTimestamp: now,
		InvolvedObject: newEvent.InvolvedObject,
		LastTimestamp:  now,
		// the message gets a prefix here: (combined from similar events):
		Message: e.messageFunc(newEvent),
		Type:    newEvent.Type,
		Reason:  newEvent.Reason,
		Source:  newEvent.Source,
	}
	return eventCopy, aggregateKey
}
```
In short, `aggregator.EventAggregate` uses the cache and the `localKeys` set to decide whether events are similar; if 10 similar events have occurred within the last 10 minutes, they are merged and the message gets the prefix. `logger.eventObserve` then accumulates the count; if the messages are identical as well, it is simply `count++`.
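As for the token-bucket rate limiting done by `filterFunc`, the standalone sketch below shows the idea with `golang.org/x/time/rate` (an illustration of the technique, not the client-go implementation; client-go applies its limiter per event source and object key, and the refill rate and burst used here are only for demonstration):

```go
package main

import (
	"fmt"

	"golang.org/x/time/rate"
)

func main() {
	// A bucket that refills at one token every 5 minutes with a burst capacity of 25:
	// bursts of events pass through, but a sustained flood gets dropped.
	limiter := rate.NewLimiter(rate.Limit(1.0/300.0), 25)

	dropped := 0
	for i := 0; i < 100; i++ {
		// Allow consumes a token if one is available; otherwise this event would be skipped.
		if !limiter.Allow() {
			dropped++
		}
	}
	fmt.Printf("accepted %d events, dropped %d\n", 100-dropped, dropped)
}
```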
That wraps up the whole event-handling flow. To summarize (compare with the diagram earlier in the article):
- The `EventRecorder`, through its `Event` and related methods, builds the event object and writes it into the `EventBroadcaster`'s channel.
- The `EventBroadcaster` takes events off that channel in a background goroutine and broadcasts them to the handlers registered in advance.
- The `EventSink` handler pre-processes each event it receives and then sends it to the apiserver.

Looking back over the whole flow, events are not guaranteed to be written 100% of the time (as the pre-processing shows). This is a deliberate trade-off to protect the availability of etcd on the backend: events are generated very frequently across the cluster, especially when services are unstable, and compared with resources such as Deployments and Pods they are not as important.
Original post: https://silenceper.com/blog/2...
關注"學點程序"公衆號,瞭解更多幹貨內容 !