CloudEvents三部曲:實踐篇

隨着雲原生的發展(雲原生的下一個五年在哪裏?),逐步進入深水區,業界須要一種統一的事件定義和描述規範,以提供跨服務、跨平臺的交互能力。CloudEvents事件規範應運而生,並獲得了行業的普遍關注,包括主要的雲提供商和 SaaS 公司。 對於CloudEvent的介紹、規範說明及實踐落地,將以三篇系列文章進行說明,本文爲《CloudEvent三部曲:實踐篇》python

1、產品接入

場景

Serverless是一項基於事件驅動的函數計算服務,經過使用函數計算產品,函數以彈性、免運維、高可靠的方式運行,用戶能夠無需採購和維護服務器等基礎設施,能夠更加專一於函數代碼的編寫。 目前 CloudEvents 在函數計算中已有普遍的應用,第三方服務接入函數計算服務,須要使用符合 CloudEvents 規範的消息傳遞數據,方便函數計算平臺方對第三方服務的消息進行分發過濾,同時因爲規範的通用性,第三方服務一次改造後能夠無縫適配到各種符合 CloudEvents 規範的平臺上。 此外消息類產品(例如:消息隊列,消息服務,事件總線等)也可經過支持 CloudEvents 規範,統一雲的事件標準,加速雲原生生態集成。git

開發

一般狀況下,要構建一個CloudEvent,須要使用CloudEvents的軟件開發工具包(SDK),利用SDK能夠極大方便開發人員進行集成開發,截至 CloudEvents v1.0 規範的發佈,CloudEvents 團隊支持和維護如下6種SDK:github

  • CSharp
  • Go SDK
  • Java SDK
  • JavaScript SDK
  • Python SDK
  • Ruby SDK

如下使用 Go,Python SDK構造符合CloudEvent 1.0 規範的消息接收發送,HTTP/JSON請求轉化等功能的範例。shell

Golangjson

  1. 獲取依賴

go get github.com/cloudevents/sdk-go/v2@v2.0.0flask

  1. 依賴引用

import cloudevents "github.com/cloudevents/sdk-go/v2"服務器

  1. 發送事件
package main

import (
    "log"

	cloudevents "github.com/cloudevents/sdk-go/v2"
)

func main() {
	// The default client is HTTP.
	c, err := cloudevents.NewDefaultClient()
	if err != nil {
		log.Fatalf("failed to create client, %v", err)
	}

	// Create an Event.
	event :=  cloudevents.NewEvent()
	event.SetSource("example/uri")
	event.SetType("example.type")
	event.SetData(cloudevents.ApplicationJSON, map[string]string{"hello": "world"})

	// Set a target.
	ctx := cloudevents.ContextWithTarget(context.Background(), "http://localhost:8080/")

	// Send that Event.
	if result := c.Send(ctx, event); !cloudevents.IsACK(result) {
		log.Fatalf("failed to send, %v", result)
	}
}
複製代碼
  1. 接受事件
package main

import (
    "log"

	cloudevents "github.com/cloudevents/sdk-go/v2"
)

func receive(event cloudevents.Event) {
	// do something with event.
    fmt.Printf("%s", event)
}

func main() {
	// The default client is HTTP.
	c, err := cloudevents.NewDefaultClient()
	if err != nil {
		log.Fatalf("failed to create client, %v", err)
	}
	log.Fatal(c.StartReceiver(context.Background(), receive));
}
複製代碼
  1. 序列化

序列化爲JSONmarkdown

event := cloudevents.NewEvent()
event.SetSource("example/uri")
event.SetType("example.type")
event.SetData(cloudevents.ApplicationJSON, map[string]string{"hello": "world"})

bytes, err := json.Marshal(event)
複製代碼

反序列化架構

event :=  cloudevents.NewEvent()
err := json.Marshal(bytes, &event)
複製代碼

Pythonapp

  1. 依賴包安裝

pip install cloudevents

  1. 事件發送者

經過 Python SDK 中的 CloudEvent 類型構造 CloudEvents 事件,再利用 to_binary函數將其序列化爲 JSON 格式的數據,使用 requests框架發送該 JSON 請求。

from cloudevents.http import CloudEvent, to_binary
import requests

# 構建CloudEvent結構體
# - The CloudEvent "id" is generated if omitted. "specversion" defaults to "1.0".
attributes = {
    "type": "com.example.sampletype1",
    "source": "https://example.com/event-producer",
}
data = {"message": "Hello World!"}
event = CloudEvent(attributes, data)

# 利用to_binary函數將其序列化爲 JSON 格式的數據
headers, body = to_binary(event)

# POST
requests.post("<some-url>", data=body, headers=headers)
複製代碼
  1. 接受事件處理

經過 Python SDK 中的 from_http 函數解析出 CloudEvents 事件,並打印事件內容

from flask import Flask, request

from cloudevents.http import from_http

app = Flask(__name__)


# create an endpoint at http://localhost:/3000/
@app.route("/", methods=["POST"])
def home():
    # create a CloudEvent
    event = from_http(request.headers, request.get_data())

    # you can access cloudevent fields as seen below
    print(
        f"Found {event['id']} from {event['source']} with type "
        f"{event['type']} and specversion {event['specversion']}"
    )

    return "", 204


if __name__ == "__main__":
    app.run(port=3000)
複製代碼

2、接入方式

架構

基於事件驅動服務是函數計算的核心功能之一,平臺使用 Knative Eventing 的Broker/Trigger 事件處理模型對事件進行過濾分發,此外爲了確保跨平臺和互操做性,將採用CNCF定義的標準數據格式CloudEvents 進行事件傳輸。

圖 1 產品接入架構

如上圖所示,架構分爲三塊內容,從左到右分別爲事件源,事件接收/轉發者,事件消費者。

  1. 事件源

事件源是一種 Kubernetes 定製資源,它提供了一種機制,用於註冊來自特定服務系統的一類事件。例如:對象存儲事件源,Github事件源等等,所以不一樣的事件源須要的不一樣的自定義資源進行描述。

事件源負責獲取特定服務系統的事件,並將事件轉化爲CloudEvents格式事件發送給 Knative Eventing 平臺(即 Broker/Trigger事件處理模型)。

  1. 事件接受/轉發者

引入Broker/Trigger事件處理模型的目的,是爲了搭建一些黑盒子,將具體的實現隱藏起來,用戶無需關心底層實現細節。

  • Broker如同事件桶,接收各類不一樣的事件,這些事件能夠經過屬性來過濾。
  • Trigger描述了一個過濾器,只有經過了過濾器選擇的事件,能夠被傳送給事件消費者。

如圖1所示,用戶經過 filter指定感興趣紅色小球的事件,最終只有該類事件被傳送給事件消費者(這裏是Knative Service,即 KSvc函數)。

  1. 事件消費者

事件消費者能夠是某個服務或系統,這裏的事件消費者是用戶編寫的KSvc函數(即處理事件的邏輯代碼)。

實現方式
  1. 第三方接入

第三方服務接入基於knative實現的serverless平臺須要提供特定的事件源,Knative社區已維護部分事件源,具體列表請查看:github.com/knative/eve…

若是第三方服務不在社區提供的支持列表中,就須要自定義事件源,有以下經常使用的幾種方式:

ContainerSource 實現簡單,是目前大部分自定義事件源的實現方式,也是KNative平臺推薦的方式。

ContainerSource 是 Kubernetes 中自定義的 CRD(Custom Resource Definition)資源類型,具體定義以下

CRD

主要看如下幾個部分:

  1. sink:事件轉發的目標對象,這裏即圖1中介紹的Borker
  2. image:須要開發的鏡像,包括了監聽具體數據源的事件和轉發事件到sink的實現
  3. arg和env:開發者自定義的一些數據經過 arg 和 env 傳入鏡像

ContainerSource 中 image 鏡像部分即須要自定義實現的部分,實現方式根據獲取第三方服務事件的不一樣分爲如下兩種:

  1. 消息隊列方式

以下圖 2所示,若是第三方服務已適配消息隊列,能夠將產生的事件發往消息隊列,此時 ContainerSource 能夠直接從消息隊列中消費第三方服務的事件。

圖 2 消息隊列方式

  1. 直連方式

以下圖 3所示,若是第三方服務未適配消息隊列,但服務自己提供事件訂閱能力(如 Redis 的鍵空間特性,Keyspace Notifications future),此時 ContainerSource 能夠直接訂閱第三方服務的事件,監聽服務變化。

圖 3 直連方式

注意:不管採用以上哪一種方式,ContainerSource 在生成 CloudEvents 事件時,都須要攜帶 KSVC 目標函數的惟一標識,以供平臺側分發事件時使用。例如:1. 消息隊列方式,因爲全部事件都從同一個消息隊列中獲取,此時須要在第三方服務生產事件時就攜帶目標函數的標識(對象存儲產品接入時採用該方式);2. 直連方式,因爲 ContainerSource 與第三方服務是一對一關係,因此能夠在 ContainerSource 生成 CloudEvents 事件時添加目標函數的標識。

利用 Broker/Trigger 事件處理模型,極大簡化了第三方服務接入函數計算的流程。不管使用消息隊列方式仍是直連方式,產品側只須要提供適配第三方服務的 ContainerSource 鏡像,以供平臺側使用。

  1. 平臺側納管

平臺側的工做主要是納管產品側提供的 ContainerSource,並利用 Trigger 提供事件過濾的能力。

針對 ContainerSource 不一樣的實現方式,平臺側工做內容也有所區別:

消息隊列實現方式

平臺側會建立以下內容:

  1. 一組相同的ContainerSource(用於高可用)
  2. 一個 Broker 類型的資源,用於分發事件
  3. 多個 Trigger 類型資源,用於事件過濾

平臺側會預先建立好 ContainerSource 和 Broker 資源,並提供納管 Trigger 的增刪改查接口用於事件過濾,此時 ContainerSource,Broker,Trigger 對應關係以下圖所示:

直連方式

平臺側會建立以下內容:

  1. 多個 ContainerSource 訂閱監聽不一樣的服務實例
  2. 一個 Broker 類型的資源,用於分發事件
  3. 多個 Trigger 類型資源,用於事件過濾

平臺側會預先提供好 Broker 資源,並提供納管 ContainerSource 和 Trigger 的增刪改查接口,此時 ContainerSource,Broker,Trigger 對應關係以下圖所示:

更多雲原生相關內容,請查看公衆號:DCOS

參考資料

相關文章
相關標籤/搜索