/**
 * Starts a connection attempt and returns the token created for it.
 *
 * <p>When {@code mqttService} is still {@code null}, the service is started and bound, and the
 * broadcast receiver is registered if {@code receiverRegistered} is false. Otherwise the actual
 * connect is submitted to {@code pool} and runs {@code doConnect()} there.
 *
 * @param options     connection options supplied by the caller
 * @param userContext passed to the {@code MqttTokenAndroid} constructor
 * @param callback    passed to the {@code MqttTokenAndroid} constructor
 * @return the {@code MqttTokenAndroid} created at the top of this method
 * @throws MqttException declared by the signature; none of the statements visible in this
 *                       excerpt throws it directly
 */
@Override
public IMqttToken connect(MqttConnectOptions options, Object userContext,
        IMqttActionListener callback) throws MqttException {
    IMqttToken token = new MqttTokenAndroid(this, userContext, callback);
    // (statements elided in this excerpt)
    ...
    if (mqttService == null) { // First time - must bind to the service (start the service for the first time)
        Intent serviceStartIntent = new Intent();
        serviceStartIntent.setClassName(myContext, SERVICE_NAME);
        Object service = myContext.startService(serviceStartIntent);
        if (service == null) {// If the service could not be started, report a connect failure through the callback
            IMqttActionListener listener = token.getActionCallback();
            if (listener != null) {
                listener.onFailure(token, new RuntimeException("cannot start service " + SERVICE_NAME));
            }
            // NOTE(review): execution still falls through to bindService() below after the
            // failure callback - confirm that this is intended.
        }
        // If the service started successfully, bind to the service lifecycle.
        // We bind with BIND_SERVICE_FLAG (0), leaving us the manage the lifecycle
        // until the last time it is stopped by a call to stopService()
        // NOTE(review): the call below actually passes Context.BIND_AUTO_CREATE, not 0 -
        // the comment above and the code disagree.
        myContext.bindService(serviceStartIntent, serviceConnection, Context.BIND_AUTO_CREATE);
        if (!receiverRegistered) registerReceiver(this);
    } else {
        // mqttService is already set: run the connect on the pool.
        pool.execute(new Runnable() {
            @Override
            public void run() {
                doConnect();// connect to the broker
                //Register receiver to show shoulder tap.
                if (!receiverRegistered) registerReceiver(MqttAndroidClient.this);
            }
        });
    }
    return token;
}
// Hand the connect request to mqttService for clientHandle, passing connectOptions,
// null as the third argument, and activityToken.
mqttService.connect(clientHandle, connectOptions, null, activityToken);
public void connect(MqttConnectOptions options, String invocationContext, String activityToken) { connectOptions = options; reconnectActivityToken = activityToken; //根據cleanSession清除歷史消息 if (options != null) { cleanSession = options.isCleanSession(); } if (connectOptions.isCleanSession()) { // if it's a clean session,discard old data service.messageStore.clearArrivedMessages(clientHandle); } service.traceDebug(TAG, "Connecting {" + serverURI + "} as {" + clientId + "}"); final Bundle resultBundle = new Bundle(); resultBundle.putString(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN, activityToken); resultBundle.putString(MqttServiceConstants.CALLBACK_INVOCATION_CONTEXT, invocationContext); resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION, MqttServiceConstants.CONNECT_ACTION); try { if (persistence == null) { // ask Android where we can put files File myDir = service.getExternalFilesDir(TAG); if (myDir == null) { // No external storage, use internal storage instead. myDir = service.getDir(TAG, Context.MODE_PRIVATE); if (myDir == null) { //Shouldn't happen. resultBundle.putString(MqttServiceConstants.CALLBACK_ERROR_MESSAGE, "Error! 
No external and internal storage available"); resultBundle.putSerializable(MqttServiceConstants.CALLBACK_EXCEPTION, new MqttPersistenceException()); service.callbackToActivity(clientHandle, Status.ERROR, resultBundle); return; } } // use that to setup MQTT client persistence storage persistence = new MqttDefaultFilePersistence(myDir.getAbsolutePath()); } IMqttActionListener listener = new MqttConnectionListener(resultBundle) { @Override public void onSuccess(IMqttToken asyncActionToken) { doAfterConnectSuccess(resultBundle); service.traceDebug(TAG, "connect success!"); } @Override public void onFailure(IMqttToken asyncActionToken, Throwable exception) { resultBundle.putString(MqttServiceConstants.CALLBACK_ERROR_MESSAGE, exception.getLocalizedMessage()); resultBundle.putSerializable(MqttServiceConstants.CALLBACK_EXCEPTION, exception); service.traceError(TAG, "connect fail, call connect to reconnect.reason:" + exception.getMessage()); doAfterConnectFail(resultBundle); } }; if (myClient != null) {//若是已經建立過MqttAsyncClient,也即以前就調用過本connect()方法 if (isConnecting) {//上次調用的connect()還在鏈接中,不作處理,等待connect()結果 service.traceDebug(TAG, "myClient != null and the client is connecting. 
Connect return directly."); service.traceDebug(TAG, "Connect return:isConnecting:" + isConnecting + ".disconnected:" + disconnected); } else if (!disconnected) {//當前已處於長鏈接,提示鏈接成功 service.traceDebug(TAG, "myClient != null and the client is connected and notify!"); doAfterConnectSuccess(resultBundle); } else {//以前的鏈接未成功或者已掉線,從新嘗試鏈接 service.traceDebug(TAG, "myClient != null and the client is not connected"); service.traceDebug(TAG, "Do Real connect!"); setConnectingState(true); myClient.connect(connectOptions, invocationContext, listener); } } else {// if myClient is null, then create a new connection 鏈接不曾創建或已被銷燬,新建鏈接 alarmPingSender = new AlarmPingSender(service);//用於發送心跳包 myClient = new MqttAsyncClient(serverURI, clientId, persistence, alarmPingSender); myClient.setCallback(this); service.traceDebug(TAG, "Do Real connect!"); setConnectingState(true); myClient.connect(connectOptions, invocationContext, listener); } } catch (Exception e) { service.traceError(TAG, "Exception occurred attempting to connect: " + e.getMessage()); setConnectingState(false); handleException(resultBundle, e); } }
/**
 * Starts an asynchronous connect and returns the token that tracks it.
 *
 * <p>Visible steps: the network modules for {@code serverURI} are installed on {@code comms},
 * a reconnect callback is registered, a {@code ConnectActionListener} is set as the action
 * callback of a new {@code MqttToken}, the network module index is reset to 0 and the
 * listener's {@code connect()} is invoked.
 *
 * @param options     connection options; {@code isAutomaticReconnect()} is read here and the
 *                    options are passed to {@code createNetworkModules} and the listener
 * @param userContext passed to the {@code ConnectActionListener}
 * @param callback    passed to the {@code ConnectActionListener}
 * @return the {@code MqttToken} created for this connect
 * @throws MqttException         declared by the signature
 * @throws MqttSecurityException declared by the signature
 */
public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback)
        throws MqttException, MqttSecurityException {
    // (statements elided in this excerpt)
    ...
    // Captured in a final local so the anonymous callback below can read it.
    final boolean automaticReconnect = options.isAutomaticReconnect();
    comms.setNetworkModules(createNetworkModules(serverURI, options));
    // Only connectionLost does any work in this callback; the other methods are empty.
    comms.setReconnectCallback(new MqttCallbackExtended() {
        public void messageArrived(String topic, MqttMessage message) throws Exception {
        }

        public void deliveryComplete(IMqttDeliveryToken token) {
        }

        public void connectComplete(boolean reconnect, String serverURI) {
        }

        public void connectionLost(Throwable cause) {
            if(automaticReconnect){
                // Automatic reconnect is set so make sure comms is in resting state
                comms.setRestingState(true);
                reconnecting = true;
                startReconnectCycle();
            }
        }
    });
    // Insert our own callback to iterate through the URIs till the connect succeeds
    MqttToken userToken = new MqttToken(getClientId());
    ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms,
            options, userToken, userContext, callback, reconnecting);
    userToken.setActionCallback(connectActionListener);
    // The token's user context is this client; the caller's userContext went to the listener above.
    userToken.setUserContext(this);
    // If we are using the MqttCallbackExtended, set it on the connectActionListener
    if(this.mqttCallback instanceof MqttCallbackExtended){
        connectActionListener.setMqttCallbackExtended((MqttCallbackExtended)this.mqttCallback);
    }
    comms.setNetworkModuleIndex(0);
    connectActionListener.connect();
    return userToken;
}
private void attemptReconnect(){ final String methodName = "attemptReconnect"; //@Trace 500=Attempting to reconnect client: {0} try { connect(this.connOpts, this.userContext,new IMqttActionListener() { public void onSuccess(IMqttToken asyncActionToken) { //@Trace 501=Automatic Reconnect Successful: {0} comms.setRestingState(false); stopReconnectCycle(); } public void onFailure(IMqttToken asyncActionToken, Throwable exception) { //@Trace 502=Automatic Reconnect failed, rescheduling: {0} if(reconnectDelay < 128000){//reconnectDelay初始值爲1000,每次重連失敗時*2 reconnectDelay = reconnectDelay * 2; } rescheduleReconnectCycle(reconnectDelay); } }); } catch (MqttSecurityException ex) { //@TRACE 804=exception } catch (MqttException ex) { //@TRACE 804=exception } }
/**
 * Starts the background connect while holding {@code conLock}.
 *
 * <p>The connect is only started when {@code isDisconnected()} is true and
 * {@code closePending} is false; the handling of every other state is elided in this excerpt.
 *
 * @param options connection options (their use is in the elided part of the body)
 * @param token   token handed to the {@code ConnectBG} worker
 * @throws MqttException declared by the signature; presumably raised from the elided
 *                       else-branch - TODO confirm
 */
public void connect(MqttConnectOptions options, MqttToken token) throws MqttException {
    final String methodName = "connect";
    synchronized (conLock) {
        if (isDisconnected() && !closePending) {
            // (statements elided in this excerpt; the `connect` value used below is
            // presumably created here - TODO confirm)
            ...;
            ConnectBG conbg = new ConnectBG(this, token, connect);
            conbg.start();// Per the original note: goes through ConnectBG, which then runs ClientComms.internalSend()
        }else {
            // (handling of the other states elided in this excerpt)
            ...
        }
    }
}
/**
 * Queues an outbound message for sending and wakes up whoever waits on {@code queueLock}.
 *
 * <p>Publishes go to {@code pendingMessages}; every other message type goes to
 * {@code pendingFlows}. A connect message is inserted at the head of {@code pendingFlows}.
 * QoS 1 and QoS 2 publishes are also recorded in their outbound table and in
 * {@code persistence}; the switch below has no case for other QoS values.
 *
 * @param message the wire message to queue
 * @param token   token saved in {@code tokenStore} for the message (not saved for
 *                {@code MqttAck} instances in the generic branch)
 * @throws MqttException with {@code REASON_CODE_MAX_INFLIGHT} when a publish is submitted
 *                       while {@code actualInFlight >= maxInflight}
 */
public void send(MqttWireMessage message, MqttToken token) throws MqttException {
    final String methodName = "send";
    // (statements elided in this excerpt)
    。。。
    if (message instanceof MqttPublish) {
        synchronized (queueLock) {
            // Reject the publish outright when the in-flight window is full.
            if (actualInFlight >= this.maxInflight) {
                //@TRACE 613= sending {0} msgs at max inflight window
                log.fine(CLASS_NAME, methodName, "613", new Object[]{new Integer(actualInFlight)});
                throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);
            }
            MqttMessage innerMessage = ((MqttPublish) message).getMessage();
            //@TRACE 628=pending publish key={0} qos={1} message={2}
            log.fine(CLASS_NAME,methodName,"628", new Object[]{new Integer(message.getMessageId()), new Integer(innerMessage.getQos()), message});
            // Track and persist QoS 2 / QoS 1 publishes, keyed by message id.
            switch(innerMessage.getQos()) {
                case 2:
                    outboundQoS2.put(new Integer(message.getMessageId()), message);
                    persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
                    break;
                case 1:
                    outboundQoS1.put(new Integer(message.getMessageId()), message);
                    persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
                    break;
            }
            tokenStore.saveToken(token, message);
            pendingMessages.addElement(message);
            queueLock.notifyAll();
        }
    } else {
        //@TRACE 615=pending send key={0} message {1}
        log.fine(CLASS_NAME,methodName,"615", new Object[]{new Integer(message.getMessageId()), message});
        if (message instanceof MqttConnect) {
            synchronized (queueLock) {
                // Add the connect action at the head of the pending queue ensuring it jumps
                // ahead of any of other pending actions.
                tokenStore.saveToken(token, message);
                pendingFlows.insertElementAt(message,0);
                queueLock.notifyAll();
            }
        } else {
            // Per-type bookkeeping; note that it runs before queueLock is taken.
            if (message instanceof MqttPingReq) {
                this.pingCommand = message;
            } else if (message instanceof MqttPubRel) {
                outboundQoS2.put(new Integer(message.getMessageId()), message);
                persistence.put(getSendConfirmPersistenceKey(message), (MqttPubRel) message);
            } else if (message instanceof MqttPubComp) {
                persistence.remove(getReceivedPersistenceKey(message));
            }
            synchronized (queueLock) {
                // No token is saved for MqttAck instances.
                if ( !(message instanceof MqttAck )) {
                    tokenStore.saveToken(token, message);
                }
                pendingFlows.addElement(message);
                queueLock.notifyAll();
            }
        }
    }
}
private void doAfterConnectSuccess(final Bundle resultBundle) { //since the device's cpu can go to sleep, acquire a wakelock and drop it later. acquireWakeLock(); service.callbackToActivity(clientHandle, Status.OK, resultBundle); deliverBacklog(); setConnectingState(false); disconnected = false; releaseWakeLock(); }
/** * 全部消息都經此方法發出 * pass data back to the Activity, by building a suitable Intent object and * broadcasting it * * @param clientHandle source of the data * @param status OK or Error * @param dataBundle the data to be passed */ void callbackToActivity(String clientHandle, Status status, Bundle dataBundle) { // Don't call traceDebug, as it will try to callbackToActivity leading // to recursion. Intent callbackIntent = new Intent(MqttServiceConstants.CALLBACK_TO_ACTIVITY); if (clientHandle != null) { callbackIntent.putExtra(MqttServiceConstants.CALLBACK_CLIENT_HANDLE, clientHandle); } callbackIntent.putExtra(MqttServiceConstants.CALLBACK_STATUS, status); if (dataBundle != null) { callbackIntent.putExtras(dataBundle); } LocalBroadcastManager.getInstance(this).sendBroadcast(callbackIntent); }
/**
 * Dispatches a received callback broadcast to the handler for its action.
 *
 * <p>Broadcasts whose client handle is missing or differs from {@code clientHandle} are
 * ignored. An action that matches none of the known constants is reported through
 * {@code mqttService.traceError}.
 *
 * @param context the context supplied with the broadcast (not used here)
 * @param intent  the broadcast; its extras carry the client handle, the action and the
 *                action-specific data
 */
@Override
public void onReceive(Context context, Intent intent) {
    final Bundle extras = intent.getExtras();

    // Only handle broadcasts addressed to this client.
    final String senderHandle = extras.getString(MqttServiceConstants.CALLBACK_CLIENT_HANDLE);
    final boolean addressedToUs = senderHandle != null && senderHandle.equals(clientHandle);
    if (!addressedToUs) {
        return;
    }

    final String requestedAction = extras.getString(MqttServiceConstants.CALLBACK_ACTION);

    if (MqttServiceConstants.CONNECT_ACTION.equals(requestedAction)) {
        connectAction(extras);
        return;
    }
    if (MqttServiceConstants.CONNECT_EXTENDED_ACTION.equals(requestedAction)) {
        connectExtendedAction(extras);
        return;
    }
    if (MqttServiceConstants.MESSAGE_ARRIVED_ACTION.equals(requestedAction)) {
        messageArrivedAction(extras);
        return;
    }
    if (MqttServiceConstants.SUBSCRIBE_ACTION.equals(requestedAction)) {
        subscribeAction(extras);
        return;
    }
    if (MqttServiceConstants.UNSUBSCRIBE_ACTION.equals(requestedAction)) {
        unSubscribeAction(extras);
        return;
    }
    if (MqttServiceConstants.SEND_ACTION.equals(requestedAction)) {
        sendAction(extras);
        return;
    }
    if (MqttServiceConstants.MESSAGE_DELIVERED_ACTION.equals(requestedAction)) {
        messageDeliveredAction(extras);
        return;
    }
    if (MqttServiceConstants.ON_CONNECTION_LOST_ACTION.equals(requestedAction)) {
        connectionLostAction(extras);
        return;
    }
    if (MqttServiceConstants.DISCONNECT_ACTION.equals(requestedAction)) {
        disconnected(extras);
        return;
    }
    if (MqttServiceConstants.TRACE_ACTION.equals(requestedAction)) {
        traceAction(extras);
        return;
    }

    // None of the known actions matched.
    mqttService.traceError(MqttService.TAG, "Callback action doesn't exist.");
}