本文以一個常見的物聯網使用場景爲案例,介紹瞭如何利用邊緣計算來實現對業務的快速、低成本和有效地處理。html
在各種物聯網項目中,好比智能樓宇項目,須要將樓宇的數據(好比電梯、燃氣、水電等)進行採集和分析。一種解決方案是將全部的設備直接接入在雲端的物聯網平臺,相似於像 AWS IoT 或者 Azure IoT Hub。這種解決方案的問題在於,前端
爲了解決以上的問題,業界提出了邊緣計算的方案,邊緣計算的核心就在於把數據進行就近處理,避免沒必要要的時延、成本和安全問題。linux
假設現有一組設備,組中的每一個設備有一個 id,經過 MQTT 協議往 MQTT 消息服務器上相應的主題發送數據。主題的設計以下,其中 {device_id} 爲設備的 id。git
devices/{device_id}/messages
每一個設備發送的數據格式爲 JSON,發送的經過該傳感器採集的溫度與溼度數據。github
{ "temperature": 30, "humidity" : 20 }
如今須要實時分析數據,並提出如下的需求:對每一個設備的溫度數據按照每 10 秒鐘計算平均值(t_av
),而且記下 10 秒鐘內的最大值 (t_max
)、最小值(t_min
) 和數據條數(t_count
),計算完畢後將這 4 個結果進行保存,如下爲樣例結果數據:sql
[ { "device_id" : "1", "t_av" : 25, "t_max" : 45, "t_min" : 5, "t_count" : 2 }, { "device_id" : "2", "t_av" : 25, "t_max" : 45, "t_min" : 5, "t_count" : 2 }, ... ]
以下圖所示,採用邊緣分析/流式數據處理的方式,在邊緣端咱們採用了 EMQ X 的方案,最後將計算結果輸出到 AWS IoT 中。docker
寫本文的時候,EMQ X Edge 的最新版本是4.0,用戶能夠經過 Docker 來安裝和啓動 EMQ X Edgeshell
# docker pull emqx/emqx-edge # docker run -d --name emqx -p 1883:1883 emqx/emqx-edge:latest # docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES a348e3ac150c emqx/emqx-edge:latest "/usr/bin/docker-entr" 3 seconds ago Up 2 seconds 4369/tcp, 5369/tcp, 6369/tcp, 8080/tcp, 8083-8084/tcp, 8883/tcp, 11883/tcp, 0.0.0.0:1883->1883/tcp, 18083/tcp emqx
用戶能夠經過 telnet
命令來判斷是否啓動成功,以下所示。數據庫
# telnet localhost 1883 Trying 127.0.0.1... Connected to localhost. Escape character is '^]'.
安裝、啓動 Kuiper編程
點擊這裏下載最新版 Kuiper,並解壓。在寫本文的時候,Kuiper 最新版本爲 0.0.3。
# unzip kuiper-linux-amd64-0.0.3.zip # cd kuiper # bin/server Serving Kuiper server on port 20498
若是沒法啓動,請查看日誌文件 log/stream.log
。
Kuiper 提供了一個命令用於管理流和規則,用戶能夠經過在命令行窗口中敲入 bin/cli
查看有哪些子命令及其幫助。cli
命令缺省鏈接的是本地的 Kuiper 服務器,cli
命令也能夠鏈接到別的 Kuiper 服務器,用戶能夠在 etc/client.yaml
配置文件中修改鏈接的 Kuiper 服務器。用戶若是想了解更多關於命令行的信息,能夠參考這裏。
建立流定義:建立流的目的是爲了定義發送到該流上的數據格式,相似於在關係數據庫中定義表的結構。 Kuiper 中全部支持的數據類型,能夠參考這裏。
# cd kuiper # bin/cli create stream demo '(temperature float, humidity bigint) WITH (FORMAT="JSON", DATASOURCE="devices/+/messages")'
上述語句在 Kuiper 中建立了一個名爲 demo 的流定義,包含了兩個字段,分別爲 temperature 和 humidity,數據源爲訂閱 MQTT 的主題 devices/+/messages
,這裏請注意採用了通配符 +
,用於訂閱不一樣設備的消息。該數據源所對應的 MQTT 服務器地址在配置文件 etc/mqtt_source.yaml
中,能夠根據所在的服務器地址進行配置。以下圖所示,配置 servers
項目。
#Global MQTT configurations default: qos: 1 sharedsubscription: true servers: [tcp://127.0.0.1:1883]
用戶能夠在命令行中敲入 describe
子命令來查看剛建立好的流定義。
# bin/cli describe stream demo Connecting to 127.0.0.1:20498 Fields -------------------------------------------------------------------------------- temperature float humidity bigint FORMAT: JSON DATASOURCE: devices/+/messages
Kuiper 採用 SQL 實現業務邏輯,每10秒鐘統計溫度的平均值、最大值、最小值和次數,並根據設備 ID 進行分組,實現的 SQL 以下所示。
SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), "/", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)
這裏的 SQL 用了四個聚合函數,用於統計在10秒鐘窗口期內的相關值。
avg
:平均值max
:最大值min
:最小值count
:計數另外還使用了兩個基本的函數
mqtt
:消息中取出 MQTT 協議的信息,mqtt(topic)
就是取得當前取得消息的主題名稱split_value
:該函數將第一個參數使用第二個參數進行分割,而後第三個參數指定下標,取得分割後的值。因此函數 split_value("devices/001/messages", "/", 1)
調用就返回001
GROUP BY
跟的是分組的字段,分別爲計算字段 device_id
;時間窗口 TUMBLINGWINDOW(ss, 10)
,該時間窗口的含義爲每10秒鐘生成一批統計數據。
在正式寫規則以前,咱們須要對規則進行調試,Kuiper 提供了 SQL 的調試工具,可讓用戶很是方便地對 SQL 進行調試。
bin/cli query
在出現的命令行提示符中輸入前面準備好的 SQL 語句。
# bin/cli query Connecting to 127.0.0.1:20498 kuiper > SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), "/", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10) query is submit successfully. kuiper >
在日誌文件 log/stream.log
中,能夠看到建立了一個名爲 internal-kuiper_query_rule
的臨時規則。
... time="2019-11-12T11:56:10+08:00" level=info msg="The connection to server tcp://10.211.55.6:1883 was established successfully" rule=internal-kuiper_query_rule time="2019-11-12T11:56:10+08:00" level=info msg="Successfully subscribe to topic devices/+/messages" rule=internal-kuiper_query_rule
值得注意的是,這個名爲 internal-kuiper_query_rule
的規則是經過 query
建立的,服務器端每5秒鐘會檢測一下 query
客戶端是否在線,若是query
客戶端發現有超過10秒鐘沒有反應(好比被關閉),那麼這個內部建立的 internal-kuiper_query_rule
規則會被自動刪除,被刪除的時候在日誌文件中會打印以下的信息。
... time="2019-11-12T12:04:08+08:00" level=info msg="The client seems no longer fetch the query result, stop the query now." time="2019-11-12T12:04:08+08:00" level=info msg="stop the query." time="2019-11-12T12:04:08+08:00" level=info msg="unary operator project cancelling...." rule=internal-kuiper_query_rule ...
發送測試數據
經過任何的測試工具,向 EMQ X Edge 發送如下的測試數據。筆者在測試過程當中用的是 JMeter 的 MQTT 插件,由於基於 JMeter 能夠作一些比較靈活的自動數據生成,業務邏輯控制,以及大量設備的模擬等。用戶也能夠直接使用 mosquitto
等其它客戶端進行模擬。
devices/$device_id/messages
,其中$device_id
爲下面數據中的第一列{"temperature": $temperature, "humidity" : $humidity}
, 其中$temperature
和 $humidity
分別爲下面數據中的第二列和第三列#device_id, temperature, humidity 1,20,30 2,31,40 1,35,50 2,20,30 1,80,90 2,45,20 1,10,90 2,12,30 1,65,35 2,55,32
咱們能夠發現發送了模擬數據後,在 query
客戶端命令行裏在兩個10秒的時間窗口裏打印了兩組數據。這裏輸出的結果條數跟用戶發送數據的頻率有關係,若是 Kuiper 在一個時間窗口內接受到全部的數據,那麼只打印一條結果。
kuiper > [{"device_id":"1","t_av":45,"t_count":3,"t_max":80,"t_min":20},{"device_id":"2","t_av":25.5,"t_count":2,"t_max":31,"t_min":20}] [{"device_id":"2","t_av":37.333333333333336,"t_count":3,"t_max":55,"t_min":12},{"device_id":"1","t_av":37.5,"t_count":2,"t_max":65,"t_min":10}]
完成了 SQL 的調試以後,開始配置規則文件,將結果數據經過 Kuiper 的 MQTT Sink 發送到遠程的 AWS IoT 中。 在 AWS IoT 中,用戶須要先建立好如下內容,
設備鏈接證書與密鑰:AWS 的物聯網設備經過證書來鏈接,保證其安全性。在建立設備的過程當中,AWS會生成的如下三個文件。這裏會用到的是證書與私鑰。
d3807d9fa5-certificate.pem
d3807d9fa5-private.pem.key
d3807d9fa5-public.pem.key
關於這方面更多的信息,請參考 AWS 建立設備的文檔。
編寫 Kuiper 規則文件
規則文件是一個文本文件,描述了業務處理的邏輯(前面已經調試好的 SQL 語句),以及 sink 的配置(消息處理結果的發送目的地)。鏈接 AWS IoT 的大部分信息都已經在前文中描述。 Kuiper 的測試結果將被髮送到 AWS IoT設備的 devices/result
主題下。
{ "sql": "SELECT avg(temperature) AS t_av, max(temperature) AS t_max, min(temperature) AS t_min, COUNT(*) As t_count, split_value(mqtt(topic), \"/\", 1) AS device_id FROM demo GROUP BY device_id, TUMBLINGWINDOW(ss, 10)", "actions": [ { "log": {} }, { "mqtt": { "server": "ssl://xyz-ats.iot.us-east-1.amazonaws.com:8883", "topic": "devices/result", "qos": 1, "clientId": "demo_001", "certificationPath": "/var/aws/d3807d9fa5-certificate.pem", "privateKeyPath": "/var/aws/d3807d9fa5-private.pem.key" } } ] }
經過 Kuiper 命令行建立規則
# bin/cli create rule rule1 -f rule1.txt Connecting to 127.0.0.1:20498 Creating a new rule from file rule1.txt. Rule rule1 was created.
在日誌文件中能夠查看規則的運行鏈接狀況,若是配置項都正確的話,應該能夠看到到 AWS IoT 的鏈接創建成功。
...... time="2019-11-13T17:41:19+08:00" level=info msg="The connection to server tcp://10.211.55.6:1883 was established successfully" rule=rule1 time="2019-11-13T17:41:19+08:00" level=info msg="Successfully subscribe to topic devices/+/messages" rule=rule1 time="2019-11-13T17:41:20+08:00" level=info msg="The connection to server ssl://xyz-ats.iot.us-east-1.amazonaws.com:8883 was established successfully" rule=rule1 ......
devices/result
主題。並往本地的 EMQ X Edge 上發送模擬數據。通過 Kuiper 處理後,相應的處理結果被髮送到了 AWS IoT 中。以下圖所示,收到了兩次測試結果(第一次結果被摺疊)。
用戶能夠經過 AWS IoT Rule 將分析結果存儲到 Amazon DynamoDB 數據庫或者其它服務中,前端的應用程序能夠經過讀取 DynamoDB 中的數據來呈現給終端用戶,具體請參考 Amazon DynamoDB 文檔。
經過本文,讀者能夠了解到利用 EMQ X 在邊緣端的解決方案能夠很是快速、靈活地開發出基於邊緣數據分析的系統,實現數據低時延、低成本和安全的處理。
AWS IoT 也提供了 Greengrass 的邊緣解決方案,與 AWS Greengrass 相比,Kuiper 方案更加輕量級,業務邏輯的實現方式(基於 SQL)在編程上也相對更加簡單。AWS Greengrass 提供了基於 Lambada 的編程模型,經過提供不一樣編程語言的 SDK 來實現邊緣端的數據分析,以及往 AWS IoT 的分析結果上傳,所以在業務邏輯實現的靈活性上會更好。最後,與 Greengrass 只能鏈接 AWS IoT 不一樣,Kuiper 在與不一樣的第三方 IoT 平臺的集成的靈活性上也更好。
若是有興趣瞭解更多關於邊緣流式數據分析的內容,請參考 Kuiper 開源項目。
更多信息請訪問咱們的官網 emqx.io,或關注咱們的開源項目 github.com/emqx/emqx ,詳細文檔請訪問 官方文檔。