EMQ X 規則引擎系列(十一)存儲消息到 PostgreSQL 數據庫

畫板 17@2x.png

PostgreSQL 數據庫介紹

做爲開源關係數據庫重要一員,PostgreSQL 標榜本身是世界上最早進的開源數據庫,相比於其餘開源關係數據庫如 MySQL,PostgreSQL 是徹底由社區驅動的開源項目,由全世界超過 1000 名貢獻者所維護。PostgreSQL 提供了單個完整功能的版本,而不像 MySQL 那樣提供了多個不一樣的社區版、商業版與企業版。PostgreSQL 基於自由的 BSD/MIT 許可,組織可使用、複製、修改和從新分發代碼,只須要提供一個版權聲明便可。node

PostgreSQL 具備諸多特性,在 GIS 領域有較多支持,其「無鎖定」特性很是突出,支持函數和條件索引,有成熟的集羣方案。PostgreSQL 還具有及其強悍的 SQL 編程能力如統計函數和統計語法支持,經過 Timescaledb 插件,PostgreSQL 能夠轉變爲功能完備的時序數據庫 Timescaledb 。sql

場景介紹

該場景須要將 EMQ X 指定主題下且知足條件的消息存儲到 PostgreSQL 數據庫。爲了便於後續分析檢索,消息內容須要進行拆分存儲。docker

該場景下客戶端上報數據以下:shell

  • Topic:testtopic
  • Payload:數據庫

    {"msg":"Hello, World!"}

準備工做

建立數據庫

建立 tutorial 數據庫,用戶名爲 postgres,密碼爲 password:編程

$ docker pull postgres

$ docker run --rm --name postgres -p 5432:5432 -e POSTGRES_PASSWORD=password -d postgres:latest

$ docker exec -it postgres psql -U postgres

> CREATE database tutorial;

> \c tutorial

建立數據表

建立 t_mqtt_msg 表:json

CREATE TABLE t_mqtt_msg (
  id SERIAL primary key,
  msgid character varying(64),
  sender character varying(64),
  topic character varying(255),
  qos integer,
  retain integer,
  payload text,
  arrived timestamp without time zone
);

配置說明

建立資源

打開 EMQ X Dashboard,進入左側菜單的 資源 頁面,點擊 新建 按鈕,選擇 PostgreSQL 資源類型並完成相關配置進行資源建立。socket

image-20190725142933513.png

建立規則

進入左側菜單的 規則 頁面,點擊 新建 按鈕,進行規則建立。這裏選擇觸發事件 message.publish,即在 EMQ X 收到 PUBLISH 消息時觸發該規則進行數據處理。函數

選定觸發事件後,咱們可在界面上看到可選字段及示例 SQL:工具

image-20190719112141128.png

篩選所需字段

規則引擎使用 SQL 語句過濾和處理數據。這裏咱們須要 msgid, topic, payload 等數據,同時但願匹配全部主題的消息,所以僅須要在默認 SQL 基礎上刪除 WHERE 子句便可,最終咱們獲得 SQL 以下:

SELECT
  *
FROM
  "message.publish"

SQL 測試

藉助 SQL 測試功能,咱們能夠快速確認剛剛填寫的 SQL 語句可否達成咱們的目的。首先填寫用於測試的 payload 等數據以下:

image-20190725145617081.png

而後點擊 測試 按鈕,咱們獲得如下數據輸出:

{
  "client_id": "c_emqx",
  "event": "message.publish",
  "id": "589A429E9572FB44B0000057C0001",
  "node": "emqx@127.0.0.1",
  "payload": "{\"msg\":\"Hello, World!\"}",
  "peername": "127.0.0.1:50891",
  "qos": 1,
  "retain": 0,
  "timestamp": 1564037750692,
  "topic": "testtopic",
  "username": "u_emqx"
}

測試輸出包含了全部須要的數據,咱們能夠進行後續步驟。

添加響應動做,存儲消息到 PostgreSQL

SQL 條件輸入輸出無誤後,咱們繼續添加相應動做,配置寫入 SQL 語句,將篩選結果存儲到 PostgreSQL。

點擊響應動做中的 添加 按鈕,選擇 保存數據到 PostgreSQL 動做,選取剛剛建立的 PostgreSQL 資源並填寫 SQL 模板以下:

insert into t_mqtt_msg(msgid, topic, qos, retain, payload, arrived) values (${id}, ${topic}, ${qos}, ${retain}, ${payload}, to_timestamp(${timestamp}::double precision /1000)) returning id

最後點擊 新建 按鈕完成規則建立。

image-20190725144256942.png

測試

預期結果

咱們成功建立了一條規則,包含一個處理動做,動做指望效果以下:

  1. 客戶端上報消息時,該消息將命中 SQL,規則列表中 已命中 數字增長 1;
  2. PostgreSQL tutorial 數據庫的 t_mqtt_msg 表中將增長一條數據,數據內容與消息內容一致。

使用 Dashboard 中的 Websocket 工具測試

切換到 工具 --> Websocket 頁面,使用任意信息客戶端鏈接到 EMQ X,鏈接成功後在 消息 卡片中發送以下消息:

  • Topic:testtopic
  • Payload:

    {"msg":"Hello, World!"}

image-20190725145805279.png

點擊 發送 按鈕,發送成功後能夠看到當前規則已命中次數已經變爲了 1。

而後檢查 PostgreSQL,新的 data point 是否添加成功:

image-20190725145107685.png

至此,咱們經過規則引擎實現了使用規則引擎存儲消息到 PostgreSQL 數據庫的業務開發。

版權聲明: 本文爲 EMQ 原創,轉載請註明出處。

原文連接:https://www.emqx.io/cn/blog/e...

相關文章
相關標籤/搜索