超輕量物聯網邊緣流處理 - Kuiper 插件開發教程

EMQ X Kuiper 是一款基於 SQL 的輕量級物聯網流式數據處理軟件,提供了一套插件機制用於實現自定義源(source),目標(sink)以及 SQL 函數(function)以擴展流處理功能。本教程詳細介紹了 Kuiper 插件的開發編譯和部署過程。mysql

概覽

Kuiper 插件基於 Go 語言的插件機制,用戶能夠構建鬆散耦合的插件程序,在運行時動態加載和綁定。可是,因爲 GO 語言插件系統的限制, Kuiper 插件的編譯和使用也有相應的限制:git

  • 插件不支持 windows 系統
  • 插件編譯環境要求跟 Kuiper 編譯環境儘可能一致,包括但不限於github

    • 相同的 GO 版本
    • 插件與 Kuiper 自身依賴的相同包版本必須徹底一致,包括 Kuiper 自身
    • 插件與 Kuiper 編譯環境的 GOPATH 必須徹底一致

這些限制較爲苛刻,幾乎要求插件和 Kuiper 在同一臺機器編譯運行,常常致使開發環境編譯出的插件沒法在生產 Kuiper 上使用。本文詳細介紹了一種切實可用的插件開發環境設置和流程,推薦給 Kuiper 插件開發者使用。插件的開發和使用通常有以下流程:sql

  • 開發docker

    • 建立並開發插件項目
    • 編譯調試插件
  • 部署數據庫

    • 編譯生產環境可用插件
    • 部署插件到生產環境

插件開發

插件開發通常在開發環境中進行。在開發環境調試運行經過後再部署到生產環境中。json

建立並開發插件項目

Kuiper 項目源代碼的 plugins 目錄下有一些插件範例。用戶自定義的插件也能夠在 Kuiper 項目中開發。可是爲了便於代碼管理,通常應當在 Kuiper 項目以外另建項目開發自定義插件。插件項目建議使用 Go module,典型的項目目錄以下圖所示:windows

plugin_project
  sources         //源(source)插件源代碼目錄
    mysource.go
  sinks           //目標(sink)插件源代碼目錄
    mysink.go
  functions       //函數(function)插件源代碼目錄
    myfunction.go
  target          //編譯結果目錄     
  go.mod          //go module文件

插件開發須要擴展 Kuiper 內的接口,所以必須依賴於 Kuiper 項目。最簡單的 go.mod 也須要包含對 Kuiper 的依賴。典型的 go.mod 以下:api

module samplePlugin

go 1.13

require (
    github.com/emqx/kuiper v0.0.0-20200323140757-60d00241372b
)

Kuiper 插件有三種類型,源代碼可放入對應的目錄中。插件開發的詳細方法請參看 EMQ X Kuiper 擴展。本文以目標(sink)爲例,介紹插件的開發部署過程。咱們將開發一個最基本的 MySql 目標,用於將流輸出寫入到 MySql 數據庫中。服務器

  • 新建名爲 samplePlugin 的插件項目,採用上文的目錄結構
  • 在 sinks 目錄下,新建 mysql.go 文件
  • 編輯 mysql.go 文件以實現插件

    • 實現 api.Sink接口
    • 導出 Symbol:Mysql
  • 編輯 go.mod, 添加 mysql 驅動模塊

mysql.go 完整代碼以下

package main

import (
    "database/sql"
    "fmt"
    "github.com/emqx/kuiper/xstream/api"
    _ "github.com/go-sql-driver/mysql"
)

type mysqlSink struct {
    url       string
    table     string

    db        *sql.DB
}

func (m *mysqlSink) Configure(props map[string]interface{}) error {
    if i, ok := props["url"]; ok {
        if i, ok := i.(string); ok {
            m.url = i
        }
    }
    if i, ok := props["table"]; ok {
        if i, ok := i.(string); ok {
            m.table = i
        }
    }
    return nil
}

func (m *mysqlSink) Open(ctx api.StreamContext) (err error) {
    logger := ctx.GetLogger()
    logger.Debug("Opening mysql sink")
    m.db, err = sql.Open("mysql", m.url)
    return
}

func (m *mysqlSink) Collect(ctx api.StreamContext, item interface{}) error {
    logger := ctx.GetLogger()
    if v, ok := item.([]byte); ok {
        logger.Debugf("mysql sink receive %s", item)
        sql := fmt.Sprintf("INSERT INTO %s (`name`) VALUES ('%s')", m.table, v)
        logger.Debugf(sql)
        insert, err := m.db.Query(sql)
        if err != nil {
            return err
        }
        defer insert.Close()
    } else {
        logger.Debug("mysql sink receive non byte data")
    }
    return nil
}

func (m *mysqlSink) Close(ctx api.StreamContext) error {
    if m.db != nil{
        m.db.Close()
    }
    return nil
}

var Mysql mysqlSink

go.mod 完整代碼以下

module samplePlugin

go 1.13

require (
   github.com/emqx/kuiper v0.0.0-20200323140757-60d00241372b
   github.com/go-sql-driver/mysql v1.5.0
)

編譯調試插件

編譯插件應當與編譯 Kuiper 的環境一致。在開發環境中,典型的用法是在本地下載並編譯 Kuiper 和插件,而後在本地 Kuiper 上調試插件功能;也能夠在 Kuiper 的 docker 容器中編譯插件,並用 Kuiper 容器運行調試。

本地編譯

開發者能夠在本地自行編譯 Kuiper 和插件進行調試。其步驟以下:

  1. 下載 Kuiper 源代碼 git clone https://github.com/emqx/kuiper.git
  2. 編譯 Kuiper:在 Kuiper 目錄下,運行 make
  3. 編譯插件:

    1. 在插件項目下,運行go mod edit -replace github.com/emqx/kuiper=$kuiperPath,使得 Kuiper 依賴指向本地 Kuiper,請替換$kuiperPath 到步驟1下載目錄,下同。
    2. 編譯插件 so 到 Kuiper 插件目錄下
    go build --buildmode=plugin -o $kuiperPath/_build/$build/plugins/sinks/Mysql@v1.0.0.so sinks/mysql.go

Docker 編譯

從0.3.0版本開始,Kuiper 提供了開發版本 docker 鏡像( kuiper:x.x.x-dev )。與運行版本相比,開發版提供了 go 開發環境,使得用戶能夠在編譯出在 Kuiper 正式發佈版本中徹底兼容的插件。Docker 中編譯步驟以下:

  1. 運行 Kuiper 開發版本 docker。須要把本地插件目錄 mount 到 docker 裏的目錄中,這樣才能在 docker 中訪問插件項目並編譯。筆者的插件項目位於本地/var/git目錄。下面的命令中,咱們把本地的/var/git目錄映射到docker內的/home目錄中。

    docker run -d --name kuiper-dev --mount type=bind,source=/var/git,target=/home emqx/kuiper:0.3.0-dev
  2. 在 docker 環境中編譯插件,其原理與本地編譯一致。編譯出的插件置於插件項目的 target 目錄中

    -- In host
    # docker exec -it kuiper-dev /bin/sh
    
    -- In docker instance
    # cd /home/samplePlugin
    # go mod edit -replace github.com/emqx/kuiper=/go/kuiper
    # go build --buildmode=plugin -o /home/samplePlugin/target/plugins/sinks/Mysql@v1.0.0.so sinks/mysql.go

調試運行插件

在本地或 Docker 中啓動 Kuiper,建立流和規則,規則的 action 設置爲 mysql 便可對自定義的 mysql sink 插件進行測試。建立流和規則的步驟請參考 Kuiper 文檔。如下提供一個使用了 mysql 插件的規則供參考。

{
  "id": "ruleTest",
  "sql": "SELECT * from demo",
  "actions": [
    {
      "log": {},
      "mysql":{
        "url": "user:test@tcp(localhost:3307)/user",
        "table": "test"
      }
    }
  ]
}

須要注意的是,插件從新編譯後須要重啓 Kuiper 才能載入新的版本。

插件部署

若是生產環境和開發環境若是不一樣,開發的插件須要從新編譯並部署到生產環境。假設生產環境採用 Kuiper docker 進行部署,本節將描述如何部署插件到生產環境中。

插件編譯

插件原則上應該與生產環境 Kuiper 採用相同環境進行編譯。假設生產環境爲 Kuiper docker,則應當採用與生產環境相同版本的 dev docker 環境編譯插件。例如,生產環境採用 emqx/kuiper:0.3.0 的 docker 鏡像,則插件須要在emqx/kuiper:0.3.0-dev 的環境中進行編譯。

編譯過程請參考 Docker 編譯

插件部署

能夠採用 REST API 或者 CLI 進行插件管理。下文以 REST API 爲例,將上一節編譯的插件部署到生產環境中。

  1. 插件打包並放到 http 服務器。將上一節編譯好的插件 .so 文件及默認配置文件(只有 source 須要) .yaml 文件一塊兒打包到一個 .zip 文件中,假設爲 mysqlSink.zip。把該文件放置到生產環境也可訪問的 http 服務器中。
  2. 使用 REST API 建立插件:

    POST http://{$production_kuiper_ip}:9081/plugins/sinks
    Content-Type: application/json
    
    {"name":"mysql","file":"http://{$http_server_ip}/plugins/sinks/mysqlSink.zip"}
  3. 驗證插件是否建立成功

    GET http://{$production_kuiper_ip}:9081/plugins/sinks/mysql

    返回

    {
       "name": "mysql",
       "version": "1.0.0"
    }

至此,插件部署成功。能夠建立帶有 mysql sink 的規則進行驗證。

版權聲明: 本文爲 EMQ 原創,轉載請註明出處。

原文連接:https://www.emqx.io/cn/blog/super-lightweight-iot-edge-stream-processing-kuiper-plugin-development-tutorial

相關文章
相關標籤/搜索