MQTT研究之EMQ:【基礎研究】

EMQ版本V2, emqttd-centos7-v2.3.11-1.el7.centos.x86_64.rpmphp

下載地址:http://emqtt.com/downloads/2318/centos7-rpmhtml

機器環境: Linux CentOS7.2node

 

 

安裝完成後,默認是匿名用戶訪問。emq自己不帶發佈定於工具或者命令,須要藉助相似mosquitto_sub、mosquitto_pub或者本身寫客戶端進行鏈接測試。關於mosquitto的環境搭建,請參照mosquitto環境搭建mysql

 

api的http監聽端口:默認8080
默認的web界面URL:http://ip:18083git

 

作些基本的配置,配置API的訪問端口,改爲9909,只有不出現衝突便可。github

##--------------------------------------------------------------------
## HTTP Management API Listener

## The IP Address and Port that the EMQ HTTP API will bind.
##
## Value: IP:Port | Port
##
## Default: 0.0.0.0:8080
listener.api.mgmt = 0.0.0.0:9909

 

配置節點參數:web

#Value: <name>@<host>
node.name = iotbus3@10.95.197.3

注意,這裏格式要用上面的給定風格<name>@<host>,不然會出現啓動失敗。例如,配置成下面:
node.name = iotbus3
就會出現錯誤:redis

[root@ws3 emqttd]# systemctl start emqttd.service
Job for emqttd.service failed because the control process exited with error code. See "systemctl status emqttd.service" and "journalctl -xe" for details.

-- Unit emqttd.service has begun starting up.
12月 11 21:38:35 ws3 sh[12475]: 2018-12-11 21:38:35 Can't set long node name!
12月 11 21:38:35 ws3 sh[12475]: Please check your configuration
12月 11 21:38:35 ws3 sh[12475]: 2018-12-11 21:38:35 crash_report
12月 11 21:38:35 ws3 sh[12475]: initial_call:     pid:     registered_name:     error_info:     ancestors:     message_queue_len:     messages:     links:     dictionary:     trap_exit:     status:     heap_size:     stack_size:     red
12月 11 21:38:35 ws3 sh[12475]: supervisor:     errorContext:     reason:     offender: 2018-12-11 21:38:35 supervisor_report
12月 11 21:38:35 ws3 sh[12475]: supervisor:     errorContext:     reason:     offender: 2018-12-11 21:38:35 crash_report
12月 11 21:38:35 ws3 sh[12475]: initial_call:     pid:     registered_name:     error_info:     ancestors:     message_queue_len:     messages:     links:     dictionary:     trap_exit:     status:     heap_size:     stack_size:     red
12月 11 21:38:36 ws3 sh[12475]: application:     exited:     type: Kernel pid terminated (application_controller) ({application_start_failure,kernel,{{shutdown,{failed_to_start_child,net_sup,{shutdown,{failed_to_start_child,net_kernel,{
12月 11 21:38:36 ws3 sh[12475]: Crash dump is being written to: erl_crash.dump...done
12月 11 21:38:36 ws3 systemd[1]: emqttd.service: control process exited, code=exited status=1
12月 11 21:38:36 ws3 systemd[1]: Failed to start emqtt daemon.
-- Subject: Unit emqttd.service has failed
-- Defined-By: systemd
-- Support: http://lists.freedesktop.org/mailman/listinfo/systemd-devel
-- 
-- Unit emqttd.service has failed.
-- 
-- The result is failed.

修改正確後,啓動,從/var/log/emqttd/erlang.log.1日誌裏面能夠看到:算法

xec: /usr/lib64/emqttd/erts-9.0/bin/erlexec -boot /usr/lib64/emqttd/releases/2.3.11/emqttd -mode embedded -boot_var ERTS_LIB_DIR /usr/lib64/emqttd/erts-9.0/../lib -mnesia dir "/var/lib/emqttd/mnesia/iotbus3@10.95.197.3" -config /var/li
b/emqttd/configs/app.2018.12.11.21.42.27.config -args_file /var/lib/emqttd/configs/vm.2018.12.11.21.42.27.args -vm_args /var/lib/emqttd/configs/vm.2018.12.11.21.42.27.args -- console^M
Root: /usr/lib64/emqttd^M
/usr/lib64/emqttd^M
starting emqttd on node 'iotbus3@10.95.197.3'^M
emqttd ctl is starting...[ok]^M
emqttd hook is starting...[ok]^M
emqttd router is starting...[ok]^M
emqttd pubsub is starting...[ok]^M
emqttd stats is starting...[ok]^M
emqttd metrics is starting...[ok]^M
emqttd pooler is starting...[ok]^M
emqttd trace is starting...[ok]^M
emqttd client manager is starting...[ok]^M
emqttd session manager is starting...[ok]^M
emqttd session supervisor is starting...[ok]^M
emqttd wsclient supervisor is starting...[ok]^M
emqttd broker is starting...[ok]^M
emqttd alarm is starting...[ok]^M
emqttd mod supervisor is starting...[ok]^M
emqttd bridge supervisor is starting...[ok]^M
emqttd access control is starting...[ok]^M
emqttd system monitor is starting...[ok]^M
emqttd 2.3.11 is running now^M
Eshell V9.0  (abort with ^G)^M
(iotbus3@10.95.197.3)1> Load emq_mod_presence module successfully.^M
(iotbus3@10.95.197.3)1> dashboard:http listen on 0.0.0.0:18083 with 4 acceptors.^M (iotbus3@10.95.197.3)1> mqtt:tcp listen on 127.0.0.1:11883 with 4 acceptors.^M (iotbus3@10.95.197.3)1> mqtt:tcp listen on 0.0.0.0:1883 with 16 acceptors.^M (iotbus3@10.95.197.3)1> mqtt:ws listen on 0.0.0.0:8083 with 4 acceptors.^M (iotbus3@10.95.197.3)1> mqtt:ssl listen on 0.0.0.0:8883 with 16 acceptors.^M (iotbus3@10.95.197.3)1> mqtt:wss listen on 0.0.0.0:8084 with 4 acceptors.^M (iotbus3@10.95.197.3)1> mqtt:api listen on 0.0.0.0:9909 with 4 acceptors.^M
(iotbus3@10.95.197.3)1>

重點關注上面紅色的部分,dashboard的訪問模式,以及各類訪問接口的端口和協議類型sql

 

配置ACL,EMQ的ACL作的很是全面,因爲這裏只是研究,和mosquitto進行性能對比,也將其配置成單節點的username/password的訪問方式。
修改/etc/emqtt/emq.conf

#默認這裏是true
mqtt.allow_anonymous = false
#默認這裏是allow
mqtt.acl_nomatch = deny
mqtt.acl_file = /etc/emqttd/acl.conf

接下來,配置/etc/emqttd/acl.conf

%%--------------------------------------------------------------------
%%
%% [ACL](https://github.com/emqtt/emqttd/wiki/ACL)
%%
%% -type who() :: all | binary() |
%%                {ipaddr, esockd_access:cidr()} |
%%                {client, binary()} |
%%                {user, binary()}.
%%
%% -type access() :: subscribe | publish | pubsub.
%%
%% -type topic() :: binary().
%%
%% -type rule() :: {allow, all} |
%%                 {allow, who(), access(), list(topic())} |
%%                 {deny, all} |
%%                 {deny, who(), access(), list(topic())}.
%%
%%--------------------------------------------------------------------

{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.

{allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.

{deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.

這是默認的acl.conf文件,在這裏,添加一個ACL元素:

{allow, {user, "shihuc"}, pubsub, ["$SYS/#", "#"]}.

即最終這個配置好的acl.conf文件內容爲:

{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.
{allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.
{deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.
{allow, {user, "shihuc"}, pubsub, ["$SYS/#", "#"]}.

加載username鑑權插件emq_auth_username:

[root@ws3 emqttd]# emqttd_ctl plugins load emq_auth_username
Start apps: [emq_auth_username]
Plugin emq_auth_username loaded successfully.

添加新的用戶名和密碼:

[root@ws3 emqttd]# emqttd_ctl users add shihuc shihuc
ok

測試:

[root@ws2 logs]# mosquitto_sub -t iotbus -u shihuc -P shihuc -h 10.95.197.3 -p 1883 Error: No route to host

兩種解決方案
1. 在10.95.197.3的機器上執行 iptables -F清除防火牆設置。
2. 在10.95.197.3的機器上執行firewall-cmd --add-port=1883/tcp添加一條防火牆規則。
依照第二條,配置好防火牆後,啓動ws3上的emq服務,進行訂閱沒有問題。

 

基於username/password的方式(emq_auth_username插件)邏輯走通了。

接下來測試,發現一個問題:

[root@ws2 ~]# mosquitto_pub -t iotbus -h 10.95.197.3 -p 1883 -u shihuc -P shihuc -q 2 -m "ACL verification"

訂閱程序啓動沒有問題,可是發佈消息後,訂閱端沒有反應,說明消息被拒絕了。

分析:

1. 指令訂閱和發佈不存在鑑權錯誤,即沒有提示用戶名和密碼錯誤(以下)。
Connection Refused: bad user name or password.
Error: The connection was refused.
2. 發消息後,接收端沒有反應,說明acl裏面對topic的匹配流程鏈路出現問題,再次細讀EMQ文檔:
ACL 訪問控制規則定義:
容許(Allow)|拒絕(Deny) 誰(Who) 訂閱(Subscribe)|發佈(Publish) 主題列表(Topics)
MQTT 客戶端發起訂閱/發佈請求時,EMQ 消息服務器的訪問控制模塊,會逐條匹配 ACL 規則,直到匹配成功爲止:
          ---------              ---------              ---------
Client -> | Rule1 | --nomatch--> | Rule2 | --nomatch--> | Rule3 | --> Default
          ---------              ---------              ---------
              |                      |                      |
            match                  match                  match
             \|/                    \|/                    \|/
        allow | deny           allow | deny           allow | deny

 

結合acl.conf的配置內容:

{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.
{allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.
{deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.
{allow, {user, "shihuc"}, pubsub, ["$SYS/#", "#"]}.

不難發現,新加的一條規則{allow, {user, "shihuc"}, pubsub, ["$SYS/#", "#"]}.被規則{deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.deny掉了,因此出現上述訂閱端沒有反應的問題。將這兩條規則調整一下前後順序便可解決問題(最新的規則):

{allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}.
{allow, {ipaddr, "127.0.0.1"}, pubsub, ["$SYS/#", "#"]}.
{allow, {user, "shihuc"}, pubsub, ["$SYS/#", "#"]}.
{deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.

調整後,再次驗證發佈訂閱邏輯,一切運行正常,到此,基本的環境構建完成。

 

 

 

補充研究基於MySQL的認證和受權

1. 首先配置數據庫信息 /etc/emqtt/plugins/emq_auth_mysql.conf

##--------------------------------------------------------------------
## MySQL Auth/ACL Plugin
##--------------------------------------------------------------------

## MySQL server address.
##
## Value: Port | IP:Port
##
## Examples: 3306, 127.0.0.1:3306, localhost:3306
auth.mysql.server = 10.95.197.10:3306

## MySQL pool size.
##
## Value: Number
auth.mysql.pool = 8

## MySQL username.
##
## Value: String
auth.mysql.username = iotbususer

## MySQL password.
##
## Value: String
auth.mysql.password = iotbus2018!

## MySQL database.
##
## Value: String
auth.mysql.database = mqtt

## Variables: %u = username, %c = clientid

## Authentication query.
##
## Note that column names should be 'password' and 'salt' (if used).
## In case column names differ in your DB - please use aliases,
## e.g. "my_column_name as password".
##
## Value: SQL
##
## Variables:
##  - %u: username
##  - %c: clientid
##
auth.mysql.auth_query = select password from mqtt_user where username = '%u' limit 1
## auth.mysql.auth_query = select password_hash as password from mqtt_user where username = '%u' limit 1

## Password hash.
##
## Value: plain | md5 | sha | sha256 | bcrypt
auth.mysql.password_hash = sha256

## sha256 with salt prefix
## auth.mysql.password_hash = salt,sha256

## bcrypt with salt only prefix
## auth.mysql.password_hash = salt,bcrypt

## sha256 with salt suffix
## auth.mysql.password_hash = sha256,salt

## pbkdf2 with macfun iterations dklen
## macfun: md4, md5, ripemd160, sha, sha224, sha256, sha384, sha512
## auth.mysql.password_hash = pbkdf2,sha256,1000,20


## Superuser query.
##
## Value: SQL
##
## Variables:
##  - %u: username
##  - %c: clientid
auth.mysql.super_query = select is_superuser from mqtt_user where username = '%u' limit 1

## ACL query.
##
## Value: SQL
##
## Variables:
##  - %a: ipaddr
##  - %u: username
##  - %c: clientid
auth.mysql.acl_query = select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'

以上配置中,比較主要配置項是:

auth.mysql.server
auth.mysql.username
auth.mysql.password
auth.mysql.database
auth.mysql.password_hash

 

2. 建立mqtt所需的鑑權表

MQTT 用戶表
CREATE TABLE `mqtt_user` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `username` varchar(100) DEFAULT NULL,
  `password` varchar(100) DEFAULT NULL,
  `salt` varchar(35) DEFAULT NULL,
  `is_superuser` tinyint(1) DEFAULT 0,
  `created` datetime DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `mqtt_username` (`username`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8;
MQTT 訪問控制表
CREATE TABLE `mqtt_acl` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `allow` int(1) DEFAULT NULL COMMENT '0: deny, 1: allow',
  `ipaddr` varchar(60) DEFAULT NULL COMMENT 'IpAddress',
  `username` varchar(100) DEFAULT NULL COMMENT 'Username',
  `clientid` varchar(100) DEFAULT NULL COMMENT 'ClientId',
  `access` int(2) NOT NULL COMMENT '1: subscribe, 2: publish, 3: pubsub',
  `topic` varchar(100) NOT NULL DEFAULT '' COMMENT 'Topic Filter',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

INSERT INTO `mqtt_acl` (`id`, `allow`, `ipaddr`, `username`, `clientid`, `access`, `topic`)
VALUES
    (1,1,NULL,'$all',NULL,2,'#'),
    (2,0,NULL,'$all',NULL,1,'$SYS/#'),
    (3,0,NULL,'$all',NULL,1,'eq #'),
    (5,1,'127.0.0.1',NULL,NULL,2,'$SYS/#'),
    (6,1,'127.0.0.1',NULL,NULL,2,'#'),
    (7,1,NULL,'dashboard',NULL,1,'$SYS/#');

爲了驗證本身所加用戶是否有權限,作個驗證:

mysql> insert into mqtt_acl (allow, ipaddr, username, clientid, access, topic) values (1, NULL, "water", NULL, 1, "/taikang/#");
Query OK, 1 row affected (0.00 sec)
mysql> update mqtt_acl set access=3 where username="water";
Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0

mysql> 
mysql> select * from mqtt_acl;
+----+-------+-----------+-----------+----------+--------+------------+
| id | allow | ipaddr    | username  | clientid | access | topic      |
+----+-------+-----------+-----------+----------+--------+------------+
|  1 |     1 | NULL      | $all      | NULL     |      2 | #          |
|  2 |     0 | NULL      | $all      | NULL     |      1 | $SYS/#     |
|  3 |     0 | NULL      | $all      | NULL     |      1 | eq #       |
|  5 |     1 | 127.0.0.1 | NULL      | NULL     |      2 | $SYS/#     |
|  6 |     1 | 127.0.0.1 | NULL      | NULL     |      2 | #          |
|  7 |     1 | NULL      | dashboard | NULL     |      1 | $SYS/#     |
|  8 |     1 | NULL      | shihuc    | NULL     |      3 | /taikang/# |
|  9 |     1 | NULL      | water     | NULL     |      3 | /taikang/# |
+----+-------+-----------+-----------+----------+--------+------------+
8 rows in set (0.00 sec)

注意:這裏access是控制訪問級別的:1是sub,2是pub,3是pubsub。

 

mysql> insert into mqtt_user(username, password) values("water", "0f4168490e38b8447e11ba4bd656aa11b925bd22af30bac464bc153fdb608501");
Query OK, 1 row affected (0.00 sec)

mysql> select * from mqtt_user;
+----+----------+------------------------------------------------------------------+------+--------------+---------+
| id | username | password                                                         | salt | is_superuser | created |
+----+----------+------------------------------------------------------------------+------+--------------+---------+
|  1 | water    | 0f4168490e38b8447e11ba4bd656aa11b925bd22af30bac464bc153fdb608501 | NULL |            0 | NULL    |
+----+----------+------------------------------------------------------------------+------+--------------+---------+
1 row in set (0.00 sec)

注意這裏insert用戶記錄的時候,password字段的值是一個sha256加密算法的字符串,我這裏是經過在線加密工具獲取的密碼http://www.ttmd5.com/hash.php?type=9。

 

3. 加載配置

[root@ws3 ~]# emqttd_ctl plugins load emq_auth_mysql

查看目前有多少加載的plugin:

[root@ws3 ~]# emqttd_ctl plugins list
Plugin(emq_auth_clientid, version=2.3.11, description=Authentication with ClientId/Password, active=false)
Plugin(emq_auth_http, version=2.3.11, description=Authentication/ACL with HTTP API, active=false)
Plugin(emq_auth_jwt, version=2.3.11, description=Authentication with JWT, active=false)
Plugin(emq_auth_ldap, version=2.3.11, description=Authentication/ACL with LDAP, active=false)
Plugin(emq_auth_mongo, version=2.3.11, description=Authentication/ACL with MongoDB, active=false)
Plugin(emq_auth_mysql, version=2.3.11, description=Authentication/ACL with MySQL, active=true)
Plugin(emq_auth_pgsql, version=2.3.11, description=Authentication/ACL with PostgreSQL, active=false)
Plugin(emq_auth_redis, version=2.3.11, description=Authentication/ACL with Redis, active=false)
Plugin(emq_auth_username, version=2.3.11, description=Authentication with Username/Password, active=true)
Plugin(emq_coap, version=2.3.11, description=CoAP Gateway, active=false)
Plugin(emq_dashboard, version=2.3.11, description=EMQ Web Dashboard, active=true)
Plugin(emq_lua_hook, version=2.3.11, description=EMQ Hooks in lua, active=false)
Plugin(emq_modules, version=2.3.11, description=EMQ Modules, active=true)
Plugin(emq_plugin_template, version=2.3.11, description=EMQ Plugin Template, active=false)
Plugin(emq_recon, version=2.3.11, description=Recon Plugin, active=true)
Plugin(emq_reloader, version=2.3.11, description=Reloader Plugin, active=false)
Plugin(emq_retainer, version=2.3.11, description=EMQ Retainer, active=true)
Plugin(emq_sn, version=2.3.11, description=MQTT-SN Gateway, active=false)
Plugin(emq_stomp, version=2.3.11, description=Stomp Protocol Plugin, active=false)
Plugin(emq_web_hook, version=2.3.11, description=EMQ Webhook Plugin, active=false)

active是true的是加載運行了的,active是false的,表示未運行的,從下面的圖也能夠看。

 

最後,基於mosquitto_sub和mosquitto_pub進行驗證:

[root@ws2 ~]# mosquitto_pub -t /taikang/iotbus -h 10.95.197.3 -p 1883 -u water -P water -q 2 -m "ACL verification xxxxxxxx" 
[root@ws2 logs]# mosquitto_sub -t /taikang/iotbus -u water -P water -h 10.95.197.3 -p 1883
ACL verification xxxxxxxx

 

基於MySQL的認證和受權,操做很是靈活,具備很大的效用,物聯網中物接入很值得借鑑這個能力。

相關文章
相關標籤/搜索