隨着雲原生的發展(雲原生的下一個五年在哪裏?),逐步進入深水區,業界須要一種統一的事件定義和描述規範,以提供跨服務、跨平臺的交互能力。CloudEvents事件規範應運而生,並獲得了行業的普遍關注,包括主要的雲提供商和 SaaS 公司。 對於CloudEvent的介紹、規範說明及實踐落地,將以三篇系列文章進行說明,本文爲《CloudEvent三部曲:實踐篇》python
Serverless是一項基於事件驅動的函數計算服務,經過使用函數計算產品,函數以彈性、免運維、高可靠的方式運行,用戶能夠無需採購和維護服務器等基礎設施,能夠更加專一於函數代碼的編寫。 目前 CloudEvents 在函數計算中已有普遍的應用,第三方服務接入函數計算服務,須要使用符合 CloudEvents 規範的消息傳遞數據,方便函數計算平臺方對第三方服務的消息進行分發過濾,同時因爲規範的通用性,第三方服務一次改造後能夠無縫適配到各種符合 CloudEvents 規範的平臺上。 此外消息類產品(例如:消息隊列,消息服務,事件總線等)也可經過支持 CloudEvents 規範,統一雲的事件標準,加速雲原生生態集成。git
一般狀況下,要構建一個CloudEvent,須要使用CloudEvents的軟件開發工具包(SDK),利用SDK能夠極大方便開發人員進行集成開發,截至 CloudEvents v1.0 規範的發佈,CloudEvents 團隊支持和維護如下6種SDK:github
如下使用 Go,Python SDK構造符合CloudEvent 1.0 規範的消息接收發送,HTTP/JSON請求轉化等功能的範例。shell
Golangjson
go get github.com/cloudevents/sdk-go/v2@v2.0.0
flask
import cloudevents "github.com/cloudevents/sdk-go/v2"
服務器
package main
import (
"log"
cloudevents "github.com/cloudevents/sdk-go/v2"
)
func main() {
// The default client is HTTP.
c, err := cloudevents.NewDefaultClient()
if err != nil {
log.Fatalf("failed to create client, %v", err)
}
// Create an Event.
event := cloudevents.NewEvent()
event.SetSource("example/uri")
event.SetType("example.type")
event.SetData(cloudevents.ApplicationJSON, map[string]string{"hello": "world"})
// Set a target.
ctx := cloudevents.ContextWithTarget(context.Background(), "http://localhost:8080/")
// Send that Event.
if result := c.Send(ctx, event); !cloudevents.IsACK(result) {
log.Fatalf("failed to send, %v", result)
}
}
複製代碼
package main
import (
"log"
cloudevents "github.com/cloudevents/sdk-go/v2"
)
func receive(event cloudevents.Event) {
// do something with event.
fmt.Printf("%s", event)
}
func main() {
// The default client is HTTP.
c, err := cloudevents.NewDefaultClient()
if err != nil {
log.Fatalf("failed to create client, %v", err)
}
log.Fatal(c.StartReceiver(context.Background(), receive));
}
複製代碼
序列化爲JSONmarkdown
event := cloudevents.NewEvent()
event.SetSource("example/uri")
event.SetType("example.type")
event.SetData(cloudevents.ApplicationJSON, map[string]string{"hello": "world"})
bytes, err := json.Marshal(event)
複製代碼
反序列化架構
event := cloudevents.NewEvent()
err := json.Marshal(bytes, &event)
複製代碼
Pythonapp
pip install cloudevents
經過 Python SDK 中的 CloudEvent 類型構造 CloudEvents 事件,再利用 to_binary函數將其序列化爲 JSON 格式的數據,使用 requests框架發送該 JSON 請求。
from cloudevents.http import CloudEvent, to_binary
import requests
# 構建CloudEvent結構體
# - The CloudEvent "id" is generated if omitted. "specversion" defaults to "1.0".
attributes = {
"type": "com.example.sampletype1",
"source": "https://example.com/event-producer",
}
data = {"message": "Hello World!"}
event = CloudEvent(attributes, data)
# 利用to_binary函數將其序列化爲 JSON 格式的數據
headers, body = to_binary(event)
# POST
requests.post("<some-url>", data=body, headers=headers)
複製代碼
經過 Python SDK 中的 from_http 函數解析出 CloudEvents 事件,並打印事件內容
from flask import Flask, request
from cloudevents.http import from_http
app = Flask(__name__)
# create an endpoint at http://localhost:/3000/
@app.route("/", methods=["POST"])
def home():
# create a CloudEvent
event = from_http(request.headers, request.get_data())
# you can access cloudevent fields as seen below
print(
f"Found {event['id']} from {event['source']} with type "
f"{event['type']} and specversion {event['specversion']}"
)
return "", 204
if __name__ == "__main__":
app.run(port=3000)
複製代碼
基於事件驅動服務是函數計算的核心功能之一,平臺使用 Knative Eventing 的Broker/Trigger 事件處理模型對事件進行過濾分發,此外爲了確保跨平臺和互操做性,將採用CNCF定義的標準數據格式CloudEvents 進行事件傳輸。
如上圖所示,架構分爲三塊內容,從左到右分別爲事件源,事件接收/轉發者,事件消費者。
事件源是一種 Kubernetes 定製資源,它提供了一種機制,用於註冊來自特定服務系統的一類事件。例如:對象存儲事件源,Github事件源等等,所以不一樣的事件源須要的不一樣的自定義資源進行描述。
事件源負責獲取特定服務系統的事件,並將事件轉化爲CloudEvents格式事件發送給 Knative Eventing 平臺(即 Broker/Trigger事件處理模型)。
引入Broker/Trigger事件處理模型的目的,是爲了搭建一些黑盒子,將具體的實現隱藏起來,用戶無需關心底層實現細節。
如圖1所示,用戶經過 filter指定感興趣紅色小球的事件,最終只有該類事件被傳送給事件消費者(這裏是Knative Service,即 KSvc函數)。
事件消費者能夠是某個服務或系統,這裏的事件消費者是用戶編寫的KSvc函數(即處理事件的邏輯代碼)。
第三方服務接入基於knative實現的serverless平臺須要提供特定的事件源,Knative社區已維護部分事件源,具體列表請查看:github.com/knative/eve…
若是第三方服務不在社區提供的支持列表中,就須要自定義事件源,有以下經常使用的幾種方式:
ContainerSource 實現簡單,是目前大部分自定義事件源的實現方式,也是KNative平臺推薦的方式。
ContainerSource 是 Kubernetes 中自定義的 CRD(Custom Resource Definition)資源類型,具體定義以下
主要看如下幾個部分:
ContainerSource 中 image 鏡像部分即須要自定義實現的部分,實現方式根據獲取第三方服務事件的不一樣分爲如下兩種:
以下圖 2所示,若是第三方服務已適配消息隊列,能夠將產生的事件發往消息隊列,此時 ContainerSource 能夠直接從消息隊列中消費第三方服務的事件。
以下圖 3所示,若是第三方服務未適配消息隊列,但服務自己提供事件訂閱能力(如 Redis 的鍵空間特性,Keyspace Notifications future),此時 ContainerSource 能夠直接訂閱第三方服務的事件,監聽服務變化。
注意:不管採用以上哪一種方式,ContainerSource 在生成 CloudEvents 事件時,都須要攜帶 KSVC 目標函數的惟一標識,以供平臺側分發事件時使用。例如:1. 消息隊列方式,因爲全部事件都從同一個消息隊列中獲取,此時須要在第三方服務生產事件時就攜帶目標函數的標識(對象存儲產品接入時採用該方式);2. 直連方式,因爲 ContainerSource 與第三方服務是一對一關係,因此能夠在 ContainerSource 生成 CloudEvents 事件時添加目標函數的標識。
利用 Broker/Trigger 事件處理模型,極大簡化了第三方服務接入函數計算的流程。不管使用消息隊列方式仍是直連方式,產品側只須要提供適配第三方服務的 ContainerSource 鏡像,以供平臺側使用。
平臺側的工做主要是納管產品側提供的 ContainerSource,並利用 Trigger 提供事件過濾的能力。
針對 ContainerSource 不一樣的實現方式,平臺側工做內容也有所區別:
消息隊列實現方式
平臺側會建立以下內容:
平臺側會預先建立好 ContainerSource 和 Broker 資源,並提供納管 Trigger 的增刪改查接口用於事件過濾,此時 ContainerSource,Broker,Trigger 對應關係以下圖所示:
直連方式
平臺側會建立以下內容:
平臺側會預先提供好 Broker 資源,並提供納管 ContainerSource 和 Trigger 的增刪改查接口,此時 ContainerSource,Broker,Trigger 對應關係以下圖所示: