從moquette源碼看IOT接入協議MQTT的實現

背景

閱讀優秀的代碼是一種享受,將優秀的代碼用本身的世界觀優秀地描述出來就十分痛苦了是要死一億個腦細胞的。html

這篇源碼閱讀筆記早在一年前就有了當時只是簡單的記錄一下本身的總結,最近將她從新整理一下但願能幫助有須要的人。node

隨着移動互聯網快速進入後半場,愈來愈多的企業將注意力轉移到物聯網。好比共享單車和小米的智能家居產品等都是典型的物聯網應用。算法

企業相信藉助於大數據和AI技術能夠得到不少額外的價值產生新的商業模式。海量數據須要經過接入服務才能流向後端產生後續價值,在接入服務中MQTT已成爲物聯網中非明確的標準協議國內外雲廠均有其broker實現。後端

特性

MQTT協議是爲大量計算能力有限,且工做在低帶寬、不可靠的網絡的遠程傳感器和控制設備通信而設計的協議,它具備如下主要的幾項特性:bash

  1. 使用發佈/訂閱消息模式,提供一對多的消息發佈,解除應用程序耦合
  2. 對負載內容屏蔽的消息傳輸
  3. 使用 TCP/IP 提供網絡鏈接
  4. 有三種消息發佈服務質量
    • 「至多一次」,消息發佈徹底依賴底層 TCP/IP 網絡。會發生消息丟失或重複。這一級別可用於以下狀況,環境傳感器數據,丟失一次讀記錄無所謂,由於不久後還會有第二次發送。
    • 「至少一次」,確保消息到達,但消息重複可能會發生。
    • 「只有一次」,確保消息到達一次。這一級別可用於以下狀況,在計費系統中,消息重複或丟失會致使不正確的結果。
  5. 小型傳輸,開銷很小(固定長度的頭部是2字節),協議交換最小化,以下降網絡流量
  6. 使用 Last Will (遺囑)和 Testament 特性通知有關各方客戶端異常中斷的機制

==下文中會對上述特性的實現方式進行講解==服務器

術語

image

客戶端Client

使用MQTT的程序或者設備,如環境監控傳感器、共享單車、共享充電寶等。網絡

服務端Server

一個程序或設備,做爲發送消息的客戶端和請求訂閱的客戶端之間的中介。session

發佈、訂閱流程

客戶端-A 給 客戶端-B 發送消息「hello」流程以下:併發

  1. 客戶端-B訂閱名稱爲msg的主題
  2. 客戶端-A向服務端-Server發送「hello」,並指明發送給名爲msg的主題
  3. 服務端-Server向客戶端-B轉發消息「hello」

有別於HTTP協議的請求響應模式,客戶端-A與客戶端-B不發生直接鏈接關係,他們之間的消息傳遞經過服務端Server進行轉發。 服務端Server又稱 MQTT Broker 即訂閱和發送的中間人app

基於moquette源碼的特性實現分析

在上述的客戶端-A 給 客戶端-B 發送消息「hello」流程中須要有以下動做。

  1. 客戶端-A 、 客戶端-B 鏈接到服務端Server
  2. 客戶端-B 訂閱主題
  3. 客戶端-A 發佈消息
  4. 服務端Server 轉發消息
  5. 客戶端-B 收到消息

下面將基於鏈接、訂閱、發佈這幾個動做進行源碼跟蹤解讀。

鏈接

image

基本概念:

Session:會話即客戶端(由ClientId做爲標示)和服務端之間邏輯層面的通訊;生命週期(存在時間):會話 >= 網絡鏈接。

ClientID:客戶端惟一標識,服務端用於關聯一個Session 只能包含這些 大寫字母,小寫字母 和 數字(0-9a-zA-Z),23個字符之內 若是 ClientID 在屢次 TCP鏈接中保持一致,客戶端和服務器端會保留會話信息(Session) 同一時間內 Server 和同一個 ClientID 只能保持一個 TCP 鏈接,再次鏈接會踢掉前一個。

CleanSession:在Connect時,由客戶端設置

  • 0 開啓會話重用機制。網絡斷開重連後,恢復以前的Session信息。須要客戶端和服務器有相關Session持久化機制;
  • 1 關閉會話重用機制。每次Connect都是一個新Session,會話僅持續和網絡鏈接一樣長的時間。

Keep Alive:目的是保持長鏈接的可靠性,以及雙方對彼此是否在線的確認。 客戶端在Connect的時候設置 Keep Alive 時長。若是服務端在 1.5 * KeepAlive 時間內沒有收到客戶端的報文,它必須斷開客戶端的網絡鏈接 Keep Alive 的值由具體應用指定,通常是幾分鐘。容許的最大值是 18 小時 12 分 15 秒。

Will:遺囑消息(Will Message)存儲在服務端,當網絡鏈接關閉時,服務端必須發佈這個遺囑消息,因此被形象地稱之爲遺囑,可用於通知異常斷線。 客戶端發送 DISCONNECT 關閉連接,遺囑失效並刪除 遺囑消息發佈的條件,包括: 服務端檢測到了一個 I/O 錯誤或者網絡故障 客戶端在保持鏈接(Keep Alive)的時間內未能通信 客戶端沒有先發送 DISCONNECT 報文直接關閉了網絡鏈接 因爲協議錯誤服務端關閉了網絡鏈接 相關設置項,須要在Connect時,由客戶端指定。

Will Flag :遺囑的總開關

  • 0 關閉遺囑功能,Will QoS 和 Will Retain 必須爲 0
  • 1 開啓遺囑功能,須要設置 Will Retain 和 Will QoS

Will QoS: 遺囑消息 QoS可取值 0、一、2,含義與消息QoS相同

Will Retain:遺囑是否保留

  • 0 遺囑消息不保留,後面再訂閱不會收到消息
  • 1 遺囑消息保留,持久存儲

Will Topic:遺囑話題

Will Payload:遺囑消息內容

鏈接流程

  1. 判斷客戶端鏈接時發送的MQTT協議版本號,非3.1和3.1.1版本發送協議不支持響應報文並在發送完成後關閉鏈接
  2. 在客戶端配置了cleanSession=false 或者服務端不容許clientId不存在的狀況下客戶端若是未上傳clientId發送協議不支持響應報文並在發送完成後關閉鏈接
  3. 判斷用戶名和密碼是否合法
  4. 初始化鏈接對象並將鏈接對象引用放入鏈接管理中,若是發現鏈接管理中存在相同客戶端ID的對象則關閉前一個鏈接並將新的鏈接對象放入鏈接管理中
  5. 根據客戶端上傳的心跳時間調整服務端當前鏈接的心跳判斷時間(keepAlive * 1.5f)
  6. 遺囑消息存儲(當鏈接意外斷開時向存儲的主題發佈消息)
  7. 發送鏈接成功響應
  8. 建立當前鏈接session
  9. 當cleanSession=false 發送當前session已經存儲的消息
public void processConnect(Channel channel, MqttConnectMessage msg) {
        MqttConnectPayload payload = msg.payload();
        String clientId = payload.clientIdentifier();
        LOG.debug("Processing CONNECT message. CId={}, username={}", clientId, payload.userName());

        // 1. 判斷客戶端鏈接時發送的MQTT協議版本號,非3.1和3.1.1版本發送協議不支持響應報文並在發送完成後關閉鏈接
        if (msg.variableHeader().version() != MqttVersion.MQTT_3_1.protocolLevel()
                && msg.variableHeader().version() != MqttVersion.MQTT_3_1_1.protocolLevel()) {
            MqttConnAckMessage badProto = connAck(CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION);

            LOG.error("MQTT protocol version is not valid. CId={}", clientId);
            channel.writeAndFlush(badProto).addListener(FIRE_EXCEPTION_ON_FAILURE);
            channel.close().addListener(CLOSE_ON_FAILURE);
            return;
        }

        final boolean cleanSession = msg.variableHeader().isCleanSession();
        if (clientId == null || clientId.length() == 0) {
            // 2. 在客戶端配置了cleanSession=false 或者服務端不容許clientId不存在的狀況下客戶端若是未上傳clientId發送協議不支持響應報文並在發送完成後關閉鏈接
            if (!cleanSession || !this.allowZeroByteClientId) {
                MqttConnAckMessage badId = connAck(CONNECTION_REFUSED_IDENTIFIER_REJECTED);

                channel.writeAndFlush(badId).addListener(FIRE_EXCEPTION_ON_FAILURE);
                channel.close().addListener(CLOSE_ON_FAILURE);
                LOG.error("The MQTT client ID cannot be empty. Username={}", payload.userName());
                return;
            }

            // Generating client id.
            clientId = UUID.randomUUID().toString().replace("-", "");
            LOG.info("Client has connected with a server generated identifier. CId={}, username={}", clientId,
                payload.userName());
        }
        // 3. 判斷用戶名和密碼是否合法
        if (!login(channel, msg, clientId)) {
            channel.close().addListener(CLOSE_ON_FAILURE);
            return;
        }

        // 4. 初始化鏈接對象並將鏈接對象引用放入鏈接管理中,若是發現鏈接管理中存在相同客戶端ID的對象則關閉前一個鏈接並將新的鏈接對象放入鏈接管理中
        ConnectionDescriptor descriptor = new ConnectionDescriptor(clientId, channel, cleanSession);
        final ConnectionDescriptor existing = this.connectionDescriptors.addConnection(descriptor);
        if (existing != null) {
            LOG.info("Client ID is being used in an existing connection, force to be closed. CId={}", clientId);
            existing.abort();
            //return;
            this.connectionDescriptors.removeConnection(existing);
            this.connectionDescriptors.addConnection(descriptor);
        }

        // 5. 根據客戶端上傳的心跳時間調整服務端當前鏈接的心跳判斷時間(keepAlive * 1.5f)
        initializeKeepAliveTimeout(channel, msg, clientId);
        // 6. 遺囑消息存儲(當鏈接意外斷開時向存儲的主題發佈消息)
        storeWillMessage(msg, clientId);
        // 7. 發送鏈接成功響應
        if (!sendAck(descriptor, msg, clientId)) {
            channel.close().addListener(CLOSE_ON_FAILURE);
            return;
        }

        m_interceptor.notifyClientConnected(msg);

        if (!descriptor.assignState(SENDACK, SESSION_CREATED)) {
            channel.close().addListener(CLOSE_ON_FAILURE);
            return;
        }
        // 8. 建立當前鏈接session
        final ClientSession clientSession = this.sessionsRepository.createOrLoadClientSession(clientId, cleanSession);
        // 9. 當cleanSession=false 發送當前session已經存儲的消息
        if (!republish(descriptor, msg, clientSession)) {
            channel.close().addListener(CLOSE_ON_FAILURE);
            return;
        }
        
        int flushIntervalMs = 500/* (keepAlive * 1000) / 2 */;
        setupAutoFlusher(channel, flushIntervalMs);

        final boolean success = descriptor.assignState(MESSAGES_REPUBLISHED, ESTABLISHED);
        if (!success) {
            channel.close().addListener(CLOSE_ON_FAILURE);
        }

        LOG.info("Connected client <{}> with login <{}>", clientId, payload.userName());
    }
複製代碼

訂閱

image

基本概念

訂閱流程

  1. 訂閱的主題校驗(權限、主題path合法性)
  2. 在當前session中存儲訂閱的主題
  3. 採用全局tree結構存儲訂閱信息(主題和訂閱者信息),用於消息轉發時根據主題查找到對應的訂閱者(tree結構和查找算法下一章節中介紹
  4. 發送訂閱迴應
  5. 掃描持久化的消息匹配到當前訂閱主題的當即向此鏈接發送消息
public void processSubscribe(Channel channel, MqttSubscribeMessage msg) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = messageId(msg);
        LOG.debug("Processing SUBSCRIBE message. CId={}, messageId={}", clientID, messageID);

        RunningSubscription executionKey = new RunningSubscription(clientID, messageID);
        SubscriptionState currentStatus = subscriptionInCourse.putIfAbsent(executionKey, SubscriptionState.VERIFIED);
        if (currentStatus != null) {
            LOG.warn("Client sent another SUBSCRIBE message while this one was being processed CId={}, messageId={}",
                clientID, messageID);
            return;
        }
        String username = NettyUtils.userName(channel);
        // 一、訂閱的主題校驗(權限、主題path合法性)
        List<MqttTopicSubscription> ackTopics = doVerify(clientID, username, msg);
        MqttSubAckMessage ackMessage = doAckMessageFromValidateFilters(ackTopics, messageID);
        if (!this.subscriptionInCourse.replace(executionKey, SubscriptionState.VERIFIED, SubscriptionState.STORED)) {
            LOG.warn("Client sent another SUBSCRIBE message while the topic filters were being verified CId={}, " +
                "messageId={}", clientID, messageID);
            return;
        }

        LOG.debug("Creating and storing subscriptions CId={}, messageId={}, topics={}", clientID, messageID, ackTopics);
        // 二、在當前session中存儲訂閱的主題
        List<Subscription> newSubscriptions = doStoreSubscription(ackTopics, clientID);

        // save session, persist subscriptions from session
        // 三、採用全局tree結構存儲訂閱信息(主題和訂閱者信息),用於消息轉發時根據主題查找到對應的訂閱者
        for (Subscription subscription : newSubscriptions) {
            subscriptions.add(subscription);
        }

        LOG.debug("Sending SUBACK response CId={}, messageId={}", clientID, messageID);
        // 四、發送訂閱迴應
        channel.writeAndFlush(ackMessage).addListener(FIRE_EXCEPTION_ON_FAILURE);

        // fire the persisted messages in session
        // 五、掃描持久化的消息匹配到當前訂閱主題的當即向此鏈接發送消息
        for (Subscription subscription : newSubscriptions) {
            publishRetainedMessagesInSession(subscription, username);
        }

        boolean success = this.subscriptionInCourse.remove(executionKey, SubscriptionState.STORED);
        if (!success) {
            LOG.warn("Unable to perform the final subscription state update CId={}, messageId={}", clientID, messageID);
        } else {
            LOG.info("Client <{}> subscribed to topics", clientID);
        }
    }
複製代碼

發佈

基本概念

Packet Identifier:報文標識存在報文的可變報頭部分,非零兩個字節整數 (0-65535]。

一個流程中重複:這些報文包含PacketID,並且在一次通訊流程內保持一致:PUBLISH(QoS>0時)、PUBACK、PUBREC、PUBREL、PUBCOMP、SUBSCRIBE、SUBACK、UNSUBSCIBE、UNSUBACK 。

新的不重複:客戶端每次發送一個新的這些類型的報文時都必須分配一個當前 未使用的PacketID 當客戶端處理完這個報文對應的確認後,這個報文標識符就釋放可重用。

獨立維護:客戶端和服務端彼此獨立地分配報文標識符。所以,客戶端服務端組合使用相同的報文標識符能夠實現併發的消息交換。客戶端和服務端產生的Packet Identifier一致不算異常。

Payload: 有效載荷即消息體最大容許 256MB。 Publish 的 Payload 容許爲空,在不少場合下表明將持久消息(或者遺囑消息)清空。採用UTF-8編碼。

Retain:持久消息(粘性消息)

RETAIN 標記:每一個Publish消息都須要指定的標記

  • 0 服務端不能存儲這個消息,也不能移除或替換任何 現存的保留消息
  • 1 服務端必須存儲這個應用消息和它的QoS等級,以便它能夠被分發給將來的訂閱者

每一個Topic只會保留最多一個 Retain 持久消息 客戶端訂閱帶有持久消息的Topic,會當即受到這條消息。

服務器能夠選擇丟棄持久消息,好比內存或者存儲吃緊的時候。

若是客戶端想要刪除某個Topic 上面的持久消息,能夠向這個Topic發送一個Payload爲空的持久消息 遺囑消息(Will)的Retain持久機制同理。

QoS :服務等級(消息可靠性)

發佈流程

public void processPublish(Channel channel, MqttPublishMessage msg) {
        final MqttQoS qos = msg.fixedHeader().qosLevel();
        final String clientId = NettyUtils.clientID(channel);
        LOG.info("Processing PUBLISH message. CId={}, topic={}, messageId={}, qos={}", clientId,
                msg.variableHeader().topicName(), msg.variableHeader().messageId(), qos);
        switch (qos) {
            case AT_MOST_ONCE:
                this.qos0PublishHandler.receivedPublishQos0(channel, msg);
                break;
            case AT_LEAST_ONCE:
                this.qos1PublishHandler.receivedPublishQos1(channel, msg);
                break;
            case EXACTLY_ONCE:
                this.qos2PublishHandler.receivedPublishQos2(channel, msg);
                break;
            default:
                LOG.error("Unknown QoS-Type:{}", qos);
                break;
        }
    }
複製代碼

從上述代碼的switch語句中能夠看出會根據消息的Qos級別分別進行處理

QoS0 最多一次
sequenceDiagram
ClientA->>ServerBroker: 發送消息
ServerBroker->>ClientB: 發送消息
複製代碼
  1. 權限判斷
  2. 向全部該主題的訂閱者發佈消息
  3. QoS == 0 && retain => clean old retained
void receivedPublishQos0(Channel channel, MqttPublishMessage msg) {
        // verify if topic can be write
        final Topic topic = new Topic(msg.variableHeader().topicName());
        String clientID = NettyUtils.clientID(channel);
        String username = NettyUtils.userName(channel);
        // 1. 權限判斷
        if (!m_authorizator.canWrite(topic, username, clientID)) {
            LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
            return;
        }
        // route message to subscribers
        IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);
        toStoreMsg.setClientID(clientID);
        // 2. 向全部該主題的訂閱者發佈消息
        this.publisher.publish2Subscribers(toStoreMsg, topic);

        if (msg.fixedHeader().isRetain()) {
            // 3. QoS == 0 && retain => clean old retained
            m_messagesStore.cleanRetained(topic);
        }

        m_interceptor.notifyTopicPublished(msg, clientID, username);
    }
複製代碼
QoS1 至少一次
sequenceDiagram
ClientA->>ServerBroker: 1.發送消息PUBLISH
ServerBroker->>ServerBroker: 1.1存儲消息
ServerBroker->>ClientA: 1.2發送消息迴應PUBACK
ServerBroker->>ClientB: 2.發送消息
ClientB->>ServerBroker: 2.1發送消息迴應PUBACK
ServerBroker->>ServerBroker: 2.2刪除消息
複製代碼

1.發送消息PUBLISH

  1. 權限判斷
  2. 向全部該主題的訂閱者發佈消息(每一個session中存儲即將要發送的消息)
  3. 發送Ack迴應
  4. retain = true => 存儲消息
void receivedPublishQos1(Channel channel, MqttPublishMessage msg) {
        // verify if topic can be write
        final Topic topic = new Topic(msg.variableHeader().topicName());
        topic.getTokens();
        if (!topic.isValid()) {
            LOG.warn("Invalid topic format, force close the connection");
            channel.close().addListener(CLOSE_ON_FAILURE);
            return;
        }
        String clientID = NettyUtils.clientID(channel);
        String username = NettyUtils.userName(channel);
        // 1. 權限判斷
        if (!m_authorizator.canWrite(topic, username, clientID)) {
            LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
            return;
        }

        final int messageID = msg.variableHeader().messageId();

        // route message to subscribers
        IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);
        toStoreMsg.setClientID(clientID);

        // 2. 向全部該主題的訂閱者發佈消息(每一個session中存儲即將要發送的消息)
        this.publisher.publish2Subscribers(toStoreMsg, topic, messageID);

        // 3. 發送Ack迴應
        sendPubAck(clientID, messageID);

        // 4. retain = true => 存儲消息
        if (msg.fixedHeader().isRetain()) {
            if (!msg.payload().isReadable()) {
                m_messagesStore.cleanRetained(topic);
            } else {
                // before wasn't stored m_messagesStore.storeRetained(topic, toStoreMsg); } } m_interceptor.notifyTopicPublished(msg, clientID, username); } 複製代碼

2.1發送消息迴應PUBACK

服務端Server接收到PUBACK消息後將執行:

  1. 刪除存儲在session中的消息
public void processPubAck(Channel channel, MqttPubAckMessage msg) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = msg.variableHeader().messageId();
        String username = NettyUtils.userName(channel);
        LOG.trace("retrieving inflight for messageID <{}>", messageID);

        ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
        StoredMessage inflightMsg = targetSession.inFlightAcknowledged(messageID);

        String topic = inflightMsg.getTopic();
        InterceptAcknowledgedMessage wrapped = new InterceptAcknowledgedMessage(inflightMsg, topic, username,
                                                                                messageID);
        m_interceptor.notifyMessageAcknowledged(wrapped);
    }
複製代碼
QoS2 有且僅有一次
sequenceDiagram
ClientA->>ServerBroker: 1.發送消息PUBLISH
ServerBroker->>ServerBroker: 1.1存儲消息
ServerBroker->>ClientA: 1.2發送消息迴應Rec
ClientA->>ServerBroker: 2.發送消息Rel
ServerBroker->>ServerBroker: 2.1刪除消息
ServerBroker->>ServerBroker: 2.2存儲消息到發送列隊
ServerBroker->>ClientB: 2.3發送消息
ServerBroker->>ClientA: 2.4發送消息迴應Comp
ClientB->>ServerBroker: 3.發送消息迴應Rec
ServerBroker->>ServerBroker: 3.1刪除2.2中存儲的消息(一次確認)
ServerBroker->>ServerBroker: 3.2存儲消息
ServerBroker->>ClientB: 3.3發送消息Rel
ClientB->>ServerBroker: 3.4發送消息迴應Comp
ServerBroker->>ServerBroker: 3.5刪除消息(二次確認)
複製代碼

1.發送消息PUBLISH

  1. 權限判斷
  2. 存儲消息
  3. 發送Rec迴應
void receivedPublishQos2(Channel channel, MqttPublishMessage msg) {
        final Topic topic = new Topic(msg.variableHeader().topicName());
        // check if the topic can be wrote
        String clientID = NettyUtils.clientID(channel);
        String username = NettyUtils.userName(channel);
        // 1. 權限判斷
        if (!m_authorizator.canWrite(topic, username, clientID)) {
            LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
            return;
        }
        final int messageID = msg.variableHeader().messageId();

        // 2. 存儲消息
        IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);
        toStoreMsg.setClientID(clientID);

        LOG.info("Sending publish message to subscribers CId={}, topic={}, messageId={}", clientID, topic, messageID);
        if (LOG.isTraceEnabled()) {
            LOG.trace("payload={}, subs Tree={}", payload2Str(toStoreMsg.getPayload()), subscriptions.dumpTree());
        }

        this.sessionsRepository.sessionForClient(clientID).markAsInboundInflight(messageID, toStoreMsg);

        // 3. 發送Rec迴應
        sendPubRec(clientID, messageID);

        // Next the client will send us a pub rel
        // NB publish to subscribers for QoS 2 happen upon PUBREL from publisher

//        if (msg.fixedHeader().isRetain()) {
//            if (msg.payload().readableBytes() == 0) {
//                m_messagesStore.cleanRetained(topic);
//            } else {
//                m_messagesStore.storeRetained(topic, toStoreMsg);
//            }
//        }
        //TODO this should happen on PUB_REL, else we notify false positive
        m_interceptor.notifyTopicPublished(msg, clientID, username);
    }
複製代碼

2.發送消息Rel

  1. 刪除消息
  2. 轉發消息
  3. 發送Comp 迴應給客戶端-A
void processPubRel(Channel channel, MqttMessage msg) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = messageId(msg);
        LOG.info("Processing PUBREL message. CId={}, messageId={}", clientID, messageID);
        ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
        // 1. 刪除消息
        IMessagesStore.StoredMessage evt = targetSession.inboundInflight(messageID);
        if (evt == null) {
            LOG.warn("Can't find inbound inflight message for CId={}, messageId={}", clientID, messageID);
            throw new IllegalArgumentException("Can't find inbound inflight message");
        }
        final Topic topic = new Topic(evt.getTopic());

        // 2. 轉發消息
        this.publisher.publish2Subscribers(evt, topic, messageID);

        if (evt.isRetained()) {
            if (evt.getPayload().readableBytes() == 0) {
                m_messagesStore.cleanRetained(topic);
            } else {
                m_messagesStore.storeRetained(topic, evt);
            }
        }

        //TODO here we should notify to the listeners
        //m_interceptor.notifyTopicPublished(msg, clientID, username);
        // 3.發送Comp 迴應
        sendPubComp(clientID, messageID);
    }
複製代碼

3.發送消息迴應Rec

  1. 刪除消息
  2. 存儲消息(分別存儲在secondPhaseStore和outboundInflightMap)
  3. 發送PUBREL
public void processPubRec(Channel channel, MqttMessage msg) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = messageId(msg);
        LOG.debug("Processing PUBREC message. CId={}, messageId={}", clientID, messageID);
        ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
        // remove from the inflight and move to the QoS2 second phase queue
        // 1. 刪除消息
        StoredMessage ackedMsg = targetSession.inFlightAcknowledged(messageID);
        // 2. 存儲消息(分別存儲在secondPhaseStore和outboundInflightMap)
        targetSession.moveInFlightToSecondPhaseAckWaiting(messageID, ackedMsg);
        // once received a PUBREC reply with a PUBREL(messageID)
        LOG.debug("Processing PUBREC message. CId={}, messageId={}", clientID, messageID);
        // 3. 發送PUBREL
        MqttFixedHeader pubRelHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, AT_LEAST_ONCE, false, 0);
        MqttMessage pubRelMessage = new MqttMessage(pubRelHeader, from(messageID));
        channel.writeAndFlush(pubRelMessage).addListener(FIRE_EXCEPTION_ON_FAILURE);
    }
複製代碼

3.4發送消息迴應Comp

  1. 刪除消息
public void processPubComp(Channel channel, MqttMessage msg) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = messageId(msg);
        LOG.debug("Processing PUBCOMP message. CId={}, messageId={}", clientID, messageID);
        // once received the PUBCOMP then remove the message from the temp memory
        ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
        // 1. 刪除消息
        StoredMessage inflightMsg = targetSession.completeReleasedPublish(messageID);
        String username = NettyUtils.userName(channel);
        String topic = inflightMsg.getTopic();
        final InterceptAcknowledgedMessage interceptAckMsg = new InterceptAcknowledgedMessage(inflightMsg, topic,
            username, messageID);
        m_interceptor.notifyMessageAcknowledged(interceptAckMsg);
    }
複製代碼

Topic & Subcribe

基本概念

Topic 話題 和 TopicFilter 話題過濾器

Pub-Sub消息模型的核心機制 UTF-8 編碼字符串,不能超過 65535 字節。層級數量沒有限制 不能包含任何的下文中提到的特殊符號(/、+、#),必須至少包含一個字符
區分大小寫,能夠包含空格,不能包含空字符 (Unicode U+0000)
在收部或尾部增長 斜槓 「/」,會產生不一樣的Topic和TopicFilter。舉例:

  • 「/A」 和 「A」 是不一樣的
  • 「A」 和 「A/」 是不一樣的

只包含斜槓 「/」 的 Topic 或 TopicFilter 是合法的

TopicFilter中的特殊符號

層級分隔符 / 用於分割主題的每一個層級,爲主題名提供一個分層結構
主題層級分隔符能夠出如今 Topic 或 TopicFilter 的任何位置
特例:相鄰的主題層次分隔符表示一個零長度的主題層級

單層通配符 +

只能用於單個主題層級匹配的通配符。例如,「a/b/+」 匹配 「a/b/c1」 和 「a/b/c2」 ,可是不匹配 「a/b/c/d」
能夠匹配 任意層級,包括第一個和最後一個層級。

例如,「+」 是有效的,「sport/+/player1」 也是有效的。 能夠在多個層級中使用它,也能夠和多層通配符一塊兒使用。

例如,「+/tennis/#」 是有效的。只能匹配本級不能匹配上級。

例如,「sport/+」 不匹配 「sport」 可是卻匹配「sport/」,「/finance」 匹配 「+/+」 和 「/+」 ,可是不匹配 「+」。

多層通配符 #

用於匹配主題中任意層級的通配符 匹配包含自己的層級和子層級。

例如 「a/b/c/#" 能夠匹配 「a/b/c」、「a/b/c/d」 和 「a/b/c/d/e」 必須是最後的結尾。

例如 「sport/tennis/#/ranking」是無效的

「#」是有效的,會收到全部的應用消息。 (服務器端應將此類 TopicFilter禁掉 )

以$開頭的,服務器保留

服務端不能將 $ 字符開頭的 Topic 匹配通配符 (#或+) 開頭的 TopicFilter

服務端應該阻止客戶端使用這種 Topic 與其它客戶端交換消息。

服務端實現能夠將 $ 開頭的主題名用做其餘目的。

SYS/ 被普遍用做包含服務器特定信息或控制接口的主題的前綴
客戶端不特地訂閱開頭的 Topic,就不會收到對應的消息

  • 訂閱 「#」 的客戶端不會收到任何發佈到以 「$」 開頭主題的消息
  • 訂閱 「+/A/B」 的客戶端不會收到任何發佈到 「$SYS/A/B」 的消息
  • 訂閱 「SYS/#」 的客戶端會收到發佈到以 「SYS/」 開頭主題的消息
  • 訂閱 「SYS/A/+」 的客戶端會收到發佈到 「SYS/A/B」 主題的消息

若是客戶端想同時接受以 「SYS/」 開頭主題的消息和不以 開頭主題的消息,它須要同時 訂閱 「#」 和 「$SYS/#」

存儲結構

  • a/b/c
  • a/a
  • a/haha
  • msg

這4個主題會存儲成以下結構:

  1. children 指向下層節點
  2. subscriptions 存儲當前主題全部的訂閱者

image

查找算法

訂閱
@Override
    public void add(Subscription newSubscription) {
        Action res;
        do {
            res = insert(newSubscription.clientId, newSubscription.topicFilter, this.root, newSubscription.topicFilter);
        } while (res == Action.REPEAT);
    }

    private Action insert(String clientId, Topic topic, final INode inode, Topic fullpath) {
        Token token = topic.headToken();
        if (!topic.isEmpty() && inode.mainNode().anyChildrenMatch(token)) {
            Topic remainingTopic = topic.exceptHeadToken();
            INode nextInode = inode.mainNode().childOf(token);
            return insert(clientId, remainingTopic, nextInode, fullpath);
        } else {
            if (topic.isEmpty()) {
                return insertSubscription(clientId, fullpath, inode);
            } else {
                return createNodeAndInsertSubscription(clientId, topic, inode, fullpath);
            }
        }
    }
複製代碼
刪除訂閱
public void removeSubscription(Topic topic, String clientID) {
        Action res;
        do {
            res = remove(clientID, topic, this.root, NO_PARENT);
        } while (res == Action.REPEAT);
    }

    private Action remove(String clientId, Topic topic, INode inode, INode iParent) {
        Token token = topic.headToken();
        if (!topic.isEmpty() && (inode.mainNode().anyChildrenMatch(token))) {
            Topic remainingTopic = topic.exceptHeadToken();
            INode nextInode = inode.mainNode().childOf(token);
            return remove(clientId, remainingTopic, nextInode, inode);
        } else {
            final CNode cnode = inode.mainNode();
            if (cnode instanceof TNode) {
                // this inode is a tomb, has no clients and should be cleaned up
                // Because we implemented cleanTomb below, this should be rare, but possible
                // Consider calling cleanTomb here too
                return Action.OK;
            }
            if (cnode.containsOnly(clientId) && topic.isEmpty() && cnode.allChildren().isEmpty()) {
                // last client to leave this node, AND there are no downstream children, remove via TNode tomb
                if (inode == this.root) {
                    return inode.compareAndSet(cnode, inode.mainNode().copy()) ? Action.OK : Action.REPEAT;
                }
                TNode tnode = new TNode();
                return inode.compareAndSet(cnode, tnode) ? cleanTomb(inode, iParent) : Action.REPEAT;
            } else if (cnode.contains(clientId) && topic.isEmpty()) {
                CNode updatedCnode = cnode.copy();
                updatedCnode.removeSubscriptionsFor(clientId);
                return inode.compareAndSet(cnode, updatedCnode) ? Action.OK : Action.REPEAT;
            } else {
                //someone else already removed
                return Action.OK;
            }
        }
    }
複製代碼
查找
Set<Subscription> recursiveMatch(Topic topic, INode inode) {
        CNode cnode = inode.mainNode();
        if (Token.MULTI.equals(cnode.token)) {
            return cnode.subscriptions;
        }
        if (topic.isEmpty()) {
            return Collections.emptySet();
        }
        if (cnode instanceof TNode) {
            return Collections.emptySet();
        }
        final Token token = topic.headToken();
        if (!(Token.SINGLE.equals(cnode.token) || cnode.token.equals(token) || ROOT.equals(cnode.token))) {
            return Collections.emptySet();
        }
        Topic remainingTopic = (ROOT.equals(cnode.token)) ? topic : topic.exceptHeadToken();
        Set<Subscription> subscriptions = new HashSet<>();
        if (remainingTopic.isEmpty()) {
            subscriptions.addAll(cnode.subscriptions);
        }
        for (INode subInode : cnode.allChildren()) {
            subscriptions.addAll(recursiveMatch(remainingTopic, subInode));
        }
        return subscriptions;
    }
複製代碼

尾巴

相關參考

MQTT協議通俗講解

相關文章
相關標籤/搜索