paho.mqtt.android代碼逐步分析(四)

MqttAndroidClient暴露了connect()方法用於鏈接代理服務器:服務器

@Override
public IMqttToken connect(MqttConnectOptions options, Object userContext,
                          IMqttActionListener callback) throws MqttException {

    IMqttToken token = new MqttTokenAndroid(this, userContext, callback);
    ...

    if (mqttService == null) { // First time - must bind to the service 首次啓動服務
        Intent serviceStartIntent = new Intent();
        serviceStartIntent.setClassName(myContext, SERVICE_NAME);
        Object service = myContext.startService(serviceStartIntent);
        if (service == null) {//若是服務啓動失敗則回調鏈接失敗
            IMqttActionListener listener = token.getActionCallback();
            if (listener != null) {
                listener.onFailure(token, new RuntimeException("cannot start service " + SERVICE_NAME));
            }
        }

        //若是服務啓動成功,則綁定service生命週期
        // We bind with BIND_SERVICE_FLAG (0), leaving us the manage the lifecycle
        // until the last time it is stopped by a call to stopService()
        myContext.bindService(serviceStartIntent, serviceConnection, Context.BIND_AUTO_CREATE);

        if (!receiverRegistered) registerReceiver(this);
    } else {
        pool.execute(new Runnable() {

            @Override
            public void run() {
                doConnect();//鏈接broker

                //Register receiver to show shoulder tap.
                if (!receiverRegistered) registerReceiver(MqttAndroidClient.this);
            }

        });
    }

    return token;
}

查看方法代碼,connect()中會先檢查是否已啓動MqttService,肯定服務已啓動才執行doConnect()。doConnect()中其實也就是調用:session

mqttService.connect(clientHandle, connectOptions, null, activityToken);

再往下看,定位到MqttConnection.class中的connect()方法:app

public void connect(MqttConnectOptions options, String invocationContext, String activityToken) {
    connectOptions = options;
    reconnectActivityToken = activityToken;

    //根據cleanSession清除歷史消息
    if (options != null) {
        cleanSession = options.isCleanSession();
    }
    if (connectOptions.isCleanSession()) { // if it's a clean session,discard old data
        service.messageStore.clearArrivedMessages(clientHandle);
    }

    service.traceDebug(TAG, "Connecting {" + serverURI + "} as {" + clientId + "}");
    final Bundle resultBundle = new Bundle();
    resultBundle.putString(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN, activityToken);
    resultBundle.putString(MqttServiceConstants.CALLBACK_INVOCATION_CONTEXT, invocationContext);
    resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION, MqttServiceConstants.CONNECT_ACTION);

    try {
        if (persistence == null) {
            // ask Android where we can put files
            File myDir = service.getExternalFilesDir(TAG);
            if (myDir == null) {
                // No external storage, use internal storage instead.
                myDir = service.getDir(TAG, Context.MODE_PRIVATE);

                if (myDir == null) {
                    //Shouldn't happen.
                    resultBundle.putString(MqttServiceConstants.CALLBACK_ERROR_MESSAGE, "Error! No external and internal storage available");
                    resultBundle.putSerializable(MqttServiceConstants.CALLBACK_EXCEPTION, new MqttPersistenceException());
                    service.callbackToActivity(clientHandle, Status.ERROR, resultBundle);
                    return;
                }
            }

            // use that to setup MQTT client persistence storage
            persistence = new MqttDefaultFilePersistence(myDir.getAbsolutePath());
        }

        IMqttActionListener listener = new MqttConnectionListener(resultBundle) {

            @Override
            public void onSuccess(IMqttToken asyncActionToken) {
                doAfterConnectSuccess(resultBundle);
                service.traceDebug(TAG, "connect success!");
            }

            @Override
            public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                resultBundle.putString(MqttServiceConstants.CALLBACK_ERROR_MESSAGE, exception.getLocalizedMessage());
                resultBundle.putSerializable(MqttServiceConstants.CALLBACK_EXCEPTION, exception);
                service.traceError(TAG, "connect fail, call connect to reconnect.reason:" + exception.getMessage());

                doAfterConnectFail(resultBundle);
            }
        };

        if (myClient != null) {//若是已經建立過MqttAsyncClient,也即以前就調用過本connect()方法
            if (isConnecting) {//上次調用的connect()還在鏈接中,不作處理,等待connect()結果
                service.traceDebug(TAG, "myClient != null and the client is connecting. Connect return directly.");
                service.traceDebug(TAG, "Connect return:isConnecting:" + isConnecting + ".disconnected:" + disconnected);
            } else if (!disconnected) {//當前已處於長鏈接,提示鏈接成功
                service.traceDebug(TAG, "myClient != null and the client is connected and notify!");
                doAfterConnectSuccess(resultBundle);
            } else {//以前的鏈接未成功或者已掉線,從新嘗試鏈接
                service.traceDebug(TAG, "myClient != null and the client is not connected");
                service.traceDebug(TAG, "Do Real connect!");
                setConnectingState(true);
                myClient.connect(connectOptions, invocationContext, listener);
            }
        } else {// if myClient is null, then create a new connection 鏈接不曾創建或已被銷燬,新建鏈接
            alarmPingSender = new AlarmPingSender(service);//用於發送心跳包
            myClient = new MqttAsyncClient(serverURI, clientId, persistence, alarmPingSender);
            myClient.setCallback(this);

            service.traceDebug(TAG, "Do Real connect!");
            setConnectingState(true);
            myClient.connect(connectOptions, invocationContext, listener);
        }
    } catch (Exception e) {
        service.traceError(TAG, "Exception occurred attempting to connect: " + e.getMessage());
        setConnectingState(false);
        handleException(resultBundle, e);
    }
}

查看以上代碼,我在關鍵行都添加了註釋。另外須要注意到其中有兩個比較重要的對象resultBundle和persistence,persistence用於將connection信息持久化,而resultBundle我會在後面分析,它最終會被用於發送廣播觸發咱們connect、publish、subscribe等的回調監聽。繼續深刻到MqttAsyncClient.connect():async

public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback)
      throws MqttException, MqttSecurityException {
    ...

   final boolean automaticReconnect = options.isAutomaticReconnect();

   comms.setNetworkModules(createNetworkModules(serverURI, options));
   comms.setReconnectCallback(new MqttCallbackExtended() {
      
      public void messageArrived(String topic, MqttMessage message) throws Exception {
      }
      public void deliveryComplete(IMqttDeliveryToken token) {
      }
      public void connectComplete(boolean reconnect, String serverURI) {
      }

      public void connectionLost(Throwable cause) {
         if(automaticReconnect){
               // Automatic reconnect is set so make sure comms is in resting state
               comms.setRestingState(true);
               reconnecting = true;
               startReconnectCycle();
            }
      }
   });
   
   // Insert our own callback to iterate through the URIs till the connect succeeds
   MqttToken userToken = new MqttToken(getClientId());
   ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options, userToken, userContext, callback, reconnecting);
   userToken.setActionCallback(connectActionListener);
   userToken.setUserContext(this);

   // If we are using the MqttCallbackExtended, set it on the connectActionListener
   if(this.mqttCallback instanceof MqttCallbackExtended){
      connectActionListener.setMqttCallbackExtended((MqttCallbackExtended)this.mqttCallback);
   }

   comms.setNetworkModuleIndex(0);
   connectActionListener.connect();

   return userToken;
}

代碼比較多,我省略了一部分,主要關注comms.setReconnectCallback()中的自動重連邏輯便可,自動重連的實現看下面的attemptReconnect()方法,重連失敗會繼續重連直到鏈接成功,不太重連的間隔時間會隨着重連次數增長最大到128s:ide

private void attemptReconnect(){
   final String methodName = "attemptReconnect";  
   //@Trace 500=Attempting to reconnect client: {0}
   try {
      connect(this.connOpts, this.userContext,new IMqttActionListener() {

         public void onSuccess(IMqttToken asyncActionToken) {
            //@Trace 501=Automatic Reconnect Successful: {0}
            comms.setRestingState(false);
            stopReconnectCycle();
         }
         
         public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
            //@Trace 502=Automatic Reconnect failed, rescheduling: {0}
            if(reconnectDelay < 128000){//reconnectDelay初始值爲1000,每次重連失敗時*2
               reconnectDelay = reconnectDelay * 2;
            }
            rescheduleReconnectCycle(reconnectDelay);
         }
      });
   } catch (MqttSecurityException ex) {
      //@TRACE 804=exception
   } catch (MqttException ex) {
      //@TRACE 804=exception
   }
}

好了,看完重連邏輯咱們再回到前面的connect()方法,MqttAsyncClient.connect()會進入ClientComms.connect():ui

public void connect(MqttConnectOptions options, MqttToken token) throws MqttException {
   final String methodName = "connect";
   synchronized (conLock) {
      if (isDisconnected() && !closePending) {
         ...
         tokenStore.open();
         ConnectBG conbg = new ConnectBG(this, token, connect);
         conbg.start();//經由ConnectBG而後執行ClientComms.internalSend()方法
      }else {
         ...
      }
   }
}

經由ConnectBG而後執行ClientComms.internalSend()方法,並最終進入ClientState.send()方法this

public void send(MqttWireMessage message, MqttToken token) throws MqttException {
   final String methodName = "send";
。。。
   if (message instanceof MqttPublish) {
      synchronized (queueLock) {
         if (actualInFlight >= this.maxInflight) {
            //@TRACE 613= sending {0} msgs at max inflight window
            log.fine(CLASS_NAME, methodName, "613", new Object[]{new Integer(actualInFlight)});

            throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);
         }
         
         MqttMessage innerMessage = ((MqttPublish) message).getMessage();
         //@TRACE 628=pending publish key={0} qos={1} message={2}
         log.fine(CLASS_NAME,methodName,"628", new Object[]{new Integer(message.getMessageId()), new Integer(innerMessage.getQos()), message});

         switch(innerMessage.getQos()) {
            case 2:
               outboundQoS2.put(new Integer(message.getMessageId()), message);
               persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
               break;
            case 1:
               outboundQoS1.put(new Integer(message.getMessageId()), message);
               persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
               break;
         }
         tokenStore.saveToken(token, message);
         pendingMessages.addElement(message);
         queueLock.notifyAll();
      }
   } else {
      //@TRACE 615=pending send key={0} message {1}
      log.fine(CLASS_NAME,methodName,"615", new Object[]{new Integer(message.getMessageId()), message});
      
      if (message instanceof MqttConnect) {
         synchronized (queueLock) {
            // Add the connect action at the head of the pending queue ensuring it jumps
            // ahead of any of other pending actions.
            tokenStore.saveToken(token, message);
            pendingFlows.insertElementAt(message,0);
            queueLock.notifyAll();
         }
      } else {
         if (message instanceof MqttPingReq) {
            this.pingCommand = message;
         }
         else if (message instanceof MqttPubRel) {
            outboundQoS2.put(new Integer(message.getMessageId()), message);
            persistence.put(getSendConfirmPersistenceKey(message), (MqttPubRel) message);
         }
         else if (message instanceof MqttPubComp)  {
            persistence.remove(getReceivedPersistenceKey(message));
         }
         
         synchronized (queueLock) {
            if ( !(message instanceof MqttAck )) {
               tokenStore.saveToken(token, message);
            }
            pendingFlows.addElement(message);
            queueLock.notifyAll();
         }
      }
   }
}

如今咱們再回頭聊一聊剛纔說的resultBundle,取其中一處引用:代理

private void doAfterConnectSuccess(final Bundle resultBundle) {
    //since the device's cpu can go to sleep, acquire a wakelock and drop it later.
    acquireWakeLock();
    service.callbackToActivity(clientHandle, Status.OK, resultBundle);
    deliverBacklog();
    setConnectingState(false);
    disconnected = false;
    releaseWakeLock();
}

鏈接成功後會調用MqttService.callbackToActivity(),resultBundle就做爲其中一個參數被傳入,接下來咱們看看這個方法的實現:rest

/**
 * 全部消息都經此方法發出
 * pass data back to the Activity, by building a suitable Intent object and
 * broadcasting it
 *
 * @param clientHandle source of the data
 * @param status       OK or Error
 * @param dataBundle   the data to be passed
 */
void callbackToActivity(String clientHandle, Status status, Bundle dataBundle) {
    // Don't call traceDebug, as it will try to callbackToActivity leading
    // to recursion.
    Intent callbackIntent = new Intent(MqttServiceConstants.CALLBACK_TO_ACTIVITY);
    if (clientHandle != null) {
        callbackIntent.putExtra(MqttServiceConstants.CALLBACK_CLIENT_HANDLE, clientHandle);
    }
    callbackIntent.putExtra(MqttServiceConstants.CALLBACK_STATUS, status);
    if (dataBundle != null) {
        callbackIntent.putExtras(dataBundle);
    }
    LocalBroadcastManager.getInstance(this).sendBroadcast(callbackIntent);
}

callbackToActivity()方法用於發送本地廣播,廣播中攜帶resultBundle,其實包括publish、subscribe等行爲不論成功失敗都會調用此方法,發出一個指示行爲類型及狀態的本地廣播。那麼發出的廣播又是在哪裏被處理的呢?請往下看。MqttAndroidClient類繼承自BroadCastReceiver,查看其onReceive()方法:server

@Override
public void onReceive(Context context, Intent intent) {
    Bundle data = intent.getExtras();
    String handleFromIntent = data.getString(MqttServiceConstants.CALLBACK_CLIENT_HANDLE);

    if ((handleFromIntent == null) || (!handleFromIntent.equals(clientHandle))) {
        return;
    }

    String action = data.getString(MqttServiceConstants.CALLBACK_ACTION);

    if (MqttServiceConstants.CONNECT_ACTION.equals(action)) {
        connectAction(data);
    } else if (MqttServiceConstants.CONNECT_EXTENDED_ACTION.equals(action)) {
        connectExtendedAction(data);
    } else if (MqttServiceConstants.MESSAGE_ARRIVED_ACTION.equals(action)) {
        messageArrivedAction(data);
    } else if (MqttServiceConstants.SUBSCRIBE_ACTION.equals(action)) {
        subscribeAction(data);
    } else if (MqttServiceConstants.UNSUBSCRIBE_ACTION.equals(action)) {
        unSubscribeAction(data);
    } else if (MqttServiceConstants.SEND_ACTION.equals(action)) {
        sendAction(data);
    } else if (MqttServiceConstants.MESSAGE_DELIVERED_ACTION.equals(action)) {
        messageDeliveredAction(data);
    } else if (MqttServiceConstants.ON_CONNECTION_LOST_ACTION.equals(action)) {
        connectionLostAction(data);
    } else if (MqttServiceConstants.DISCONNECT_ACTION.equals(action)) {
        disconnected(data);
    } else if (MqttServiceConstants.TRACE_ACTION.equals(action)) {
        traceAction(data);
    } else {
        mqttService.traceError(MqttService.TAG, "Callback action doesn't exist.");
    }
}

沒錯,data.getString(MqttServiceConstants.CALLBACK_ACTION)獲取的就是咱們前面存放在resultBundle中的action,而後根據action去回調callback的對應方法,這裏的callback就是咱們創建鏈接時傳入MqttAndroidClient的MqttCallback對象,若是須要監聽action爲MqttServiceConstants.CONNECT_EXTENDED_ACTION的行爲,則要求咱們傳入的callback必須爲MqttCallbackExtended的實現,MqttCallbackExtended是MqttCallback的子類

相關文章
相關標籤/搜索