MqttAndroidClient暴露了connect()方法用於鏈接代理服務器:服務器
@Override public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback) throws MqttException { IMqttToken token = new MqttTokenAndroid(this, userContext, callback); ... if (mqttService == null) { // First time - must bind to the service 首次啓動服務 Intent serviceStartIntent = new Intent(); serviceStartIntent.setClassName(myContext, SERVICE_NAME); Object service = myContext.startService(serviceStartIntent); if (service == null) {//若是服務啓動失敗則回調鏈接失敗 IMqttActionListener listener = token.getActionCallback(); if (listener != null) { listener.onFailure(token, new RuntimeException("cannot start service " + SERVICE_NAME)); } } //若是服務啓動成功,則綁定service生命週期 // We bind with BIND_SERVICE_FLAG (0), leaving us the manage the lifecycle // until the last time it is stopped by a call to stopService() myContext.bindService(serviceStartIntent, serviceConnection, Context.BIND_AUTO_CREATE); if (!receiverRegistered) registerReceiver(this); } else { pool.execute(new Runnable() { @Override public void run() { doConnect();//鏈接broker //Register receiver to show shoulder tap. if (!receiverRegistered) registerReceiver(MqttAndroidClient.this); } }); } return token; }
查看方法代碼,connect()中會先檢查是否已啓動MqttService,肯定服務已啓動才執行doConnect()。doConnect()中其實也就是調用:session
mqttService.connect(clientHandle, connectOptions, null, activityToken);
再往下看,定位到MqttConnection.class中的connect()方法:app
public void connect(MqttConnectOptions options, String invocationContext, String activityToken) { connectOptions = options; reconnectActivityToken = activityToken; //根據cleanSession清除歷史消息 if (options != null) { cleanSession = options.isCleanSession(); } if (connectOptions.isCleanSession()) { // if it's a clean session,discard old data service.messageStore.clearArrivedMessages(clientHandle); } service.traceDebug(TAG, "Connecting {" + serverURI + "} as {" + clientId + "}"); final Bundle resultBundle = new Bundle(); resultBundle.putString(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN, activityToken); resultBundle.putString(MqttServiceConstants.CALLBACK_INVOCATION_CONTEXT, invocationContext); resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION, MqttServiceConstants.CONNECT_ACTION); try { if (persistence == null) { // ask Android where we can put files File myDir = service.getExternalFilesDir(TAG); if (myDir == null) { // No external storage, use internal storage instead. myDir = service.getDir(TAG, Context.MODE_PRIVATE); if (myDir == null) { //Shouldn't happen. resultBundle.putString(MqttServiceConstants.CALLBACK_ERROR_MESSAGE, "Error! No external and internal storage available"); resultBundle.putSerializable(MqttServiceConstants.CALLBACK_EXCEPTION, new MqttPersistenceException()); service.callbackToActivity(clientHandle, Status.ERROR, resultBundle); return; } } // use that to setup MQTT client persistence storage persistence = new MqttDefaultFilePersistence(myDir.getAbsolutePath()); } IMqttActionListener listener = new MqttConnectionListener(resultBundle) { @Override public void onSuccess(IMqttToken asyncActionToken) { doAfterConnectSuccess(resultBundle); service.traceDebug(TAG, "connect success!"); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { resultBundle.putString(MqttServiceConstants.CALLBACK_ERROR_MESSAGE, exception.getLocalizedMessage()); resultBundle.putSerializable(MqttServiceConstants.CALLBACK_EXCEPTION, exception); service.traceError(TAG, "connect fail, call connect to reconnect.reason:" + exception.getMessage()); doAfterConnectFail(resultBundle); } }; if (myClient != null) {//若是已經建立過MqttAsyncClient,也即以前就調用過本connect()方法 if (isConnecting) {//上次調用的connect()還在鏈接中,不作處理,等待connect()結果 service.traceDebug(TAG, "myClient != null and the client is connecting. Connect return directly."); service.traceDebug(TAG, "Connect return:isConnecting:" + isConnecting + ".disconnected:" + disconnected); } else if (!disconnected) {//當前已處於長鏈接,提示鏈接成功 service.traceDebug(TAG, "myClient != null and the client is connected and notify!"); doAfterConnectSuccess(resultBundle); } else {//以前的鏈接未成功或者已掉線,從新嘗試鏈接 service.traceDebug(TAG, "myClient != null and the client is not connected"); service.traceDebug(TAG, "Do Real connect!"); setConnectingState(true); myClient.connect(connectOptions, invocationContext, listener); } } else {// if myClient is null, then create a new connection 鏈接不曾創建或已被銷燬,新建鏈接 alarmPingSender = new AlarmPingSender(service);//用於發送心跳包 myClient = new MqttAsyncClient(serverURI, clientId, persistence, alarmPingSender); myClient.setCallback(this); service.traceDebug(TAG, "Do Real connect!"); setConnectingState(true); myClient.connect(connectOptions, invocationContext, listener); } } catch (Exception e) { service.traceError(TAG, "Exception occurred attempting to connect: " + e.getMessage()); setConnectingState(false); handleException(resultBundle, e); } }
查看以上代碼,我在關鍵行都添加了註釋。另外須要注意到其中有兩個比較重要的對象resultBundle和persistence,persistence用於將connection信息持久化,而resultBundle我會在後面分析,它最終會被用於發送廣播觸發咱們connect、publish、subscribe等的回調監聽。繼續深刻到MqttAsyncClient.connect():async
public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback) throws MqttException, MqttSecurityException { ... final boolean automaticReconnect = options.isAutomaticReconnect(); comms.setNetworkModules(createNetworkModules(serverURI, options)); comms.setReconnectCallback(new MqttCallbackExtended() { public void messageArrived(String topic, MqttMessage message) throws Exception { } public void deliveryComplete(IMqttDeliveryToken token) { } public void connectComplete(boolean reconnect, String serverURI) { } public void connectionLost(Throwable cause) { if(automaticReconnect){ // Automatic reconnect is set so make sure comms is in resting state comms.setRestingState(true); reconnecting = true; startReconnectCycle(); } } }); // Insert our own callback to iterate through the URIs till the connect succeeds MqttToken userToken = new MqttToken(getClientId()); ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options, userToken, userContext, callback, reconnecting); userToken.setActionCallback(connectActionListener); userToken.setUserContext(this); // If we are using the MqttCallbackExtended, set it on the connectActionListener if(this.mqttCallback instanceof MqttCallbackExtended){ connectActionListener.setMqttCallbackExtended((MqttCallbackExtended)this.mqttCallback); } comms.setNetworkModuleIndex(0); connectActionListener.connect(); return userToken; }
代碼比較多,我省略了一部分,主要關注comms.setReconnectCallback()中的自動重連邏輯便可,自動重連的實現看下面的attemptReconnect()方法,重連失敗會繼續重連直到鏈接成功,不太重連的間隔時間會隨着重連次數增長最大到128s:ide
private void attemptReconnect(){ final String methodName = "attemptReconnect"; //@Trace 500=Attempting to reconnect client: {0} try { connect(this.connOpts, this.userContext,new IMqttActionListener() { public void onSuccess(IMqttToken asyncActionToken) { //@Trace 501=Automatic Reconnect Successful: {0} comms.setRestingState(false); stopReconnectCycle(); } public void onFailure(IMqttToken asyncActionToken, Throwable exception) { //@Trace 502=Automatic Reconnect failed, rescheduling: {0} if(reconnectDelay < 128000){//reconnectDelay初始值爲1000,每次重連失敗時*2 reconnectDelay = reconnectDelay * 2; } rescheduleReconnectCycle(reconnectDelay); } }); } catch (MqttSecurityException ex) { //@TRACE 804=exception } catch (MqttException ex) { //@TRACE 804=exception } }
好了,看完重連邏輯咱們再回到前面的connect()方法,MqttAsyncClient.connect()會進入ClientComms.connect():ui
public void connect(MqttConnectOptions options, MqttToken token) throws MqttException { final String methodName = "connect"; synchronized (conLock) { if (isDisconnected() && !closePending) { ... tokenStore.open(); ConnectBG conbg = new ConnectBG(this, token, connect); conbg.start();//經由ConnectBG而後執行ClientComms.internalSend()方法 }else { ... } } }
經由ConnectBG而後執行ClientComms.internalSend()方法,並最終進入ClientState.send()方法this
public void send(MqttWireMessage message, MqttToken token) throws MqttException { final String methodName = "send"; 。。。 if (message instanceof MqttPublish) { synchronized (queueLock) { if (actualInFlight >= this.maxInflight) { //@TRACE 613= sending {0} msgs at max inflight window log.fine(CLASS_NAME, methodName, "613", new Object[]{new Integer(actualInFlight)}); throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT); } MqttMessage innerMessage = ((MqttPublish) message).getMessage(); //@TRACE 628=pending publish key={0} qos={1} message={2} log.fine(CLASS_NAME,methodName,"628", new Object[]{new Integer(message.getMessageId()), new Integer(innerMessage.getQos()), message}); switch(innerMessage.getQos()) { case 2: outboundQoS2.put(new Integer(message.getMessageId()), message); persistence.put(getSendPersistenceKey(message), (MqttPublish) message); break; case 1: outboundQoS1.put(new Integer(message.getMessageId()), message); persistence.put(getSendPersistenceKey(message), (MqttPublish) message); break; } tokenStore.saveToken(token, message); pendingMessages.addElement(message); queueLock.notifyAll(); } } else { //@TRACE 615=pending send key={0} message {1} log.fine(CLASS_NAME,methodName,"615", new Object[]{new Integer(message.getMessageId()), message}); if (message instanceof MqttConnect) { synchronized (queueLock) { // Add the connect action at the head of the pending queue ensuring it jumps // ahead of any of other pending actions. tokenStore.saveToken(token, message); pendingFlows.insertElementAt(message,0); queueLock.notifyAll(); } } else { if (message instanceof MqttPingReq) { this.pingCommand = message; } else if (message instanceof MqttPubRel) { outboundQoS2.put(new Integer(message.getMessageId()), message); persistence.put(getSendConfirmPersistenceKey(message), (MqttPubRel) message); } else if (message instanceof MqttPubComp) { persistence.remove(getReceivedPersistenceKey(message)); } synchronized (queueLock) { if ( !(message instanceof MqttAck )) { tokenStore.saveToken(token, message); } pendingFlows.addElement(message); queueLock.notifyAll(); } } } }
如今咱們再回頭聊一聊剛纔說的resultBundle,取其中一處引用:代理
private void doAfterConnectSuccess(final Bundle resultBundle) { //since the device's cpu can go to sleep, acquire a wakelock and drop it later. acquireWakeLock(); service.callbackToActivity(clientHandle, Status.OK, resultBundle); deliverBacklog(); setConnectingState(false); disconnected = false; releaseWakeLock(); }
鏈接成功後會調用MqttService.callbackToActivity(),resultBundle就做爲其中一個參數被傳入,接下來咱們看看這個方法的實現:rest
/** * 全部消息都經此方法發出 * pass data back to the Activity, by building a suitable Intent object and * broadcasting it * * @param clientHandle source of the data * @param status OK or Error * @param dataBundle the data to be passed */ void callbackToActivity(String clientHandle, Status status, Bundle dataBundle) { // Don't call traceDebug, as it will try to callbackToActivity leading // to recursion. Intent callbackIntent = new Intent(MqttServiceConstants.CALLBACK_TO_ACTIVITY); if (clientHandle != null) { callbackIntent.putExtra(MqttServiceConstants.CALLBACK_CLIENT_HANDLE, clientHandle); } callbackIntent.putExtra(MqttServiceConstants.CALLBACK_STATUS, status); if (dataBundle != null) { callbackIntent.putExtras(dataBundle); } LocalBroadcastManager.getInstance(this).sendBroadcast(callbackIntent); }
callbackToActivity()方法用於發送本地廣播,廣播中攜帶resultBundle,其實包括publish、subscribe等行爲不論成功失敗都會調用此方法,發出一個指示行爲類型及狀態的本地廣播。那麼發出的廣播又是在哪裏被處理的呢?請往下看。MqttAndroidClient類繼承自BroadCastReceiver,查看其onReceive()方法:server
@Override public void onReceive(Context context, Intent intent) { Bundle data = intent.getExtras(); String handleFromIntent = data.getString(MqttServiceConstants.CALLBACK_CLIENT_HANDLE); if ((handleFromIntent == null) || (!handleFromIntent.equals(clientHandle))) { return; } String action = data.getString(MqttServiceConstants.CALLBACK_ACTION); if (MqttServiceConstants.CONNECT_ACTION.equals(action)) { connectAction(data); } else if (MqttServiceConstants.CONNECT_EXTENDED_ACTION.equals(action)) { connectExtendedAction(data); } else if (MqttServiceConstants.MESSAGE_ARRIVED_ACTION.equals(action)) { messageArrivedAction(data); } else if (MqttServiceConstants.SUBSCRIBE_ACTION.equals(action)) { subscribeAction(data); } else if (MqttServiceConstants.UNSUBSCRIBE_ACTION.equals(action)) { unSubscribeAction(data); } else if (MqttServiceConstants.SEND_ACTION.equals(action)) { sendAction(data); } else if (MqttServiceConstants.MESSAGE_DELIVERED_ACTION.equals(action)) { messageDeliveredAction(data); } else if (MqttServiceConstants.ON_CONNECTION_LOST_ACTION.equals(action)) { connectionLostAction(data); } else if (MqttServiceConstants.DISCONNECT_ACTION.equals(action)) { disconnected(data); } else if (MqttServiceConstants.TRACE_ACTION.equals(action)) { traceAction(data); } else { mqttService.traceError(MqttService.TAG, "Callback action doesn't exist."); } }
沒錯,data.getString(MqttServiceConstants.CALLBACK_ACTION)獲取的就是咱們前面存放在resultBundle中的action,而後根據action去回調callback的對應方法,這裏的callback就是咱們創建鏈接時傳入MqttAndroidClient的MqttCallback對象,若是須要監聽action爲MqttServiceConstants.CONNECT_EXTENDED_ACTION的行爲,則要求咱們傳入的callback必須爲MqttCallbackExtended的實現,MqttCallbackExtended是MqttCallback的子類