MQTT研究之EMQ:【EMQ之HTTP認證/訪問控制】

今天進行驗證的邏輯是EMQ的http的Auth以及ACL的邏輯。html

 

首先,參照HTTP插件認證配置的說明文檔進行基本的配置, 個人配置內容以下:java

##--------------------------------------------------------------------
## HTTP Auth/ACL Plugin
##--------------------------------------------------------------------

##--------------------------------------------------------------------
## Authentication request.
##
## Variables:
##  - %u: username
##  - %c: clientid
##  - %a: ipaddress
##  - %P: password
##
## Value: URL
auth.http.auth_req = http://10.95.177.137:8899/scc/mqtt/auth
## Value: post | get | put
auth.http.auth_req.method = post
## Value: Params
auth.http.auth_req.params = clientid=%c,username=%u,password=%P

##--------------------------------------------------------------------
## Superuser request.
##
## Variables:
##  - %u: username
##  - %c: clientid
##  - %a: ipaddress
##
## Value: URL
auth.http.super_req = http://10.95.177.137:8899/scc/mqtt/superuser
## Value: post | get | put
auth.http.super_req.method = post
## Value: Params
auth.http.super_req.params = clientid=%c,username=%u

##--------------------------------------------------------------------
## ACL request.
##
## Variables:
##  - %A: 1 | 2, 1 = sub, 2 = pub
##  - %u: username
##  - %c: clientid
##  - %a: ipaddress
##  - %t: topic
##
## Value: URL
auth.http.acl_req = http://10.95.177.137:8899/scc/mqtt/acl
## Value: post | get | put
auth.http.acl_req.method = get
## Value: Params
auth.http.acl_req.params = access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t

 

這裏,很是須要值得注意的是,這個http(包括其餘的,例如mysql)的auth以及acl控制,都是基於插件的邏輯實現的,即依賴其餘服務進行實現,基於這個服務系統的返回值,EMQ決定auth以及acl的控制。這個理解清楚了,全部的插件相關的auth和acl都好理解了。mysql

我這裏,將auth和acl的服務實如今一個springboot的web項目scc下了,爲了驗證邏輯,我將真實的auth或者acl控制邏輯都簡化了,主要驗證流程。web

 

a) 啓動auth、acl的服務sccredis

package com.taikang.iot.scc.loadbalance.user.controller;

import org.apache.log4j.Logger;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

import javax.servlet.http.HttpServletResponse;

/**
 * @Author: chengsh05
 * @Date: 2019/4/9 19:40
 */
@Controller
@RequestMapping("/mqtt")
public class EmqAuthHttpController {

    private Logger logger = Logger.getLogger(EmqAuthHttpController.class);

    @RequestMapping("/auth")
    public void mqttAuth(String clientid, String username, String password, HttpServletResponse response) {
        //auth.http.auth_req.params = clientid=%c,username=%u,password=%P
        logger.info("普通用戶;clientid:" + clientid + ";username:" + username + ";password:" + password);
        /**
         * TODO 添加認證的邏輯,控制http的返回碼, 這裏的用戶是否存在,一般是基於數據庫作的。
         * HTTP 認證/鑑權 API
         * 認證/ACL 成功,API 返回200
         * 認證/ACL 失敗,API 返回4xx
         */ response.setStatus(401);
    }

    @RequestMapping("/superuser")
    public void mqttSuperuser(String clientid, String username, HttpServletResponse response) {
        //auth.http.super_req.params = clientid=%c,username=%u
        logger.info("超級用戶;clientid:" + clientid + ";username:" + username);
        response.setStatus(401);
    }

    @RequestMapping("/acl")
    public void mqttAcl(String access, String username, String clientid, String ipaddr, String topic, HttpServletResponse response) {
        //auth.http.acl_req.params = access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t
        logger.info("access: " + access + ";username: " + username + ";clientid: " + clientid + "; ipaddr: " + ipaddr + ";topic: " + topic);
        response.setStatus(401);
    }
}

 

b) 首先啓動emq服務端spring

固然要emqttd_ctl plugins load emq_auth_http這個插件(服務節點 10.95.200.12).sql

[tkiot@tkwh-kfcs-app2 plugins]$ emqttd_ctl plugins list
Plugin(emq_auth_clientid, version=2.3.11, description=Authentication with ClientId/Password, active=false)
Plugin(emq_auth_http, version=2.3.11, description=Authentication/ACL with HTTP API, active=true)
Plugin(emq_auth_jwt, version=2.3.11, description=Authentication with JWT, active=false)
Plugin(emq_auth_ldap, version=2.3.11, description=Authentication/ACL with LDAP, active=false)
Plugin(emq_auth_mongo, version=2.3.11, description=Authentication/ACL with MongoDB, active=false)
Plugin(emq_auth_mysql, version=2.3.11, description=Authentication/ACL with MySQL, active=false)
Plugin(emq_auth_pgsql, version=2.3.11, description=Authentication/ACL with PostgreSQL, active=false)
Plugin(emq_auth_redis, version=2.3.11, description=Authentication/ACL with Redis, active=false)
Plugin(emq_auth_username, version=2.3.11, description=Authentication with Username/Password, active=false)
Plugin(emq_coap, version=2.3.11, description=CoAP Gateway, active=false)
Plugin(emq_dashboard, version=2.3.11, description=EMQ Web Dashboard, active=true)
Plugin(emq_lua_hook, version=2.3.11, description=EMQ Hooks in lua, active=false)
Plugin(emq_modules, version=2.3.11, description=EMQ Modules, active=true)
Plugin(emq_plugin_template, version=2.3.11, description=EMQ Plugin Template, active=false)
Plugin(emq_recon, version=2.3.11, description=Recon Plugin, active=true)
Plugin(emq_reloader, version=2.3.11, description=Reloader Plugin, active=false)
Plugin(emq_retainer, version=2.3.11, description=EMQ Retainer, active=true)
Plugin(emq_sn, version=2.3.11, description=MQTT-SN Gateway, active=false)
Plugin(emq_stomp, version=2.3.11, description=Stomp Protocol Plugin, active=false)
Plugin(emq_web_hook, version=2.3.11, description=EMQ Webhook Plugin, active=false)

 

c) 而後啓動一個基於mqtt的客戶端數據庫

我這裏是用基於paho的一個消費者(subscriber)。apache

package com.taikang.iot.rulee.security;

import com.taikang.iot.rulee.paho.PushCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import javax.net.ssl.SSLSocketFactory;
import java.util.concurrent.ScheduledExecutorService;

public class MQTTSSLConsumer {
//    public static final String HOST = "tcp://127.0.0.1:61613";
//    public static final String TOPIC1 = "pos_message_all";
//    private static final String clientid = "client11";
//    public static final String HOST = "tcp://10.95.197.1:1883";
    public static final String HOST = "ssl://10.95.200.12:8883";
    public static final String TOPIC1 = "taikang/rulee";
    private static final String clientid = "client11";
    private MqttClient client;
    private MqttConnectOptions options;
    private String userName = "water";    //非必須
    private String passWord = "water";  //非必須
    @SuppressWarnings("unused")
    private ScheduledExecutorService scheduler;
    private String sslPemPath = "E:\\2018\\IOT\\MQTT\\javassl\\java\\";

    private void start() {
        try {
            // host爲主機名,clientid即鏈接MQTT的客戶端ID,通常以惟一標識符表示,MemoryPersistence設置clientid的保存形式,默認爲之內存保存
            client = new MqttClient(HOST, clientid, new MemoryPersistence());
            // MQTT的鏈接設置
            options = new MqttConnectOptions();
            //-----------SSL begin--------------
            SSLSocketFactory factory = OpensslHelper.getSSLSocktet(sslPemPath + "sccCA0.crt",sslPemPath +"sccDevSMP.crt",sslPemPath + "sccDevSMP.key","shihuc");
            options.setSocketFactory(factory);
            //-----------end of SSL ------------
            // 設置是否清空session,這裏若是設置爲false表示服務器會保留客戶端的鏈接記錄,設置爲true表示每次鏈接到服務器都以新的身份鏈接
            options.setCleanSession(false);
            // 設置鏈接的用戶名
            options.setUserName(userName);
            // 設置鏈接的密碼
            options.setPassword(passWord.toCharArray());
            // 設置超時時間 單位爲秒
            options.setConnectionTimeout(10);
            // 設置會話心跳時間 單位爲秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並無重連的機制
            options.setKeepAliveInterval(20);
            // 設置重連機制
            options.setAutomaticReconnect(true);
            // 設置回調
            client.setCallback(new PushCallback());
            MqttTopic topic = client.getTopic(TOPIC1);
            //setWill方法,若是項目中須要知道客戶端是否掉線能夠調用該方法。設置最終端口的通知消息
            //options.setWill(topic, "close".getBytes(), 2, true);//遺囑
            client.connect(options);
            //訂閱消息
            int[] Qos  = {1};
            String[] topic1 = {TOPIC1};
            client.subscribe(topic1, Qos);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws MqttException {
        System.setProperty("javax.net.debug", "ssl,handshake");
        MQTTSSLConsumer client = new MQTTSSLConsumer();
        client.start();
    }
}

其實,這裏用什麼方式不是很重要,能夠是paho的客戶端,也能夠是mqtt.fx工具(參照我以前的博文MQTT研究之EMQ:【SSL雙向驗證】springboot

 

 d) 結果分析

按照上述的代碼進行測試,會發現,c)步驟的代碼會遇到錯誤,代表客戶端訂閱接入的時候鑑權不經過。

。。。。。。
verify_data:  { 58, 47, 6, 13, 206, 237, 24, 135, 49, 56, 87, 57 }
***
MQTT Con: client11, WRITE: TLSv1 Change Cipher Spec, length = 1
*** Finished
verify_data:  { 177, 80, 48, 65, 115, 27, 64, 57, 75, 119, 104, 26 }
***
MQTT Con: client11, WRITE: TLSv1 Handshake, length = 48
MQTT Con: client11, setSoTimeout(0) called
MQTT Snd: client11, WRITE: TLSv1 Application Data, length = 64
MQTT Rec: client11, READ: TLSv1 Application Data, length = 32
MQTT Rec: client11, READ: TLSv1 Application Data, length = 32
錯誤的用戶名或密碼 (4)
    at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:28)
    at org.eclipse.paho.client.mqttv3.internal.ClientState.notifyReceivedAck(ClientState.java:988)
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:145)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

 

將a)中的response.setStatus(401);代碼調整爲response.setStatus(200);再次運行c)的客戶端代碼,認證經過,代碼執行到acl權限控制。

scc服務的輸出日誌(代表emq執行了auth接口,200的返回值代表成功,而後校驗是否超級用戶,最後acl校驗,由於acl的返回值是4xx,emq認爲acl失敗):

2019-04-09 20:45:05.479  INFO 5644 --- [nio-8899-exec-1] c.t.i.s.l.u.c.EmqAuthHttpController      : 普通用戶;clientid:client11;username:water;password:water
2019-04-09 20:45:05.510  INFO 5644 --- [nio-8899-exec-2] c.t.i.s.l.u.c.EmqAuthHttpController      : 超級用戶;clientid:client11;username:water
2019-04-09 20:45:10.362  INFO 5644 --- [nio-8899-exec-3] c.t.i.s.l.u.c.EmqAuthHttpController      : access: 1;username: water;clientid: client11; ipaddr: 10.95.177.137;topic: taikang/rulee

subscriber的客戶端日誌(acl鑑權時,emq調用scc的服務,獲得401,認爲訂閱者沒有操控權限,因此報錯):

MQTT Snd: client11, WRITE: TLSv1 Application Data, length = 32
MQTT Snd: client11, WRITE: TLSv1 Application Data, length = 48
MqttException (128)
    at org.eclipse.paho.client.mqttv3.MqttClient.subscribe(MqttClient.java:438)
    at com.taikang.iot.rulee.security.MQTTSSLConsumer.start(MQTTSSLConsumer.java:60)
    at com.taikang.iot.rulee.security.MQTTSSLConsumer.main(MQTTSSLConsumer.java:70)
MQTT Rec: client11, READ: TLSv1 Application Data, length = 32

 

補充驗證:

1. 超級用戶返回爲200,看看acl的邏輯

package com.taikang.iot.scc.loadbalance.user.controller;

import org.apache.log4j.Logger;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

import javax.servlet.http.HttpServletResponse;

/**
 * @Author: chengsh05
 * @Date: 2019/4/9 19:40
 */
@Controller
@RequestMapping("/mqtt")
public class EmqAuthHttpController {

    private Logger logger = Logger.getLogger(EmqAuthHttpController.class);

    @RequestMapping("/auth")
    public void mqttAuth(String clientid, String username, String password, HttpServletResponse response) {
        //auth.http.auth_req.params = clientid=%c,username=%u,password=%P
        logger.info("普通用戶;clientid:" + clientid + ";username:" + username + ";password:" + password);
        /**
         * TODO 添加認證的邏輯,控制http的返回碼, 這裏的用戶是否存在,一般是基於數據庫作的。
         * HTTP 認證/鑑權 API
         * 認證/ACL 成功,API 返回200
         * 認證/ACL 失敗,API 返回4xx
         */ response.setStatus(200);
    }

    @RequestMapping("/superuser")
    public void mqttSuperuser(String clientid, String username, HttpServletResponse response) {
        //auth.http.super_req.params = clientid=%c,username=%u
        logger.info("超級用戶;clientid:" + clientid + ";username:" + username);
        response.setStatus(200);
    }

    @RequestMapping("/acl")
    public void mqttAcl(String access, String username, String clientid, String ipaddr, String topic, HttpServletResponse response) {
        //auth.http.acl_req.params = access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t
        logger.info("access: " + access + ";username: " + username + ";clientid: " + clientid + "; ipaddr: " + ipaddr + ";topic: " + topic);
        response.setStatus(401);
    }
}

而後,再次啓動subscriber程序,看看scc服務的日誌輸出。獲得下面的結果:

2019-04-09 20:50:52.816  INFO 8412 --- [nio-8899-exec-1] c.t.i.s.l.u.c.EmqAuthHttpController      : 普通用戶;clientid:client11;username:water;password:water
2019-04-09 20:50:52.832  INFO 8412 --- [nio-8899-exec-1] c.t.i.s.l.u.c.EmqAuthHttpController      : 超級用戶;clientid:client11;username:water

 

發現什麼沒有呢?和上面的驗證(auth接口返回200,superuser接口返回401,acl返回401)對比,很顯然的發現,當superuser接口返回200後,acl接口再也不調用,無條件認爲acl是經過的,即爲有權限。 普通用戶經過auth後,須要校驗acl

 

2. 在emq_auth_http已加載的基礎上再加載emq_auth_mysql

[tkiot@tkwh-kfcs-app2 log]$ emqttd_ctl plugins list
Plugin(emq_auth_clientid, version=2.3.11, description=Authentication with ClientId/Password, active=false)
Plugin(emq_auth_http, version=2.3.11, description=Authentication/ACL with HTTP API, active=true)
Plugin(emq_auth_jwt, version=2.3.11, description=Authentication with JWT, active=false)
Plugin(emq_auth_ldap, version=2.3.11, description=Authentication/ACL with LDAP, active=false)
Plugin(emq_auth_mongo, version=2.3.11, description=Authentication/ACL with MongoDB, active=false)
Plugin(emq_auth_mysql, version=2.3.11, description=Authentication/ACL with MySQL, active=true)
Plugin(emq_auth_pgsql, version=2.3.11, description=Authentication/ACL with PostgreSQL, active=false)
Plugin(emq_auth_redis, version=2.3.11, description=Authentication/ACL with Redis, active=false)
Plugin(emq_auth_username, version=2.3.11, description=Authentication with Username/Password, active=false)
Plugin(emq_coap, version=2.3.11, description=CoAP Gateway, active=false)
Plugin(emq_dashboard, version=2.3.11, description=EMQ Web Dashboard, active=true)
Plugin(emq_lua_hook, version=2.3.11, description=EMQ Hooks in lua, active=false)
Plugin(emq_modules, version=2.3.11, description=EMQ Modules, active=true)
Plugin(emq_plugin_template, version=2.3.11, description=EMQ Plugin Template, active=false)
Plugin(emq_recon, version=2.3.11, description=Recon Plugin, active=true)
Plugin(emq_reloader, version=2.3.11, description=Reloader Plugin, active=false)
Plugin(emq_retainer, version=2.3.11, description=EMQ Retainer, active=true)
Plugin(emq_sn, version=2.3.11, description=MQTT-SN Gateway, active=false)
Plugin(emq_stomp, version=2.3.11, description=Stomp Protocol Plugin, active=false)
Plugin(emq_web_hook, version=2.3.11, description=EMQ Webhook Plugin, active=false)

@@@###》》》1) subscriber程序中的username和password配置成mysql數據庫中已經存在的,會發現,http插件認證和acl服務將不會被調用。

@@@###》》》 2)若將subscriber程序中的username和password配置成mysql數據庫中不存在的,會發現,http插件認證和acl服務將會被調用。

 

可否說明,若MySQL/Redis等基礎服務認證和acl控制器和Http認證/ACL控制器同時配置的時候,EMQ優先查詢MySQL/Redis服務???

實驗了MySQL,是這個現象,不知其餘是否如個人猜想,沒有在EMQ的官方文檔看到這個說明。

相關文章
相關標籤/搜索