輕量級邊緣計算 EMQ X Kuiper 與 Azure IoT Hub 集成方案

背景

本文以一個常見的物聯網使用場景爲案例,介紹瞭如何利用邊緣計算來實現對業務的快速、低成本和有效地處理。linux

在各種物聯網項目中,好比智能樓宇項目,須要將樓宇的數據(好比電梯、燃氣、水電等)進行採集和分析。一種解決方案是將全部的設備直接接入在雲端的物聯網平臺,相似於像 Azure IoT Hub 或者 AWS IoT Hub。這種解決方案的問題在於,git

  • 數據處理時延較長:經過 Internet 傳輸和雲端的處理後返回給設備,所需時間較長
  • 數據傳輸和存儲成本:經過 Internet 傳輸須要帶寬,對於大規模鏈接的物聯網項目來講,耗費的帶寬會至關可觀
  • 數據的安全性:有些物聯網的數據會至關敏感,所有經過物聯網傳輸的話會有風險

爲了解決以上的問題,業界提出了邊緣計算的方案,邊緣計算的核心就在於把數據進行就近處理,避免沒必要要的時延、成本和安全問題。github

業務場景

假設現有一組設備,組中的每一個設備有一個 id,經過 MQTT 協議往 MQTT 消息服務器上相應的主題發送數據。主題的設計以下,其中 {device_id} 爲設備的 id。sql

devices/{device_id}/messages

每一個設備發送的數據格式爲 JSON,發送的經過該傳感器採集的溫度與溼度數據。docker

{
    "temperature": 30, 
    "humidity" : 20
}

如今須要實時分析數據,並提出如下的需求:對每一個設備的溫度數據按照每 10 秒鐘計算平均值(t_av),而且記下 10 秒鐘內的最大值 (t_max)、最小值(t_min) 和數據條數(t_count),計算完畢後將這 4 個結果進行保存,如下爲樣例結果數據:shell

[
    {
        "device_id" : "1", "t_av" : 25,  "t_max" : 45, "t_min" : 5, "t_count" : 2
    },
    {
        "device_id" : "2", "t_av" : 25,  "t_max" : 45, "t_min" : 5, "t_count" : 2
    },
    ...
]

方案介紹

以下圖所示,採用邊緣分析/流式數據處理的方式,在邊緣端咱們採用了 EMQ X 的方案,最後將計算結果輸出到 Azure 的 IoT Hub 中。數據庫

emqx_azure.png

  • EMQ X Edge 能夠接入各類協議類型的設備,好比 MQTT、CoAP、LwM2M 等,這樣用戶能夠不須要關心協議適配方面的問題;另外它自己也比較輕量級,適合部署在邊緣設備上。
  • EMQ X Kuiper 是 EMQ 發佈的基於 SQL 的輕量級邊緣流式數據分析引擎,安裝包只有約 7MB,很是適合於運行在邊緣設備端
  • Azure IoT Hub 提供了比較全的設備接入和數據分析的方案,此處用於雲端的結果數據接入,以及應用所需的結果數據分析

實現步驟

安裝 EMQ X Edge & Kuiper

  • 寫本文的時候,EMQ X Edge 的最新版本是4.0,用戶能夠經過 Docker 來安裝和啓動 EMQ X Edgejson

    # docker pull emqx/emqx-edge
    # docker run -d --name emqx -p 1883:1883  emqx/emqx-edge:latest
    # docker ps
    CONTAINER ID        IMAGE                   COMMAND                  CREATED             STATUS              PORTS                                                                                                           NAMES
    a348e3ac150c        emqx/emqx-edge:latest   "/usr/bin/docker-entr"   3 seconds ago       Up 2 seconds        4369/tcp, 5369/tcp, 6369/tcp, 8080/tcp, 8083-8084/tcp, 8883/tcp, 11883/tcp, 0.0.0.0:1883->1883/tcp, 18083/tcp   emqx

    用戶能夠經過 telnet 命令來判斷是否啓動成功,以下所示。api

    # telnet localhost 1883
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
  • 安裝、啓動 Kuiper安全

    點擊這裏下載最新版 Kuiper,並解壓。在寫本文的時候,Kuiper 最新版本爲 0.0.3。

    # unzip kuiper-linux-amd64-0.0.3.zip
    # cd kuiper
    # bin/server
    Serving Kuiper server on port 20498

    若是沒法啓動,請查看日誌文件 log/stream.log

建立流

Kuiper 提供了一個命令用於管理流和規則,用戶能夠經過在命令行窗口中敲入 bin/cli 查看有哪些子命令及其幫助。cli 命令缺省鏈接的是本地的 Kuiper 服務器,cli 命令也能夠鏈接到別的 Kuiper 服務器,用戶能夠在 etc/client.yaml配置文件中修改鏈接的 Kuiper 服務器。用戶若是想了解更多關於命令行的信息,能夠參考這裏

建立流定義:建立流的目的是爲了定義發送到該流上的數據格式,相似於在關係數據庫中定義表的結構。 Kuiper 中全部支持的數據類型,能夠參考這裏

# cd kuiper
# bin/cli create stream demo '(temperature float, humidity bigint) WITH (FORMAT="JSON", DATASOURCE="devices/+/messages")'

上述語句在 Kuiper 中建立了一個名爲 demo 的流定義,包含了兩個字段,分別爲 temperature 和 humidity,數據源爲訂閱 MQTT 的主題 devices/+/messages,這裏請注意採用了通配符 +,用於訂閱不一樣設備的消息。該數據源所對應的 MQTT 服務器地址在配置文件 etc/mqtt_source.yaml中,能夠根據所在的服務器地址進行配置。以下圖所示,配置 servers 項目。

#Global MQTT configurations
default:
  qos: 1
  sharedsubscription: true
  servers: [tcp://127.0.0.1:1883]

用戶能夠在命令行中敲入 describe 子命令來查看剛建立好的流定義。

# bin/cli describe stream demo
Connecting to 127.0.0.1:20498
Fields
--------------------------------------------------------------------------------
temperature    float
humidity    bigint

FORMAT: JSON
DATASOURCE: devices/+/messages

數據業務邏輯處理

Kuiper 採用 SQL 實現業務邏輯,每10秒鐘統計溫度的平均值、最大值、最小值和次數,並根據設備 ID 進行分組,實現的 SQL 以下所示。

SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), "/", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)

這裏的 SQL 用了四個聚合函數,用於統計在10秒鐘窗口期內的相關值。

  • avg:平均值
  • max:最大值
  • min:最小值
  • count:計數

另外還使用了兩個基本的函數

  • mqtt:消息中取出 MQTT 協議的信息,mqtt(topic) 就是取得當前取得消息的主題名稱
  • split_value:該函數將第一個參數使用第二個參數進行分割,而後第三個參數指定下標,取得分割後的值。因此函數 split_value("devices/001/messages", "/", 1) 調用就返回001

GROUP BY 跟的是分組的字段,分別爲計算字段 device_id;時間窗口 TUMBLINGWINDOW(ss, 10),該時間窗口的含義爲每10秒鐘生成一批統計數據。

調試 SQL

在正式寫規則以前,咱們須要對規則進行調試,Kuiper 提供了 SQL 的調試工具,可讓用戶很是方便地對 SQL 進行調試。

  • 進入 kuiper 安裝目錄,並運行 bin/cli query
  • 在出現的命令行提示符中輸入前面準備好的 SQL 語句。

    # bin/cli query
    Connecting to 127.0.0.1:20498
    kuiper > SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), "/", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)
    query is submit successfully.
    kuiper >

    在日誌文件 log/stream.log 中,能夠看到建立了一個名爲 internal-kuiper_query_rule 的臨時規則。

    ...
    time="2019-11-12T11:56:10+08:00" level=info msg="The connection to server tcp://10.211.55.6:1883 was established successfully" rule=internal-kuiper_query_rule
    time="2019-11-12T11:56:10+08:00" level=info msg="Successfully subscribe to topic devices/+/messages" rule=internal-kuiper_query_rule

    值得注意的是,這個名爲 internal-kuiper_query_rule 的規則是經過 query 建立的,服務器端每5秒鐘會檢測一下 query 客戶端是否在線,若是query 客戶端發現有超過10秒鐘沒有反應(好比被關閉),那麼這個內部建立的 internal-kuiper_query_rule 規則會被自動刪除,被刪除的時候在日誌文件中會打印以下的信息。

    ...
    time="2019-11-12T12:04:08+08:00" level=info msg="The client seems no longer fetch the query result, stop the query now."
    time="2019-11-12T12:04:08+08:00" level=info msg="stop the query."
    time="2019-11-12T12:04:08+08:00" level=info msg="unary operator project cancelling...." rule=internal-kuiper_query_rule
    ...
  • 發送測試數據

    經過任何的測試工具,向 EMQ X Edge 發送如下的測試數據。筆者在測試過程當中用的是 JMeter 的 MQTT 插件,由於基於 JMeter 能夠作一些比較靈活的自動數據生成,業務邏輯控制,以及大量設備的模擬等。用戶也能夠直接使用 mosquitto 等其它客戶端進行模擬。

    • 主題:devices/$device_id/messages,其中$device_id 爲下面數據中的第一列
    • 消息:{"temperature": $temperature, "humidity" : $humidity}, 其中$temperature$humidity 分別爲下面數據中的第二列和第三列
    #device_id, temperature, humidity
    1,20,30
    2,31,40
    1,35,50
    2,20,30
    1,80,90
    2,45,20
    1,10,90
    2,12,30
    1,65,35
    2,55,32

咱們能夠發現發送了模擬數據後,在 query 客戶端命令行裏在兩個10秒的時間窗口裏打印了兩組數據。這裏輸出的結果條數跟用戶發送數據的頻率有關係,若是 Kuiper 在一個時間窗口內接受到全部的數據,那麼只打印一條結果。

kuiper > [{"device_id":"1","t_av":45,"t_count":3,"t_max":80,"t_min":20},{"device_id":"2","t_av":25.5,"t_count":2,"t_max":31,"t_min":20}]

[{"device_id":"2","t_av":37.333333333333336,"t_count":3,"t_max":55,"t_min":12},{"device_id":"1","t_av":37.5,"t_count":2,"t_max":65,"t_min":10}]

建立、提交規則

完成了 SQL 的調試以後,開始配置規則文件,將結果數據經過 Kuiper 的 MQTT Sink 發送到遠程的 Azure IoT Hub 中。 在 Azure IoT Hub 中,用戶須要先建立好如下內容,

  • IoT Hub:本文建立的名稱爲 rockydemo,用於接入設備
  • IoT Device:表明了一個設備,此處爲處理設備數據的網關,該網關安裝了 Kuiper,網關在把相關相關數據處理完畢後,將結果發送到 Azure 雲端
  • 設備鏈接用戶名和密碼:請參考 Azure 相關的文檔瞭解 Azure IoT MQTT 鏈接的用戶名和密碼;關於生成 SAS Token,用戶能夠參考此文檔

以下圖所示,在 Azure IoT Hub 中建立完成的相關設備。

azure_iot.jpg

編寫 Kuiper 規則文件

規則文件是一個文本文件,描述了業務處理的邏輯(前面已經調試好的 SQL 語句),以及 sink 的配置(消息處理結果的發送目的地)。鏈接 Azure IoT Hub 的大部分信息都已經在前文中描述,須要注意是必須設置 protocol_version 的值爲 3.1.1,而不能爲 3.1

{
  "sql": "SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), \"/\", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)",
  "actions": [
    {
      "log": {}
    },
    {
      "mqtt": {
        "server": "ssl://rockydemo.azure-devices.net:8883",
        "topic": "devices/demo_001/messages/events/",
        "protocol_version": "3.1.1",
        "qos": 1,
        "clientId": "demo_001",
        "username": "rockydemo.azure-devices.net/demo_001/?api-version=2018-06-30",
        "password": "SharedAccessSignature sr=*******************"
      }
    }
  ]
}

經過 Kuiper 命令行建立規則

# bin/cli create rule rule1 -f rule1.txt
Connecting to 127.0.0.1:20498
Creating a new rule from file rule1.txt. 
Rule rule1 was created.

在日誌文件中能夠查看規則的運行鏈接狀況,若是配置項都正確的話,應該能夠看到到 Azure IoT Hub 的鏈接創建成功。

......
time="2019-11-12T14:30:34+08:00" level=info msg="The connection to server tcp://10.211.55.6:1883 was established successfully" rule=rule1
time="2019-11-12T14:30:34+08:00" level=info msg="Successfully subscribe to topic devices/+/messages" rule=rule1
time="2019-11-12T14:30:35+08:00" level=info msg="The connection to server ssl://rockydemo.azure-devices.net:8883 was established successfully" rule=rule1
......
  • 經過命令 az iot hub monitor-events -n rockydemo 啓動 Azure IoT Hub 監控,並往本地的 EMQ X Edge 上發送跟調試 SQL 語句同樣的模擬數據。通過 Kuiper 處理後,相應的處理結果被髮送到了 Azure IoT Hub 中。

    #az iot hub monitor-events -n rockydemo
    Starting event monitor, use ctrl-c to stop...
    {
        "event": {
            "origin": "demo_001",
            "payload": "[{\"device_id\":\"2\",\"t_av\":32,\"t_count\":3,\"t_max\":45,\"t_min\":20},{\"device_id\":\"1\",\"t_av\":45,\"t_count\":3,\"t_max\":80,\"t_min\":20}]"
        }
    }
    {
        "event": {
            "origin": "demo_001",
            "payload": "[{\"device_id\":\"2\",\"t_av\":33.5,\"t_count\":2,\"t_max\":55,\"t_min\":12},{\"device_id\":\"1\",\"t_av\":37.5,\"t_count\":2,\"t_max\":65,\"t_min\":10}]"
        }
    }

總結

經過本文,讀者能夠了解到利用 EMQ X 在邊緣端的解決方案能夠很是快速、靈活地開發出基於邊緣數據分析的系統,實現數據低時延、低成本和安全的處理。Azure IoT 也提供了 IoT Edge 方案,與 Azure 的方案相比,

  • Kuiper 的運行時很是輕量級;Azure IoT Edge 方案須要提供相關語言的運行時,安裝包相對來講會比較大。
  • Kuiper 基於 SQL 實現業務邏輯的實現方式更加快速簡單,對複雜的業務邏輯處理缺少必定的靈活性;Azure IoT Edge 在業務實現的靈活度上相對來講更佳。
  • Kuiper 在與第三方的 IoT Hub 進行集成的時候靈活性更好。Azure IoT Edge 通常只跟 Azure IoT Hub 進行對接。

若是有興趣瞭解更多關於邊緣流式數據分析的內容,請參考 Kuiper 開源項目


更多信息請訪問咱們的官網 emqx.io,或關注咱們的開源項目 github.com/emqx/emqx ,詳細文檔請訪問 官方文檔

相關文章
相關標籤/搜索