EMQ X Kuiper 是一款基於 SQL 的輕量級物聯網流式數據處理軟件,提供了一套插件機制用於實現自定義源(source),目標(sink)以及 SQL 函數(function)以擴展流處理功能。本教程詳細介紹了 Kuiper 插件的開發編譯和部署過程。mysql
Kuiper 插件基於 Go 語言的插件機制,用戶能夠構建鬆散耦合的插件程序,在運行時動態加載和綁定。可是,因爲 GO 語言插件系統的限制, Kuiper 插件的編譯和使用也有相應的限制:git
插件編譯環境要求跟 Kuiper 編譯環境儘可能一致,包括但不限於github
這些限制較爲苛刻,幾乎要求插件和 Kuiper 在同一臺機器編譯運行,常常致使開發環境編譯出的插件沒法在生產 Kuiper 上使用。本文詳細介紹了一種切實可用的插件開發環境設置和流程,推薦給 Kuiper 插件開發者使用。插件的開發和使用通常有以下流程:sql
開發docker
部署數據庫
插件開發通常在開發環境中進行。在開發環境調試運行經過後再部署到生產環境中。json
Kuiper 項目源代碼的 plugins 目錄下有一些插件範例。用戶自定義的插件也能夠在 Kuiper 項目中開發。可是爲了便於代碼管理,通常應當在 Kuiper 項目以外另建項目開發自定義插件。插件項目建議使用 Go module,典型的項目目錄以下圖所示:windows
plugin_project sources //源(source)插件源代碼目錄 mysource.go sinks //目標(sink)插件源代碼目錄 mysink.go functions //函數(function)插件源代碼目錄 myfunction.go target //編譯結果目錄 go.mod //go module文件
插件開發須要擴展 Kuiper 內的接口,所以必須依賴於 Kuiper 項目。最簡單的 go.mod 也須要包含對 Kuiper 的依賴。典型的 go.mod 以下:api
module samplePlugin go 1.13 require ( github.com/emqx/kuiper v0.0.0-20200323140757-60d00241372b )
Kuiper 插件有三種類型,源代碼可放入對應的目錄中。插件開發的詳細方法請參看 EMQ X Kuiper 擴展。本文以目標(sink)爲例,介紹插件的開發部署過程。咱們將開發一個最基本的 MySql 目標,用於將流輸出寫入到 MySql 數據庫中。服務器
編輯 mysql.go 文件以實現插件
mysql.go 完整代碼以下
package main import ( "database/sql" "fmt" "github.com/emqx/kuiper/xstream/api" _ "github.com/go-sql-driver/mysql" ) type mysqlSink struct { url string table string db *sql.DB } func (m *mysqlSink) Configure(props map[string]interface{}) error { if i, ok := props["url"]; ok { if i, ok := i.(string); ok { m.url = i } } if i, ok := props["table"]; ok { if i, ok := i.(string); ok { m.table = i } } return nil } func (m *mysqlSink) Open(ctx api.StreamContext) (err error) { logger := ctx.GetLogger() logger.Debug("Opening mysql sink") m.db, err = sql.Open("mysql", m.url) return } func (m *mysqlSink) Collect(ctx api.StreamContext, item interface{}) error { logger := ctx.GetLogger() if v, ok := item.([]byte); ok { logger.Debugf("mysql sink receive %s", item) sql := fmt.Sprintf("INSERT INTO %s (`name`) VALUES ('%s')", m.table, v) logger.Debugf(sql) insert, err := m.db.Query(sql) if err != nil { return err } defer insert.Close() } else { logger.Debug("mysql sink receive non byte data") } return nil } func (m *mysqlSink) Close(ctx api.StreamContext) error { if m.db != nil{ m.db.Close() } return nil } var Mysql mysqlSink
go.mod 完整代碼以下
module samplePlugin go 1.13 require ( github.com/emqx/kuiper v0.0.0-20200323140757-60d00241372b github.com/go-sql-driver/mysql v1.5.0 )
編譯插件應當與編譯 Kuiper 的環境一致。在開發環境中,典型的用法是在本地下載並編譯 Kuiper 和插件,而後在本地 Kuiper 上調試插件功能;也能夠在 Kuiper 的 docker 容器中編譯插件,並用 Kuiper 容器運行調試。
開發者能夠在本地自行編譯 Kuiper 和插件進行調試。其步驟以下:
git clone https://github.com/emqx/kuiper.git
make
編譯插件:
go mod edit -replace github.com/emqx/kuiper=$kuiperPath
,使得 Kuiper 依賴指向本地 Kuiper,請替換$kuiperPath 到步驟1下載目錄,下同。go build --buildmode=plugin -o $kuiperPath/_build/$build/plugins/sinks/Mysql@v1.0.0.so sinks/mysql.go
從0.3.0版本開始,Kuiper 提供了開發版本 docker 鏡像( kuiper:x.x.x-dev
)。與運行版本相比,開發版提供了 go 開發環境,使得用戶能夠在編譯出在 Kuiper 正式發佈版本中徹底兼容的插件。Docker 中編譯步驟以下:
運行 Kuiper 開發版本 docker。須要把本地插件目錄 mount 到 docker 裏的目錄中,這樣才能在 docker 中訪問插件項目並編譯。筆者的插件項目位於本地/var/git
目錄。下面的命令中,咱們把本地的/var/git
目錄映射到docker內的/home
目錄中。
docker run -d --name kuiper-dev --mount type=bind,source=/var/git,target=/home emqx/kuiper:0.3.0-dev
在 docker 環境中編譯插件,其原理與本地編譯一致。編譯出的插件置於插件項目的 target 目錄中
-- In host # docker exec -it kuiper-dev /bin/sh -- In docker instance # cd /home/samplePlugin # go mod edit -replace github.com/emqx/kuiper=/go/kuiper # go build --buildmode=plugin -o /home/samplePlugin/target/plugins/sinks/Mysql@v1.0.0.so sinks/mysql.go
在本地或 Docker 中啓動 Kuiper,建立流和規則,規則的 action 設置爲 mysql 便可對自定義的 mysql sink 插件進行測試。建立流和規則的步驟請參考 Kuiper 文檔。如下提供一個使用了 mysql 插件的規則供參考。
{ "id": "ruleTest", "sql": "SELECT * from demo", "actions": [ { "log": {}, "mysql":{ "url": "user:test@tcp(localhost:3307)/user", "table": "test" } } ] }
須要注意的是,插件從新編譯後須要重啓 Kuiper 才能載入新的版本。
若是生產環境和開發環境若是不一樣,開發的插件須要從新編譯並部署到生產環境。假設生產環境採用 Kuiper docker 進行部署,本節將描述如何部署插件到生產環境中。
插件原則上應該與生產環境 Kuiper 採用相同環境進行編譯。假設生產環境爲 Kuiper docker,則應當採用與生產環境相同版本的 dev docker 環境編譯插件。例如,生產環境採用 emqx/kuiper:0.3.0 的 docker 鏡像,則插件須要在emqx/kuiper:0.3.0-dev 的環境中進行編譯。
編譯過程請參考 Docker 編譯。
能夠採用 REST API 或者 CLI 進行插件管理。下文以 REST API 爲例,將上一節編譯的插件部署到生產環境中。
.so
文件及默認配置文件(只有 source 須要) .yaml
文件一塊兒打包到一個 .zip
文件中,假設爲 mysqlSink.zip
。把該文件放置到生產環境也可訪問的 http 服務器中。使用 REST API 建立插件:
POST http://{$production_kuiper_ip}:9081/plugins/sinks Content-Type: application/json {"name":"mysql","file":"http://{$http_server_ip}/plugins/sinks/mysqlSink.zip"}
驗證插件是否建立成功
GET http://{$production_kuiper_ip}:9081/plugins/sinks/mysql
返回
{ "name": "mysql", "version": "1.0.0" }
至此,插件部署成功。能夠建立帶有 mysql sink 的規則進行驗證。
版權聲明: 本文爲 EMQ 原創,轉載請註明出處。