物聯網協議之MQTT源碼分析(二)

此篇文章繼上一篇物聯網協議之MQTT源碼分析(一)而寫的第二篇MQTT發佈消息以及接收Broker消息的源碼分析,想看MQTT鏈接的小夥伴能夠去看我上一篇哦。bash

juejin.im/post/5cd66c…異步

MQTT發佈消息

MQTT發佈消息是由MqttAndroidClient類的publish函數執行的,咱們來看看這個函數:socket

// MqttAndroidClient類:
    @Override
    public IMqttDeliveryToken publish(String topic, byte[] payload, int qos,
                                      boolean retained, Object userContext,
                                      IMqttActionListener callback)
            throws MqttException, MqttPersistenceException {
        // 將消息內容、qos消息等級、retained消息是否保留封裝成MqttMessage
        MqttMessage message = new MqttMessage(payload);
        message.setQos(qos);
        message.setRetained(retained);
        // 每一條消息都有本身的token
        MqttDeliveryTokenAndroid token = new MqttDeliveryTokenAndroid(
                this, userContext, callback, message);
        String activityToken = storeToken(token);
        IMqttDeliveryToken internalToken = mqttService.publish(clientHandle,
                topic, payload, qos, retained, null, activityToken);
        token.setDelegate(internalToken);
        return token;
    }
複製代碼

從上面代碼能夠看出,發佈消息須要topic消息主題、payload消息內容、callback回調監聽等,經由mqttService.publish繼續執行發佈操做:async

// MqttService類:MQTT惟一組件
    public IMqttDeliveryToken publish(String clientHandle, String topic,
                                      byte[] payload, int qos, boolean retained,
                                      String invocationContext, String activityToken)
            throws MqttPersistenceException, MqttException {
        MqttConnection client = getConnection(clientHandle);
        return client.publish(topic, payload, qos, retained, invocationContext,
                activityToken);
    }
複製代碼

MqttConnection在上一篇中講解過,MQTT的鏈接會初始化一個MqttConnection,並保存在一個Map集合connections中,並經過getConnection(clientHandle)方法獲取。很明顯咱們要接着看client.publish函數啦:ide

// MqttConnection類:
    public IMqttDeliveryToken publish(String topic, byte[] payload, int qos,
                                      boolean retained, String invocationContext,
									  String activityToken) {
		// 用於發佈消息,是否發佈成功的回調
        final Bundle resultBundle = new Bundle();
        resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION,
                MqttServiceConstants.SEND_ACTION);
        resultBundle.putString(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN,
                activityToken);
        resultBundle.putString(
                MqttServiceConstants.CALLBACK_INVOCATION_CONTEXT,
                invocationContext);

        IMqttDeliveryToken sendToken = null;

        if ((myClient != null) && (myClient.isConnected())) {
            // 攜帶resultBundle數據,用於監聽回調發布消息是否成功
            IMqttActionListener listener = new MqttConnectionListener(
                    resultBundle);
            try {
                MqttMessage message = new MqttMessage(payload);
                message.setQos(qos);
                message.setRetained(retained);
                sendToken = myClient.publish(topic, payload, qos, retained,
                        invocationContext, listener);
                storeSendDetails(topic, message, sendToken, invocationContext,
                        activityToken);
            } catch (Exception e) {
                handleException(resultBundle, e);
            }
        } else {
            resultBundle.putString(MqttServiceConstants.CALLBACK_ERROR_MESSAGE,
                    NOT_CONNECTED);
            service.traceError(MqttServiceConstants.SEND_ACTION, NOT_CONNECTED);
            service.callbackToActivity(clientHandle, Status.ERROR, resultBundle);
        }

        return sendToken;
    }
複製代碼

這段代碼中很明顯能夠看出發佈的操做又交給了myClient.publish方法,那myClient是誰呢?上一篇文章中講過myClient是MqttAsyncClient,是在MQTT鏈接時在MqttConnection類的connect方法中初始化的,詳情請看上一篇。函數

// MqttAsyncClient類:
    public IMqttDeliveryToken publish(String topic, byte[] payload, int qos
        , boolean retained,Object userContext,
        IMqttActionListener callback) throws MqttException,MqttPersistenceException {
        MqttMessage message = new MqttMessage(payload);
        message.setQos(qos);
        message.setRetained(retained);
        return this.publish(topic, message, userContext, callback);
    }
    
    public IMqttDeliveryToken publish(String topic, MqttMessage message
        , Object userContext,
        IMqttActionListener callback) throws MqttException,MqttPersistenceException {
        final String methodName = "publish";
        // @TRACE 111=< topic={0} message={1}userContext={1} callback={2}
        log.fine(CLASS_NAME, methodName, "111", new Object[]{topic, userContext, callback});

        // Checks if a topic is valid when publishing a message.
        MqttTopic.validate(topic, false/* wildcards NOT allowed */);

        MqttDeliveryToken token = new MqttDeliveryToken(getClientId());
        token.setActionCallback(callback);
        token.setUserContext(userContext);
        token.setMessage(message);
        token.internalTok.setTopics(new String[]{topic});

        MqttPublish pubMsg = new MqttPublish(topic, message);
        comms.sendNoWait(pubMsg, token);

        // @TRACE 112=<
        log.fine(CLASS_NAME, methodName, "112");

        return token;
    }
複製代碼

從這段代碼中能夠看到,如今把把topic和message封裝成了MqttPublish類型的消息,並繼續由comms.sendNoWait執行,comms是ClientComms,ClientComms是在初始化MqttAsyncClient的構造方法中初始化的,詳情看上一篇。源碼分析

// ClientComms類:
    public void sendNoWait(MqttWireMessage message, MqttToken token) throws MqttException {
        final String methodName = "sendNoWait";
        // 判斷狀態或者消息類型
        if (isConnected() ||
                (!isConnected() && message instanceof MqttConnect) ||
                (isDisconnecting() && message instanceof MqttDisconnect)) {
            if (disconnectedMessageBuffer != null && disconnectedMessageBuffer.getMessageCount() != 0) {
                //@TRACE 507=Client Connected, Offline Buffer available, but not empty. Adding 
                // message to buffer. message={0}
                log.fine(CLASS_NAME, methodName, "507", new Object[]{message.getKey()});
                if (disconnectedMessageBuffer.isPersistBuffer()) {
                    this.clientState.persistBufferedMessage(message);
                }
                disconnectedMessageBuffer.putMessage(message, token);
            } else {
                // 如今不是disconnect所以,邏輯走這裏
                this.internalSend(message, token);
            }
        } else if (disconnectedMessageBuffer != null) {
            //@TRACE 508=Offline Buffer available. Adding message to buffer. message={0}
            log.fine(CLASS_NAME, methodName, "508", new Object[]{message.getKey()});
            if (disconnectedMessageBuffer.isPersistBuffer()) {
                this.clientState.persistBufferedMessage(message);
            }
            disconnectedMessageBuffer.putMessage(message, token);
        } else {
            //@TRACE 208=failed: not connected
            log.fine(CLASS_NAME, methodName, "208");
            throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
        }
    }
    
    void internalSend(MqttWireMessage message, MqttToken token) throws MqttException {
        final String methodName = "internalSend";
        ...
        try {
            // Persist if needed and send the message
            this.clientState.send(message, token);
        } catch (MqttException e) {
            // 注意此處代碼***
            if (message instanceof MqttPublish) {
                this.clientState.undo((MqttPublish) message);
            }
            throw e;
        }
    }
複製代碼

comms.sendNoWait方法中又調用了本類中的internalSend方法,而且在internalSend方法中又調用了clientState.send(message, token)方法繼續發佈。ClientState對象是在ClientComms初始化的構造方法中初始化的。此處須要注意一下catch裏的代碼,下面會具體說明。post

// ClientState類:
    public void send(MqttWireMessage message, MqttToken token) throws MqttException {
        final String methodName = "send";
        ...

        if (message instanceof MqttPublish) {
            synchronized (queueLock) {
                /**
                 * 注意這裏:actualInFlight實際飛行中>maxInflight最大飛行中
                 * maxInflight:是咱們在本身代碼中經過鏈接選項MqttConnectOptions.setMaxInflight();設置的,默認大小爲10
                */
                if (actualInFlight >= this.maxInflight) {
                    //@TRACE 613= sending {0} msgs at max inflight window
                    log.fine(CLASS_NAME, methodName, "613",
                            new Object[]{new Integer(actualInFlight)});

                    throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);
                }

                MqttMessage innerMessage = ((MqttPublish) message).getMessage();
                //@TRACE 628=pending publish key={0} qos={1} message={2}
                log.fine(CLASS_NAME, methodName, "628",
                        new Object[]{new Integer(message.getMessageId()),
                                new Integer(innerMessage.getQos()), message});
                /**
                 * 根據本身設置的qos等級,來決定是否須要恢復消息
                 * 這裏須要說明一下qos等級區別:
                 *  qos==0,至多發送一次,不進行重試,Broker不會返回確認消息。
                 *  qos==1,至少發送一次,確保消息到達Broker,Broker須要返回確認消息PUBACK
                 *  qos==2,Broker確定會收到消息,且只收到一次,qos==1可能會發送重複消息
                */
                switch (innerMessage.getQos()) {
                    case 2:
                        outboundQoS2.put(new Integer(message.getMessageId()), message);
                        persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
                        break;
                    case 1:
                        outboundQoS1.put(new Integer(message.getMessageId()), message);
                        persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
                        break;
                }
                tokenStore.saveToken(token, message);
                pendingMessages.addElement(message);
                queueLock.notifyAll();
            }
        } else {
            ...
        }
    }
複製代碼

這段代碼中咱們發現了一個可能須要咱們本身設置的屬性maxInflight,若是實際發送中的消息大於maxInflight約束的最大的話就會拋出MqttException異常,那麼這個異常catch裏是怎麼處理的呢,這就要往回看一步代碼啦,上面已經提示過須要注意ClientComms類中internalSend方法中的catch裏的代碼:ui

if (message instanceof MqttPublish) {
        this.clientState.undo((MqttPublish) message);
    }
複製代碼

能夠很明確的看出若消息類型是MqttPublish,則執行clientState.undo((MqttPublish) message)方法,咱們前面說過消息已經在MqttAsyncClient類的publish方法中把topic和message封裝成了MqttPublish類型的消息,所以此處會執行undo方法:this

// ClientState類:
    protected void undo(MqttPublish message) throws MqttPersistenceException {
        final String methodName = "undo";
        synchronized (queueLock) {
            //@TRACE 618=key={0} QoS={1} 
            log.fine(CLASS_NAME, methodName, "618",
                    new Object[]{new Integer(message.getMessageId()),
                            new Integer(message.getMessage().getQos())});

            if (message.getMessage().getQos() == 1) {
                outboundQoS1.remove(new Integer(message.getMessageId()));
            } else {
                outboundQoS2.remove(new Integer(message.getMessageId()));
            }
            pendingMessages.removeElement(message);
            persistence.remove(getSendPersistenceKey(message));
            tokenStore.removeToken(message);
            if (message.getMessage().getQos() > 0) {
                //Free this message Id so it can be used again
                releaseMessageId(message.getMessageId());
                message.setMessageId(0);
            }

            checkQuiesceLock();
        }
    }
複製代碼

代碼已經很明顯了,就是把大於maxInflight這部分消息remove移除掉,所以在實際操做中要注意本身的Mqtt消息的發佈會不會在短期內達到maxInflight默認的10的峯值,若能達到,則須要手動設置一個適合本身項目的範圍閥值啦。

咱們繼續說clientState.send(message, token)方法裏的邏輯,代碼中註釋中也說明了Mqtt會根據qos等級來決定消息到達機制

qos等級

  • qos==0,至多發送一次,不進行重試,Broker不會返回確認消息,消息可能會丟失。
  • qos==1,至少發送一次,確保消息到達Broker,Broker須要返回確認消息PUBACK,可能會發送重複消息
  • qos==2,Broker確定會收到消息,且只收到一次

根據qos等級,若qos等於1和2,則講消息分別加入Hashtable類型的outboundQoS1和outboundQoS2中,已在後續邏輯中確保消息發送成功併到達。

注:qos等級優先級沒有maxInflight高,從代碼中能夠看出,會先判斷maxInflight再區分qos等級

代碼的最後講消息添加進Vector類型的pendingMessages裏,在上一篇中咱們能夠了解到MQTT的發射器是輪詢檢查pendingMessages裏是否存在數據,若存在則經過socket的OutputStream發送出去。而且會經過接收器接收從Broker發送回來的數據。

監聽Broker返回的消息之數據

發送咱們就不看源碼啦,接收咱們再看一下源碼,經過源碼看一看數據是怎麼回到咱們本身的回調裏的:

// CommsReceiver類中:
    public void run() {
        recThread = Thread.currentThread();
        recThread.setName(threadName);
        final String methodName = "run";
        MqttToken token = null;

        try {
            runningSemaphore.acquire();
        } catch (InterruptedException e) {
            running = false;
            return;
        }

        while (running && (in != null)) {
            try {
                //@TRACE 852=network read message
                log.fine(CLASS_NAME, methodName, "852");
                receiving = in.available() > 0;
                MqttWireMessage message = in.readMqttWireMessage();
                receiving = false;

                // 消息是否屬於Mqtt確認類型
                if (message instanceof MqttAck) {
                    token = tokenStore.getToken(message);
                    // token通常不會爲空,前面已經保存過
                    if (token != null) {
                        synchronized (token) {
                            // ...
                            clientState.notifyReceivedAck((MqttAck) message);
                        }
                    } 
                    ...
            } finally {
                receiving = false;
                runningSemaphore.release();
            }
        }
    }
複製代碼

從代碼中能夠看出,Broker返回來的數據交給了clientState.notifyReceivedAck方法:

// ClientState類:
    protected void notifyReceivedAck(MqttAck ack) throws MqttException {
        final String methodName = "notifyReceivedAck";
        ...

        MqttToken token = tokenStore.getToken(ack);
        MqttException mex = null;

        if (token == null) {
            ...
        } else if (ack instanceof MqttPubRec) {
            // qos==2 是返回
            MqttPubRel rel = new MqttPubRel((MqttPubRec) ack);
            this.send(rel, token);
        } else if (ack instanceof MqttPubAck || ack instanceof MqttPubComp) {
            // qos==1/2 消息移除前通知的結果
            notifyResult(ack, token, mex);
            // Do not remove publish / delivery token at this stage
            // do this when the persistence is removed later 
        } else if (ack instanceof MqttPingResp) {
            // 鏈接心跳數據消息
            ...
        } else if (ack instanceof MqttConnack) {
            // MQTT鏈接消息
            ...
        } else {
            notifyResult(ack, token, mex);
            releaseMessageId(ack.getMessageId());
            tokenStore.removeToken(ack);
        }

        checkQuiesceLock();
    }
複製代碼

從上面註釋可知,發佈的消息qos==0,返回結果是直接走else,而qos==1/2,確認消息也最終會走到notifyResult(ack, token, mex)方法中:

protected void notifyResult(MqttWireMessage ack, MqttToken token, MqttException ex) {
        final String methodName = "notifyResult";
        // 取消阻止等待令牌的任何線程,並保存ack
        token.internalTok.markComplete(ack, ex);
        // 通知此令牌已收到響應消息,設置已完成狀態,並經過isComplete()獲取狀態
        token.internalTok.notifyComplete();

        // 讓用戶知道異步操做已完成,而後刪除令牌
        if (ack != null && ack instanceof MqttAck && !(ack instanceof MqttPubRec)) {
            //@TRACE 648=key{0}, msg={1}, excep={2}
            log.fine(CLASS_NAME, methodName, "648", new Object[]{token.internalTok.getKey(), ack,ex});
            // CommsCallback類
            callback.asyncOperationComplete(token);
        }
        // 有些狀況下,因爲操做失敗,所以沒有確認
        if (ack == null) {
            //@TRACE 649=key={0},excep={1}
            log.fine(CLASS_NAME, methodName, "649", new Object[]{token.internalTok.getKey(), ex});
            callback.asyncOperationComplete(token);
        }
    }
    
// Token類:
    protected void markComplete(MqttWireMessage msg, MqttException ex) {
        final String methodName = "markComplete";
        //@TRACE 404=>key={0} response={1} excep={2}
        log.fine(CLASS_NAME, methodName, "404", new Object[]{getKey(), msg, ex});

        synchronized (responseLock) {
            // ACK means that everything was OK, so mark the message for garbage collection.
            if (msg instanceof MqttAck) {
                this.message = null;
            }
            this.pendingComplete = true;
            // 將消息保存在response成員變量中,並經過getWireMessage()方法獲取消息msg
            this.response = msg;
            this.exception = ex;
        }
    }
// Token類:
    protected void notifyComplete() {
        ...
        synchronized (responseLock) {
            ...
            if (exception == null && pendingComplete) {
                // 設置已完成,並經過isComplete()獲取狀態
                completed = true;
                pendingComplete = false;
            } else {
                pendingComplete = false;
            }

            responseLock.notifyAll();
        }
        ...
    }
複製代碼

此時已將MqttWireMessage消息保存到token中,異步操做已完成,調用回調監聽CommsCallback裏的asyncOperationComplete方法:

// CommsCallback類:
    public void asyncOperationComplete(MqttToken token) {
        final String methodName = "asyncOperationComplete";

        if (running) {
            // invoke callbacks on callback thread
            completeQueue.addElement(token);
            synchronized (workAvailable) {
                // @TRACE 715=new workAvailable. key={0}
                log.fine(CLASS_NAME, methodName, "715", new Object[]{token.internalTok.getKey()});
                workAvailable.notifyAll();
            }
        } else {
            // invoke async callback on invokers thread
            try {
                handleActionComplete(token);
            } catch (Throwable ex) {
                // Users code could throw an Error or Exception e.g. in the case
                // of class NoClassDefFoundError
                // @TRACE 719=callback threw ex:
                log.fine(CLASS_NAME, methodName, "719", null, ex);

                // Shutdown likely already in progress but no harm to confirm
                clientComms.shutdownConnection(null, new MqttException(ex));
            }
        }
    }
複製代碼

CommsCallback是Mqtt鏈接就已經開始一直運行,所以running爲true,因此如今已經將token添加進了completeQueue完成隊列中,CommsCallback跟發射器同樣,一直輪詢等待數據,所以此時completeQueue已有數據,此時CommsCallback的run函數則會有接下來的操做:

// CommsCallback類:
    public void run() {
        ...
        while (running) {
            try {
                ...
                if (running) {
                    // Check for deliveryComplete callbacks...
                    MqttToken token = null;
                    synchronized (completeQueue) {
                        // completeQueue不爲空
                        if (!completeQueue.isEmpty()) {
                            // 獲取第一個token
                            token = (MqttToken) completeQueue.elementAt(0);
                            completeQueue.removeElementAt(0);
                        }
                    }
                    if (null != token) {
                        // token不爲null,執行handleActionComplete
                        handleActionComplete(token);
                    }
                    ...
                }

                if (quiescing) {
                    clientState.checkQuiesceLock();
                }

            } catch (Throwable ex) {
                ...
            } finally {
                ...
            }
        }
    }
    
    private void handleActionComplete(MqttToken token)
            throws MqttException {
        final String methodName = "handleActionComplete";
        synchronized (token) {
            // 由上面已經,isComplete()已設置爲true
            if (token.isComplete()) {
                // Finish by doing any post processing such as delete 
                // from persistent store but only do so if the action
                // is complete
                clientState.notifyComplete(token);
            }
            // 取消阻止任何服務員,若是待完成,如今設置完成
            token.internalTok.notifyComplete();
            if (!token.internalTok.isNotified()) {
 				...
				// 如今調用異步操做完成回調
				fireActionEvent(token);
			}
			...
        }
    }
複製代碼

run中調用了handleActionComplete函數,接着後調用了clientState.notifyComplete()方法和fireActionEvent(token)方法,先看notifyComplete():

// ClientState類:
   protected void notifyComplete(MqttToken token) throws MqttException {

       final String methodName = "notifyComplete";
       // 獲取保存到Token中的Broker返回的消息,上面有說明
       MqttWireMessage message = token.internalTok.getWireMessage();

       if (message != null && message instanceof MqttAck) {
           ...
           MqttAck ack = (MqttAck) message;

           if (ack instanceof MqttPubAck) {
               // qos==1,用戶通知如今從持久性中刪除
               persistence.remove(getSendPersistenceKey(message));
               persistence.remove(getSendBufferedPersistenceKey(message));
               outboundQoS1.remove(new Integer(ack.getMessageId()));
               decrementInFlight();
               releaseMessageId(message.getMessageId());
               tokenStore.removeToken(message);
               // @TRACE 650=removed Qos 1 publish. key={0}
               log.fine(CLASS_NAME, methodName, "650",
                       new Object[]{new Integer(ack.getMessageId())});
           } else if (ack instanceof MqttPubComp) {
               ...
           }

           checkQuiesceLock();
       }
   }
複製代碼

再來看fireActionEvent(token)方法:

// CommsCallback類:
   public void fireActionEvent(MqttToken token) {
       final String methodName = "fireActionEvent";

       if (token != null) {
           IMqttActionListener asyncCB = token.getActionCallback();
           if (asyncCB != null) {
               if (token.getException() == null) {
                   ...
                   asyncCB.onSuccess(token);
               } else {
                   ...
                   asyncCB.onFailure(token, token.getException());
               }
           }
       }
   }
複製代碼

從這段代碼中終於能看到回調onSuccess和onFailure的方法啦,那asyncCB是誰呢?

// MqttToken類:
   public IMqttActionListener getActionCallback() {
       return internalTok.getActionCallback();
   }
// Token類:
   public IMqttActionListener getActionCallback() {
   	return callback;
   }
複製代碼

看到這,一臉懵逼,這究竟是誰呢,其實咱們能夠直接看這個回調設置方法,看看是從哪設置進來的就能夠啦:

// Token類:
   public void setActionCallback(IMqttActionListener listener) {
   	this.callback  = listener;
   }
// MqttToken類:
   public void setActionCallback(IMqttActionListener listener) {
   	internalTok.setActionCallback(listener);
   }
// ConnectActionListener類:
   public void connect() throws MqttPersistenceException {
       // 初始化MqttToken
       MqttToken token = new MqttToken(client.getClientId());
       // 將此類設置成回調類
       token.setActionCallback(this);
       token.setUserContext(this);

       ...
   }
複製代碼

其實早在MQTT鏈接時,就已經將此callback設置好,所以asyncCB就是ConnectActionListener,因此此時就已經走到了ConnectActionListener類裏的onSuccess和onFailure的方法中,咱們只挑一個onSuccess看一看:

// ConnectActionListener類:
   public void onSuccess(IMqttToken token) {
       if (originalMqttVersion == MqttConnectOptions.MQTT_VERSION_DEFAULT) {
           options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_DEFAULT);
       }
       // 此時將Broker的數據保存進了userToken裏
       userToken.internalTok.markComplete(token.getResponse(), null);
       userToken.internalTok.notifyComplete();
       userToken.internalTok.setClient(this.client); 

       comms.notifyConnect();

       if (userCallback != null) {
           userToken.setUserContext(userContext);
           userCallback.onSuccess(userToken);
       }

       if (mqttCallbackExtended != null) {
           String serverURI =
                   comms.getNetworkModules()[comms.getNetworkModuleIndex()].getServerURI();
           mqttCallbackExtended.connectComplete(reconnect, serverURI);
       }

   }
複製代碼

這裏的userCallback又是誰呢?上一篇其實說過的,userCallback其實就是MqttConnection.connect函數中IMqttActionListener listener,因此此時又來到了MqttConnection類裏connect方法裏的listener監聽回調內:

// MqttConnection類:
   public void connect(MqttConnectOptions options, String invocationContext,
                       String activityToken) {
       ...
       service.traceDebug(TAG, "Connecting {" + serverURI + "} as {" + clientId + "}");
       final Bundle resultBundle = new Bundle();
       resultBundle.putString(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN,
               activityToken);
       resultBundle.putString(
               MqttServiceConstants.CALLBACK_INVOCATION_CONTEXT,
               invocationContext);
       resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION,
               MqttServiceConstants.CONNECT_ACTION);
       try {
            ...
           // 此時邏輯已經來到這裏
           IMqttActionListener listener = new MqttConnectionListener(
                   resultBundle) {

               @Override
               public void onSuccess(IMqttToken asyncActionToken) {
                   // 執行以下代碼:
                   doAfterConnectSuccess(resultBundle);
                   service.traceDebug(TAG, "connect success!");
               }

               @Override
               public void onFailure(IMqttToken asyncActionToken,
                                     Throwable exception) {
                   resultBundle.putString(
                           MqttServiceConstants.CALLBACK_ERROR_MESSAGE,
                           exception.getLocalizedMessage());
                   resultBundle.putSerializable(
                           MqttServiceConstants.CALLBACK_EXCEPTION, exception);
                   service.traceError(TAG,
                           "connect fail, call connect to reconnect.reason:"
                                   + exception.getMessage());

                   doAfterConnectFail(resultBundle);

               }
           };

           if (myClient != null) {
               if (isConnecting) {
                   ...
               } else {
                   service.traceDebug(TAG, "myClient != null and the client is not connected");
                   service.traceDebug(TAG, "Do Real connect!");
                   setConnectingState(true);
                   myClient.connect(connectOptions, invocationContext, listener);
               }
           }

           // if myClient is null, then create a new connection
           else {
               ...
               myClient.connect(connectOptions, invocationContext, listener);
           }
       } catch (Exception e) {
           ...
       }
   }
複製代碼

由這段代碼以及註釋能夠知道,如今以及執行到了MqttConnection類裏的doAfterConnectSuccess方法裏:

// MqttConnection類:
   private void doAfterConnectSuccess(final Bundle resultBundle) {
       // 獲取喚醒鎖
       acquireWakeLock();
       service.callbackToActivity(clientHandle, Status.OK, resultBundle);
       deliverBacklog();
       setConnectingState(false);
       disconnected = false;
       // 釋放喚醒鎖
       releaseWakeLock();
   }
   
   private void deliverBacklog() {
       Iterator<StoredMessage> backlog = service.messageStore
               .getAllArrivedMessages(clientHandle);
       while (backlog.hasNext()) {
           StoredMessage msgArrived = backlog.next();
           Bundle resultBundle = messageToBundle(msgArrived.getMessageId(),
                   msgArrived.getTopic(), msgArrived.getMessage());
           // 關注下這個action,下面會用到
           resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION,
                   MqttServiceConstants.MESSAGE_ARRIVED_ACTION);
           service.callbackToActivity(clientHandle, Status.OK, resultBundle);
       }
   }
複製代碼

能夠看到這個函數中調用了幾個方法中的其中兩個service.callbackToActivity(clientHandle, Status.OK, resultBundle);和deliverBacklog();,deliverBacklog()方法最後也是調用的service.callbackToActivity方法。因此直接看service.callbackToActivity:

// MqttService類:
   void callbackToActivity(String clientHandle, Status status,
                           Bundle dataBundle) {
       // 發送廣播
       Intent callbackIntent = new Intent(
               MqttServiceConstants.CALLBACK_TO_ACTIVITY);
       if (clientHandle != null) {
           callbackIntent.putExtra(
                   MqttServiceConstants.CALLBACK_CLIENT_HANDLE, clientHandle);
       }
       callbackIntent.putExtra(MqttServiceConstants.CALLBACK_STATUS, status);
       if (dataBundle != null) {
           callbackIntent.putExtras(dataBundle);
       }
       LocalBroadcastManager.getInstance(this).sendBroadcast(callbackIntent);
   }
複製代碼

service.callbackToActivity方法其實就是發送廣播,那誰來接收廣播呢?其實接收廣播的就在最開始的MqttAndroidClient,MqttAndroidClient繼承自BroadcastReceiver,因此說MqttAndroidClient自己就是一個廣播接收者,因此咱們來看它的onReceive方法:

// MqttAndroidClient類:
   @Override
   public void onReceive(Context context, Intent intent) {
       Bundle data = intent.getExtras();

       String handleFromIntent = data
               .getString(MqttServiceConstants.CALLBACK_CLIENT_HANDLE);

       if ((handleFromIntent == null)
               || (!handleFromIntent.equals(clientHandle))) {
           return;
       }

       String action = data.getString(MqttServiceConstants.CALLBACK_ACTION);
       // 判斷消息的action類型
       if (MqttServiceConstants.CONNECT_ACTION.equals(action)) {
           connectAction(data);
       } else if (MqttServiceConstants.CONNECT_EXTENDED_ACTION.equals(action)) {
           connectExtendedAction(data);
       } else if (MqttServiceConstants.MESSAGE_ARRIVED_ACTION.equals(action)) {
           messageArrivedAction(data);
       } else if (MqttServiceConstants.SUBSCRIBE_ACTION.equals(action)) {
           subscribeAction(data);
       } else if (MqttServiceConstants.UNSUBSCRIBE_ACTION.equals(action)) {
           unSubscribeAction(data);
       } else if (MqttServiceConstants.SEND_ACTION.equals(action)) {
           // 發佈成功與否的回調
           sendAction(data);
       } else if (MqttServiceConstants.MESSAGE_DELIVERED_ACTION.equals(action)) {
           messageDeliveredAction(data);
       } else if (MqttServiceConstants.ON_CONNECTION_LOST_ACTION
               .equals(action)) {
           connectionLostAction(data);
       } else if (MqttServiceConstants.DISCONNECT_ACTION.equals(action)) {
           disconnected(data);
       } else if (MqttServiceConstants.TRACE_ACTION.equals(action)) {
           traceAction(data);
       } else {
           mqttService.traceError(MqttService.TAG, "Callback action doesn't exist.");
       }
   }
複製代碼

從代碼和註釋以及上面的deliverBacklog方法中能夠知道,咱們如今須要關注的action爲MESSAGE_ARRIVED_ACTION,因此就能夠調用方法messageArrivedAction(data):

// MqttAndroidClient類:
   private void messageArrivedAction(Bundle data) {
       if (callback != null) {
           String messageId = data
                   .getString(MqttServiceConstants.CALLBACK_MESSAGE_ID);
           String destinationName = data
                   .getString(MqttServiceConstants.CALLBACK_DESTINATION_NAME);

           ParcelableMqttMessage message = data
                   .getParcelable(MqttServiceConstants.CALLBACK_MESSAGE_PARCEL);
           try {
               if (messageAck == Ack.AUTO_ACK) {
                   callback.messageArrived(destinationName, message);
                   mqttService.acknowledgeMessageArrival(clientHandle, messageId);
               } else {
                   message.messageId = messageId;
                   callback.messageArrived(destinationName, message);
               }

               // let the service discard the saved message details
           } catch (Exception e) {
               // Swallow the exception
           }
       }
   }
   
   @Override
   public void setCallback(MqttCallback callback) {
       this.callback = callback;
   }
複製代碼

在messageArrivedAction方法中能夠看到,咱們最後調用了callback回調了messageArrived方法,那麼 callback經過上面下部分代碼能夠知道,其實這個callback就是咱們上一篇文章中所說的咱們初始化MqttAndroidClient後,經過方法setCallback設置的咱們本身定義的實現MqttCallback接口的回調類。

監聽Broker返回的消息之發佈消息成功與否

再看下sendAction(data)方法:

private void sendAction(Bundle data) {
       IMqttToken token = getMqttToken(data); 
       // remove on delivery
       simpleAction(token, data);
   }
   
   private void simpleAction(IMqttToken token, Bundle data) {
       if (token != null) {
           Status status = (Status) data
                   .getSerializable(MqttServiceConstants.CALLBACK_STATUS);
           if (status == Status.OK) {
               // 若是發佈成功回調此方法
               ((MqttTokenAndroid) token).notifyComplete();
           } else {
               Exception exceptionThrown =
                       (Exception) data.getSerializable(MqttServiceConstants.CALLBACK_EXCEPTION);
               // 發佈失敗回調
               ((MqttTokenAndroid) token).notifyFailure(exceptionThrown);
           }
       } else {
           if (mqttService != null) {
               mqttService.traceError(MqttService.TAG, "simpleAction : token is null");
           }
       }
   }
複製代碼

接下來再看一看發佈成功回調的MqttTokenAndroid的notifyComplete函數:

// MqttTokenAndroid類:
   void notifyComplete() {
       synchronized (waitObject) {
           isComplete = true;
           waitObject.notifyAll();
           if (listener != null) {
               listener.onSuccess(this);
           }
       }
   }
複製代碼

這裏又調用了listener.onSuccess(this)方法,那麼這個listener是誰?其實listener就是咱們調用MqttAndroidClient類的publish發佈的最後一個參數,即咱們自定義的監聽發佈消息是否發佈成功的回調類。上面在MqttConnection類的publish方法中封裝過MqttServiceConstants.SEND_ACTION的Bundle數據,而此數據是被MqttConnection類裏的MqttConnectionListener攜帶。因此MqttConnectionListener裏的onSuccess被調用時就會調用service.callbackToActivity,繼而到sendBroadcast發送廣播,最後調用sendAction方法,回調自定義的IMqttActionListener的實現類。而MqttConnectionListener裏的onSuccess是在CommsCallback類裏的fireActionEvent方法中,往上走就到CommsCallback類的了handleActionComplete和run()函數。

如今看是否是有點懵畢竟上面有兩個 監聽Broker返回的消息,一個是用來監聽Broker發給客戶端數據的監聽,另外一個是客戶端發佈消息是否發佈成功的監聽而已。二者都是使用MqttActionListener,不過前者在MqttActionListener監聽回調裏最後調用的是自定義的MqttCallback回調而已。而且二者監聽的位置不同,前者是在 MqttConnection類的connect時就已確認下來的,對於一個MQTT鏈接只會有一個,因此這個是一直用來監聽數據的;然後者監聽發佈消息是否成功是每一個publish都須要傳入的,並在MqttConnection類裏的publish初始化。這麼講是否是就清晰一些啦。

哈哈,到此MQTT的publish發佈以及接收Broker數據的源碼分析也看完啦。

(注:如有什麼地方闡述有誤,敬請指正。)

相關文章
相關標籤/搜索