輕量級邊緣計算 EMQ X Kuiper 與 AWS IoT 集成方案

背景

本文以一個常見的物聯網使用場景爲案例,介紹瞭如何利用邊緣計算來實現對業務的快速、低成本和有效地處理。html

在各種物聯網項目中,好比智能樓宇項目,須要將樓宇的數據(好比電梯、燃氣、水電等)進行採集和分析。一種解決方案是將全部的設備直接接入在雲端的物聯網平臺,相似於像 AWS IoT 或者 Azure IoT Hub。這種解決方案的問題在於,前端

  • 數據處理時延較長:經過 Internet 傳輸和雲端的處理後返回給設備,所需時間較長
  • 數據傳輸和存儲成本:經過 Internet 傳輸須要帶寬,對於大規模鏈接的物聯網項目來講,耗費的帶寬會至關可觀
  • 數據的安全性:有些物聯網的數據會至關敏感,所有經過物聯網傳輸的話會有風險

爲了解決以上的問題,業界提出了邊緣計算的方案,邊緣計算的核心就在於把數據進行就近處理,避免沒必要要的時延、成本和安全問題。linux

業務場景

假設現有一組設備,組中的每一個設備有一個 id,經過 MQTT 協議往 MQTT 消息服務器上相應的主題發送數據。主題的設計以下,其中 {device_id} 爲設備的 id。git

devices/{device_id}/messages

每一個設備發送的數據格式爲 JSON,發送的經過該傳感器採集的溫度與溼度數據。github

{
    "temperature": 30, 
    "humidity" : 20
}

如今須要實時分析數據,並提出如下的需求:對每一個設備的溫度數據按照每 10 秒鐘計算平均值(t_av),而且記下 10 秒鐘內的最大值 (t_max)、最小值(t_min) 和數據條數(t_count),計算完畢後將這 4 個結果進行保存,如下爲樣例結果數據:sql

[
    {
        "device_id" : "1", "t_av" : 25,  "t_max" : 45, "t_min" : 5, "t_count" : 2
    },
    {
        "device_id" : "2", "t_av" : 25,  "t_max" : 45, "t_min" : 5, "t_count" : 2
    },
    ...
]

方案介紹

以下圖所示,採用邊緣分析/流式數據處理的方式,在邊緣端咱們採用了 EMQ X 的方案,最後將計算結果輸出到 AWS IoT 中。docker

emqx_aws.png

  • EMQ X Edge 能夠接入各類協議類型的設備,好比 MQTT、CoAP、LwM2M 等,這樣用戶能夠不須要關心協議適配方面的問題;另外它自己也比較輕量級,適合部署在邊緣設備上
  • EMQ X Kuiper 是 EMQ 發佈的基於 SQL 的輕量級邊緣流式數據分析引擎,安裝包只有約 7MB,很是適合於運行在邊緣設備端
  • AWS IoT 提供了比較全的設備接入和數據分析的方案,此處用於雲端的結果數據接入,以及應用所需的結果數據分析

實現步驟

安裝 EMQ X Edge & Kuiper

  • 寫本文的時候,EMQ X Edge 的最新版本是4.0,用戶能夠經過 Docker 來安裝和啓動 EMQ X Edgeshell

    # docker pull emqx/emqx-edge
    # docker run -d --name emqx -p 1883:1883  emqx/emqx-edge:latest
    # docker ps
    CONTAINER ID        IMAGE                   COMMAND                  CREATED             STATUS              PORTS                                                                                                           NAMES
    a348e3ac150c        emqx/emqx-edge:latest   "/usr/bin/docker-entr"   3 seconds ago       Up 2 seconds        4369/tcp, 5369/tcp, 6369/tcp, 8080/tcp, 8083-8084/tcp, 8883/tcp, 11883/tcp, 0.0.0.0:1883->1883/tcp, 18083/tcp   emqx

    用戶能夠經過 telnet 命令來判斷是否啓動成功,以下所示。數據庫

    # telnet localhost 1883
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
  • 安裝、啓動 Kuiper編程

    點擊這裏下載最新版 Kuiper,並解壓。在寫本文的時候,Kuiper 最新版本爲 0.0.3。

    # unzip kuiper-linux-amd64-0.0.3.zip
    # cd kuiper
    # bin/server
    Serving Kuiper server on port 20498

    若是沒法啓動,請查看日誌文件 log/stream.log

建立流

Kuiper 提供了一個命令用於管理流和規則,用戶能夠經過在命令行窗口中敲入 bin/cli 查看有哪些子命令及其幫助。cli 命令缺省鏈接的是本地的 Kuiper 服務器,cli 命令也能夠鏈接到別的 Kuiper 服務器,用戶能夠在 etc/client.yaml配置文件中修改鏈接的 Kuiper 服務器。用戶若是想了解更多關於命令行的信息,能夠參考這裏

建立流定義:建立流的目的是爲了定義發送到該流上的數據格式,相似於在關係數據庫中定義表的結構。 Kuiper 中全部支持的數據類型,能夠參考這裏

# cd kuiper
# bin/cli create stream demo '(temperature float, humidity bigint) WITH (FORMAT="JSON", DATASOURCE="devices/+/messages")'

上述語句在 Kuiper 中建立了一個名爲 demo 的流定義,包含了兩個字段,分別爲 temperature 和 humidity,數據源爲訂閱 MQTT 的主題 devices/+/messages,這裏請注意採用了通配符 +,用於訂閱不一樣設備的消息。該數據源所對應的 MQTT 服務器地址在配置文件 etc/mqtt_source.yaml中,能夠根據所在的服務器地址進行配置。以下圖所示,配置 servers 項目。

#Global MQTT configurations
default:
  qos: 1
  sharedsubscription: true
  servers: [tcp://127.0.0.1:1883]

用戶能夠在命令行中敲入 describe 子命令來查看剛建立好的流定義。

# bin/cli describe stream demo
Connecting to 127.0.0.1:20498
Fields
--------------------------------------------------------------------------------
temperature    float
humidity    bigint

FORMAT: JSON
DATASOURCE: devices/+/messages

數據業務邏輯處理

Kuiper 採用 SQL 實現業務邏輯,每10秒鐘統計溫度的平均值、最大值、最小值和次數,並根據設備 ID 進行分組,實現的 SQL 以下所示。

SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), "/", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)

這裏的 SQL 用了四個聚合函數,用於統計在10秒鐘窗口期內的相關值。

  • avg:平均值
  • max:最大值
  • min:最小值
  • count:計數

另外還使用了兩個基本的函數

  • mqtt:消息中取出 MQTT 協議的信息,mqtt(topic) 就是取得當前取得消息的主題名稱
  • split_value:該函數將第一個參數使用第二個參數進行分割,而後第三個參數指定下標,取得分割後的值。因此函數 split_value("devices/001/messages", "/", 1) 調用就返回001

GROUP BY 跟的是分組的字段,分別爲計算字段 device_id;時間窗口 TUMBLINGWINDOW(ss, 10),該時間窗口的含義爲每10秒鐘生成一批統計數據。

調試 SQL

在正式寫規則以前,咱們須要對規則進行調試,Kuiper 提供了 SQL 的調試工具,可讓用戶很是方便地對 SQL 進行調試。

  • 進入 kuiper 安裝目錄,並運行 bin/cli query
  • 在出現的命令行提示符中輸入前面準備好的 SQL 語句。

    # bin/cli query
    Connecting to 127.0.0.1:20498
    kuiper > SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), "/", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)
    query is submit successfully.
    kuiper >

    在日誌文件 log/stream.log 中,能夠看到建立了一個名爲 internal-kuiper_query_rule 的臨時規則。

    ...
    time="2019-11-12T11:56:10+08:00" level=info msg="The connection to server tcp://10.211.55.6:1883 was established successfully" rule=internal-kuiper_query_rule
    time="2019-11-12T11:56:10+08:00" level=info msg="Successfully subscribe to topic devices/+/messages" rule=internal-kuiper_query_rule

    值得注意的是,這個名爲 internal-kuiper_query_rule 的規則是經過 query 建立的,服務器端每5秒鐘會檢測一下 query 客戶端是否在線,若是query 客戶端發現有超過10秒鐘沒有反應(好比被關閉),那麼這個內部建立的 internal-kuiper_query_rule 規則會被自動刪除,被刪除的時候在日誌文件中會打印以下的信息。

    ...
    time="2019-11-12T12:04:08+08:00" level=info msg="The client seems no longer fetch the query result, stop the query now."
    time="2019-11-12T12:04:08+08:00" level=info msg="stop the query."
    time="2019-11-12T12:04:08+08:00" level=info msg="unary operator project cancelling...." rule=internal-kuiper_query_rule
    ...
  • 發送測試數據

    經過任何的測試工具,向 EMQ X Edge 發送如下的測試數據。筆者在測試過程當中用的是 JMeter 的 MQTT 插件,由於基於 JMeter 能夠作一些比較靈活的自動數據生成,業務邏輯控制,以及大量設備的模擬等。用戶也能夠直接使用 mosquitto 等其它客戶端進行模擬。

    • 主題:devices/$device_id/messages,其中$device_id 爲下面數據中的第一列
    • 消息:{"temperature": $temperature, "humidity" : $humidity}, 其中$temperature$humidity 分別爲下面數據中的第二列和第三列
    #device_id, temperature, humidity
    1,20,30
    2,31,40
    1,35,50
    2,20,30
    1,80,90
    2,45,20
    1,10,90
    2,12,30
    1,65,35
    2,55,32

咱們能夠發現發送了模擬數據後,在 query 客戶端命令行裏在兩個10秒的時間窗口裏打印了兩組數據。這裏輸出的結果條數跟用戶發送數據的頻率有關係,若是 Kuiper 在一個時間窗口內接受到全部的數據,那麼只打印一條結果。

kuiper > [{"device_id":"1","t_av":45,"t_count":3,"t_max":80,"t_min":20},{"device_id":"2","t_av":25.5,"t_count":2,"t_max":31,"t_min":20}]

[{"device_id":"2","t_av":37.333333333333336,"t_count":3,"t_max":55,"t_min":12},{"device_id":"1","t_av":37.5,"t_count":2,"t_max":65,"t_min":10}]

建立、提交規則

完成了 SQL 的調試以後,開始配置規則文件,將結果數據經過 Kuiper 的 MQTT Sink 發送到遠程的 AWS IoT 中。 在 AWS IoT 中,用戶須要先建立好如下內容,

  • 設備:表明處理設備數據的網關,該網關安裝了 Kuiper,網關在把相關相關數據處理完畢後,將結果發送到 AWS 雲端。此處建立的名稱爲 demo,以下圖所示。

aws_device.png

  • 設備鏈接證書與密鑰:AWS 的物聯網設備經過證書來鏈接,保證其安全性。在建立設備的過程當中,AWS會生成的如下三個文件。這裏會用到的是證書與私鑰。

    • 證書:相似於 d3807d9fa5-certificate.pem
    • 私鑰:相似於 d3807d9fa5-private.pem.key
    • 公鑰:相似於 d3807d9fa5-public.pem.key

關於這方面更多的信息,請參考 AWS 建立設備的文檔

編寫 Kuiper 規則文件

規則文件是一個文本文件,描述了業務處理的邏輯(前面已經調試好的 SQL 語句),以及 sink 的配置(消息處理結果的發送目的地)。鏈接 AWS IoT 的大部分信息都已經在前文中描述。 Kuiper 的測試結果將被髮送到 AWS IoT設備的 devices/result 主題下。

{
  "sql": "SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), \"/\", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)",
  "actions": [
    {
      "log": {}
    },
    {
      "mqtt": {
        "server": "ssl://xyz-ats.iot.us-east-1.amazonaws.com:8883",
        "topic": "devices/result",
        "qos": 1,
        "clientId": "demo_001",
        "certificationPath": "/var/aws/d3807d9fa5-certificate.pem",
        "privateKeyPath": "/var/aws/d3807d9fa5-private.pem.key"
      }
    }
  ]
}

經過 Kuiper 命令行建立規則

# bin/cli create rule rule1 -f rule1.txt
Connecting to 127.0.0.1:20498
Creating a new rule from file rule1.txt. 
Rule rule1 was created.

在日誌文件中能夠查看規則的運行鏈接狀況,若是配置項都正確的話,應該能夠看到到 AWS IoT 的鏈接創建成功。

......
time="2019-11-13T17:41:19+08:00" level=info msg="The connection to server tcp://10.211.55.6:1883 was established successfully" rule=rule1
time="2019-11-13T17:41:19+08:00" level=info msg="Successfully subscribe to topic devices/+/messages" rule=rule1
time="2019-11-13T17:41:20+08:00" level=info msg="The connection to server ssl://xyz-ats.iot.us-east-1.amazonaws.com:8883 was established successfully" rule=rule1
......
  • 經過 AWS IoT 提供的 MQTT Client 工具,訂閱設備的devices/result主題。並往本地的 EMQ X Edge 上發送模擬數據。通過 Kuiper 處理後,相應的處理結果被髮送到了 AWS IoT 中。以下圖所示,收到了兩次測試結果(第一次結果被摺疊)。

aws_iot_result.png

用戶能夠經過 AWS IoT Rule 將分析結果存儲到 Amazon DynamoDB 數據庫或者其它服務中,前端的應用程序能夠經過讀取 DynamoDB 中的數據來呈現給終端用戶,具體請參考 Amazon DynamoDB 文檔

總結

經過本文,讀者能夠了解到利用 EMQ X 在邊緣端的解決方案能夠很是快速、靈活地開發出基於邊緣數據分析的系統,實現數據低時延、低成本和安全的處理。

AWS IoT 也提供了 Greengrass 的邊緣解決方案,與 AWS Greengrass 相比,Kuiper 方案更加輕量級,業務邏輯的實現方式(基於 SQL)在編程上也相對更加簡單。AWS Greengrass 提供了基於 Lambada 的編程模型,經過提供不一樣編程語言的 SDK 來實現邊緣端的數據分析,以及往 AWS IoT 的分析結果上傳,所以在業務邏輯實現的靈活性上會更好。最後,與 Greengrass 只能鏈接 AWS IoT 不一樣,Kuiper 在與不一樣的第三方 IoT 平臺的集成的靈活性上也更好。

若是有興趣瞭解更多關於邊緣流式數據分析的內容,請參考 Kuiper 開源項目


更多信息請訪問咱們的官網 emqx.io,或關注咱們的開源項目 github.com/emqx/emqx ,詳細文檔請訪問 官方文檔

相關文章
相關標籤/搜索