物聯網協議之MQTT源碼分析(一)

不知不覺已經跟MQTT打交道半年了,才驚醒的發現我也算從事的物聯網方法(Android端),一直以來只是單純的使用MQTT鏈接、發佈和接收消息等,也沒在意其Client(Android)端的源碼是怎樣實現的,要不是最近項目出現一個小問題困擾了好久可能也不會引起我看一看MQTT的源碼實現。好啦讓咱們開始瞭解MQTT的神奇之處吧。(注:如有什麼地方闡述有誤,敬請指正。)html

前言

閱讀本文前,默認讀者已經熟知MQTT的Android端使用,Client表明客戶端,Broker表明服務端,此篇源碼分析主要以MQTT客戶端和服務端創建鏈接過程爲主線講解。基礎瞭解Mqtt報文格式等,能夠參考下MQTT協議官網中文地址:git

mcxiaoke.gitbooks.io/mqtt-cn/con…bash

org.eclipse.paho工程源碼分析涉及到的類:服務器

  • MqttAndroidClient
  • MqttService
  • MqttConnection
  • MqttAsyncClient
  • ConnectActionListener
  • ClientComms
  • CommsSender
  • CommsReceiver
  • ClientState
  • CommsCallback

源碼分析準備

爲方便分析源碼先貼上一段工程裏鏈接MQTT的代碼:網絡

// 本身工程中關於MQTT鏈接類:
String uri = "";
if(isSSL){
    uri = "ssl://" + ip + ":" + port;
} else{
    uri = "tcp://" + ip + ":" + port;
}

MqttConnectOptions conOpt = new MqttConnectOptions();
try{
    conOpt.setSocketFactory(get2SSLSocketFactory(clientIns, ins, keypassword, keypassword));
} catch(MqttSecurityException e){
    e.printStackTrace();
}
conOpt.setUserName("mqttservice");
char[] password = "mqttservice".toCharArray();
conOpt.setPassword(password);
conOpt.setConnectionTimeout(5);
conOpt.setCleanSession(false);//設置是否清空session,這裏若是設置爲false表示服務器會保留客戶端的鏈接記錄,這裏設置爲true表示每次鏈接到服務器都以新的身份鏈接
conOpt.setKeepAliveInterval(60);//The default value is 60 seconds
String mClientId = NetUtil.getLocalMacAddress();// 獲取本地網絡mac地址
String[] clientIds = new String[1];
clientIds[0]=mClientId;
clientInfo =uri +mClientId;
mMqttCallback =new MqttConnectCallback(mContext, clientInfo);
myClient =new MqttAndroidClient(mContext, uri, mClientId);
myClient.setCallback(mMqttCallback);
// IMqttActionListener的實現類,動態賦值爲鏈接狀態CONNECT
final ActionListener callback = new ActionListener(ActionType.CONNECT);

String topic = "/client/" + UUID.randomUUID().toString();
int qos = 0;
boolean retained = true;
try{
    // 設置遺囑消息:當客戶端斷開鏈接時,發送給相關的訂閱者的遺囑消息
    // 具體瞭解遺囑消息請參考:https://www.jianshu.com/p/a5c6b768ed55
    conOpt.setWill(topic, "offline".getBytes(), qos, retained);
} catch(Exception e){
    callback.onFailure(null, e);
}
try{
    myClient.connect(conOpt, null, callback);
} catch(Exception e){
    callback.onFailure(null, e);
}
複製代碼

根據上述代碼能夠看出,MQTT的鏈接是由MqttAndroidClient的connect函數而起,MqttAndroidClient對象初始化時傳入了uri和mClientId,調用connect函數時傳入了符合本身需求的Mqtt鏈接選項,所以咱們先來看下MqttAndroidClient的connect函數:session

MqttAndroidClient

// MqttAndroidClient類:
@Override
public IMqttToken connect(MqttConnectOptions options, Object userContext,
                          IMqttActionListener callback) throws MqttException {

    IMqttToken token = new MqttTokenAndroid(this, userContext, callback);

    connectOptions = options;
    connectToken = token;

    /*
     * 實際的鏈接取決於咱們在這裏啓動和綁定的服務,
     * 可是在serviceConnection的onServiceConnected()方法運行(異步)以前
     * 咱們實際上不能使用它,因此鏈接自己發生在onServiceConnected()方法上
     */
    if (mqttService == null) { // First time - must bind to the service
        Intent serviceStartIntent = new Intent();
        serviceStartIntent.setClassName(myContext, SERVICE_NAME);
        Object service = myContext.startService(serviceStartIntent);
        if (service == null) {
            IMqttActionListener listener = token.getActionCallback();
            if (listener != null) {
                listener.onFailure(token, new RuntimeException(
                        "cannot start service " + SERVICE_NAME));
            }
        }

        // We bind with BIND_SERVICE_FLAG (0), leaving us the manage the lifecycle
        // until the last time it is stopped by a call to stopService()
        myContext.bindService(serviceStartIntent, serviceConnection,
                Context.BIND_AUTO_CREATE);

        if (!receiverRegistered) registerReceiver(this);
    } else {
        // 線程池執行
        pool.execute(new Runnable() {
            @Override
            public void run() {
                doConnect();
                //Register receiver to show shoulder tap.
                if (!receiverRegistered) registerReceiver(MqttAndroidClient.this);
            }

        });
    }
    return token;
}

    /**
     * 綁定MqttService服務的回調
     */
    private final class MyServiceConnection implements ServiceConnection {

        @Override
        public void onServiceConnected(ComponentName name, IBinder binder) {
            mqttService = ((MqttServiceBinder) binder).getService();
            bindedService = true;
            // 最後仍是執行的該方法
            doConnect();
        }

        @Override
        public void onServiceDisconnected(ComponentName name) {
            mqttService = null;
        }
    }

    // Listener for when the service is connected or disconnected
    private final MyServiceConnection serviceConnection = new MyServiceConnection();
複製代碼

這個函數會啓動paho mqtt惟一一個組件MqttService,這個組件不支持跨進程調用,若是須要將MqttService放在其餘進程,須要將和mqtt相關的調用所有放在同一個進程內。因爲須要使用MqttService組件中的函數,須要在啓動MqttService後對MqttService進行綁定。若是服務已經啓動,則直接執行創建鏈接操做。這時候創建的鏈接僅僅是網絡鏈接,不是mqtt協議鏈接。由上面代碼能夠看出,不管是MqttService是否啓動並綁定,最終都是調用doConnect()方法繼續執行鏈接操做。併發

// MqttAndroidClient類:
    private void doConnect() {
        if (clientHandle == null) {
            clientHandle = mqttService.getClient(serverURI, clientId, myContext.getApplicationInfo().packageName,persistence);
        }
        mqttService.setTraceEnabled(traceEnabled);
        mqttService.setTraceCallbackId(clientHandle);

        String activityToken = storeToken(connectToken);
        try {
            mqttService.connect(clientHandle, connectOptions, null,
                    activityToken);
        } catch (MqttException e) {
            IMqttActionListener listener = connectToken.getActionCallback();
            if (listener != null) {
                listener.onFailure(connectToken, e);
            }
        }
    }
複製代碼

直到此時出現了activityToken, connectToken, clientHandle,不要慌,咱們來一個一個分析。MqttAndroidClient的connect函數時,會生成connectToken,具體生成以下:dom

// MqttAndroidClient類:
    IMqttToken token = new MqttTokenAndroid(this, userContext, callback);

    connectOptions = options;
    connectToken = token;
複製代碼

token機制在paho mqtt實現中扮演着十分重要的角色,負責消息各類回調的實現,後面章節再單獨分析paho mqtt的token機制。再來看一下clientHandle的來源:eclipse

// MqttService類:
    public String getClient(String serverURI, String clientId
        , String contextId, MqttClientPersistence persistence) {
        String clientHandle = serverURI + ":" + clientId+":"+contextId;
        if (!connections.containsKey(clientHandle)) {
            MqttConnection client = new MqttConnection(this, serverURI,
                clientId, persistence, clientHandle);
            connections.put(clientHandle, client);
        }
        return clientHandle;
  }
複製代碼

clientHandle是serverURI + ":" + clientId+":"+contextId組合造成的字符串,contextId是應用包名。此段代碼中引入了一個新的類MqttConnection,而MqttConnection表明着Mqtt的鏈接實例,MqttService內部使用connections記錄每個鏈接實例。最後瞭解下activityToken,咱們看下storeToken(connectToken)函數:異步

// MqttAndroidClient類:
    private synchronized String storeToken(IMqttToken token) {
        tokenMap.put(tokenNumber, token);
        return Integer.toString(tokenNumber++);
    }
複製代碼

MqttAndroidClient內部使用tokenMap記錄每次調用生成的token, 將tokenNumber返回。activityToken會傳入MqttConnection中,並保存於MqttConnection類中connect函數的Bundle變量resultBundle裏,而resultBundle最終會被用於發送廣播觸發咱們connect、publish、subscribe等的回調監聽。這裏暫時先了解這些,咱們接着看執行完doConnect函數後,函數調用到了MqttService組件中的connect函數:

MqttService

// MqttService類:
    public void connect(String clientHandle, MqttConnectOptions connectOptions,
      String invocationContext, String activityToken)
      throws MqttSecurityException, MqttException {
	  	MqttConnection client = getConnection(clientHandle);
	  	client.connect(connectOptions, null, activityToken);
  }
  
  private MqttConnection getConnection(String clientHandle) {
    MqttConnection client = connections.get(clientHandle);
    if (client == null) {
      throw new IllegalArgumentException("Invalid ClientHandle");
    }
    return client;
  }
複製代碼

看到clientHandle是否是有點熟悉,上面咱們講過connections將生成的MqttConnection實例保存起來,這一步經過getConnection從新獲取。接下來,代碼來到了MqttConnection.connect函數中:

// MqttConnection類:
    public void connect(MqttConnectOptions options, String invocationContext,
                        String activityToken) {

        connectOptions = options;
        reconnectActivityToken = activityToken;
        
        // //根據本身設置的鏈接選項cleanSession,判斷是否清除歷史消息
        if (options != null) {
            cleanSession = options.isCleanSession();
        }

        if (connectOptions.isCleanSession()) {
            // discard old data
            service.messageStore.clearArrivedMessages(clientHandle);
        }

        service.traceDebug(TAG, "Connecting {" + serverURI + "} as {" + clientId + "}");
        final Bundle resultBundle = new Bundle();
        // 將activityToken保存至resultBundle,驗證上面所敘述的activityToken
        resultBundle.putString(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN,
                activityToken);
        resultBundle.putString(
                MqttServiceConstants.CALLBACK_INVOCATION_CONTEXT,
                invocationContext);
        resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION,
                MqttServiceConstants.CONNECT_ACTION);

        try {
            if (persistence == null) {
                // ask Android where we can put files
                //2016.12 zhn change:for no permissions
                File myDir = service.getFilesDir();//File myDir = service.getExternalFilesDir(TAG);

                if (myDir == null) {
                    // No external storage, use internal storage instead.
                    myDir = service.getDir(TAG, Context.MODE_PRIVATE);

                    if (myDir == null) {
                        resultBundle.putString(
                                MqttServiceConstants.CALLBACK_ERROR_MESSAGE,
                                "Error! No external and internal storage available");
                        resultBundle.putSerializable(
                                MqttServiceConstants.CALLBACK_EXCEPTION,
								new MqttPersistenceException());
                        service.callbackToActivity(clientHandle, Status.ERROR,
                                resultBundle);
                        return;
                    }
                }

                // 用它來設置MQTT客戶端持久性存儲
                persistence = new MqttDefaultFilePersistence(
                        myDir.getAbsolutePath());
            }
            // 用於監聽鏈接成功的回調
            IMqttActionListener listener = new MqttConnectionListener(resultBundle) {

                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    doAfterConnectSuccess(resultBundle);
                    service.traceDebug(TAG, "connect success!");
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken,
                                      Throwable exception) {
                    resultBundle.putString(
                            MqttServiceConstants.CALLBACK_ERROR_MESSAGE,
                            exception.getLocalizedMessage());
                    resultBundle.putSerializable(
                            MqttServiceConstants.CALLBACK_EXCEPTION, exception);
                    service.traceError(TAG,
                            "connect fail, call connect to reconnect.reason:"
                                    + exception.getMessage());

                    doAfterConnectFail(resultBundle);
                }
            };

            if (myClient != null) {//若是已經建立過MqttAsyncClient,即以前就調用過本connect()方法
                if (isConnecting) {//上次調用的connect()還在鏈接中,不作處理,等待connect()結果
                    service.traceDebug(TAG,
                            "myClient != null and the client is connecting. Connect return " +"directly.");
                    service.traceDebug(TAG, "Connect return:isConnecting:" + isConnecting +
							".disconnected:" + disconnected);
                } else if (!disconnected) {//當前已處於長鏈接,提示鏈接成功
                    service.traceDebug(TAG, "myClient != null and the client is connected and " +"notify!");
                    doAfterConnectSuccess(resultBundle);
                } else {//以前的鏈接未成功或者已掉線,從新嘗試鏈接
                    service.traceDebug(TAG, "myClient != null and the client is not connected");
                    service.traceDebug(TAG, "Do Real connect!");
                    setConnectingState(true);
                    myClient.connect(connectOptions, invocationContext, listener);
                }
            }else { // if myClient is null, then create a new connection
                alarmPingSender = new AlarmPingSender(service);//用於發送心跳包
                // 建立MqttAsyncClient
                myClient = new MqttAsyncClient(serverURI, clientId,
                        persistence, alarmPingSender);
                myClient.setCallback(this);

                service.traceDebug(TAG, "Do Real connect!");
                // 設置鏈接狀態
                setConnectingState(true);
                // 鏈接
                myClient.connect(connectOptions, invocationContext, listener);
            }
        } catch (Exception e) {
            service.traceError(TAG, "Exception occurred attempting to connect: " + e.getMessage());
            setConnectingState(false);
            handleException(resultBundle, e);
        }
    }
複製代碼

從上面代碼以及註釋中可知,這段代碼主要做用就是新建了MqttAsyncClient對象,而後註冊了回調函數,而後去執行connect函數,同時將狀態置爲正在鏈接狀態。接下來就分析下MqttAsyncClient.connect函數:

MqttAsyncClient

// MqttAsyncClient類:
        public IMqttToken connect(MqttConnectOptions options
            , Object userContext, IMqttActionListener callback)
			throws MqttException, MqttSecurityException {
		final String methodName = "connect";
		// 狀態判斷
		if (comms.isConnected()) {
			throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED);
		}
		if (comms.isConnecting()) {
			throw new MqttException(MqttException.REASON_CODE_CONNECT_IN_PROGRESS);
		}
		if (comms.isDisconnecting()) {
			throw new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING);
		}
		if (comms.isClosed()) {
			throw new MqttException(MqttException.REASON_CODE_CLIENT_CLOSED);
		}
		if (options == null) {
			options = new MqttConnectOptions();
		}
		this.connOpts = options;
		this.userContext = userContext;
		final boolean automaticReconnect = options.isAutomaticReconnect();

		// @TRACE 103=cleanSession={0} connectionTimeout={1} TimekeepAlive={2}
		// userName={3} password={4} will={5} userContext={6} callback={7}
		log.fine(CLASS_NAME, methodName, "103",
				new Object[] { Boolean.valueOf(options.isCleanSession()), new Integer(options.getConnectionTimeout()),
						new Integer(options.getKeepAliveInterval()), options.getUserName(),
						((null == options.getPassword()) ? "[null]" : "[notnull]"),
						((null == options.getWillMessage()) ? "[null]" : "[notnull]"), userContext, callback });
		// 設置網絡鏈接
		comms.setNetworkModules(createNetworkModules(serverURI, options));
		// 設置重連回調
		comms.setReconnectCallback(new MqttReconnectCallback(automaticReconnect));

		// Insert our own callback to iterate through the URIs till the connect
		// succeeds
		MqttToken userToken = new MqttToken(getClientId());
		// 初始化鏈接動做偵聽器connectActionListener
		ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options,
				userToken, userContext, callback, reconnecting);
		userToken.setActionCallback(connectActionListener);
		userToken.setUserContext(this);

		// If we are using the MqttCallbackExtended, set it on the
		// connectActionListener
		if (this.mqttCallback instanceof MqttCallbackExtended) {
			connectActionListener.setMqttCallbackExtended((MqttCallbackExtended) this.mqttCallback);
		}

		comms.setNetworkModuleIndex(0);
		// 鏈接動做偵聽器繼續執行connect
		connectActionListener.connect();

		return userToken;
	}
複製代碼

MqttAsyncClient.connect函數的主要做用是設置了網絡鏈接模塊,設置重連回調,最後執行connectActionListener.connect函數。這段代碼又引進來一個新的類ClientComms,咱們先來看下ClientComms的初始化:

// MqttAsyncClient類:
        public MqttAsyncClient(String serverURI, String clientId
            , MqttClientPersistence persistence,MqttPingSender pingSender, ScheduledExecutorService executorService) throws MqttException {
		final String methodName = "MqttAsyncClient";
		...
		// 建立大小爲10的線程池
		this.executorService = executorService;
		if (this.executorService == null) {
			this.executorService = Executors.newScheduledThreadPool(10);
		}
		...
		// 初始化ClientComms,並傳入大小爲10的線程池
		this.comms = new ClientComms(this
		    , this.persistence, pingSender,this.executorService);
		this.persistence.close();
		this.topics = new Hashtable();
	}
// 	ClientComms類中:
	public ClientComms(IMqttAsyncClient client, MqttClientPersistence persistence, MqttPingSender pingSender, ExecutorService executorService) throws MqttException {
		this.conState = DISCONNECTED;
		this.client 	= client;
		this.persistence = persistence;
		this.pingSender = pingSender;
		this.pingSender.init(this);
		this.executorService = executorService;

		this.tokenStore = new CommsTokenStore(getClient().getClientId());
		this.callback 	= new CommsCallback(this);
		this.clientState = new ClientState(persistence, tokenStore, this.callback, this, pingSender);

		callback.setClientState(clientState);
		log.setResourceName(getClient().getClientId());
	}
複製代碼

能夠看出ClientComms是在MqttAsyncClient初始化時完成初始化的,而且將心跳的發送器pingSender和大小爲10的線程池一塊兒傳入ClientComms。ClientComms類的初始化中又初始化了CommsTokenStore、CommsCallback和ClientState幾個類。咱們再來看下重連回調,重連代碼有點多,咱們只關注一下重連的回調函數便可:

// MqttReconnectCallback類(MqttAsyncClient類中的內部類):
        class MqttReconnectCallback implements MqttCallbackExtended {
		...
		// 鏈接失敗,重連時會調用該方法
		public void connectionLost(Throwable cause) {
			if (automaticReconnect) {
				// Automatic reconnect is set so make sure comms is in resting
				// state
				comms.setRestingState(true);
				reconnecting = true;
				startReconnectCycle();
			}
		}
        ...
	}
	
	private void startReconnectCycle() {
		String methodName = "startReconnectCycle";
		// @Trace 503=Start reconnect timer for client: {0}, delay: {1}
		log.fine(CLASS_NAME, methodName, "503", new Object[] { this.clientId, new Long(reconnectDelay) });
		reconnectTimer = new Timer("MQTT Reconnect: " + clientId);
		reconnectTimer.schedule(new ReconnectTask(), reconnectDelay);
	}
	
	private class ReconnectTask extends TimerTask {
		private static final String methodName = "ReconnectTask.run";

		public void run() {
			// @Trace 506=Triggering Automatic Reconnect attempt.
			log.fine(CLASS_NAME, methodName, "506");
			attemptReconnect();
		}
	}
	
	private void attemptReconnect() {
		final String methodName = "attemptReconnect";
		...
		try {
			connect(this.connOpts, this.userContext, new MqttReconnectActionListener(methodName));
		} catch (MqttSecurityException ex) {
			// @TRACE 804=exception
			log.fine(CLASS_NAME, methodName, "804", null, ex);
		} catch (MqttException ex) {
			// @TRACE 804=exception
			log.fine(CLASS_NAME, methodName, "804", null, ex);
		}
	}
	
	class MqttReconnectActionListener implements IMqttActionListener {
		...
		public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
			...
			if (reconnectDelay < 128000) {
			    //reconnectDelay初始值爲1000,每次重連失敗時*2
				reconnectDelay = reconnectDelay * 2;
			}
			rescheduleReconnectCycle(reconnectDelay);
		}
		...
	}
複製代碼

自動重連的實現主要在的attemptReconnect()方法裏,重連失敗會繼續重連直到鏈接成功,不太重連的間隔時間會隨着重連次數增長最大到128s。最後咱們再分析一下網絡鏈接的設置createNetworkModules函數:

// MqttAsyncClient類:
            protected NetworkModule[] createNetworkModules(String address, MqttConnectOptions options)
			throws MqttException, MqttSecurityException {
		final String methodName = "createNetworkModules";
		// @TRACE 116=URI={0}
		log.fine(CLASS_NAME, methodName, "116", new Object[] { address });

		NetworkModule[] networkModules = null;
		String[] serverURIs = options.getServerURIs();
		String[] array = null;
		if (serverURIs == null) {
			array = new String[] { address };
		} else if (serverURIs.length == 0) {
			array = new String[] { address };
		} else {
			array = serverURIs;
		}

		networkModules = new NetworkModule[array.length];
		for (int i = 0; i < array.length; i++) {
			networkModules[i] = createNetworkModule(array[i], options);
		}

		log.fine(CLASS_NAME, methodName, "108");
		return networkModules;
	}
複製代碼

options實例在創建鏈接的過程當中,咱們也僅僅是設置了和鏈接相關的一些狀態,並無設置serverURI,故options.getServerURIS返回爲null。NetworkModule爲paho定義的接口,規定了網絡模塊須要實現的方法。目前paho定義的網絡鏈接模塊有TCPNetworkModule,SSLNetworkModule,WebsocketNetworkModule,WebSocketSecureNetworkModule,能夠看下createNetworkModule根據uri使用的協議類型建立對應的NetworkModule。建立完全部的NetworkModule後,執行comms.setNetworknModule(0),先使用第一個NetworkModule進行鏈接。comms是ClientComms類型的實例,在paho的實現中佔有很是重要的地位,後序部分會進行分析。來看下createNetwokModule函數的實現:

// MqttAsyncClient類:
        private NetworkModule createNetworkModule(String address, MqttConnectOptions options) throws MqttException, MqttSecurityException {
		final String methodName = "createNetworkModule";

		NetworkModule netModule;
		SocketFactory factory = options.getSocketFactory();

		int serverURIType = MqttConnectOptions.validateURI(address);

		URI uri;
		try {
			uri = new URI(address);
			...
		} catch (URISyntaxException e) {
			throw new IllegalArgumentException("Malformed URI: " + address + ", " + e.getMessage());
		}

		String host = uri.getHost();
		int port = uri.getPort(); // -1 if not defined

		switch (serverURIType) {
		case MqttConnectOptions.URI_TYPE_TCP :
			...
			netModule = new TCPNetworkModule(factory, host, port, clientId);
			((TCPNetworkModule)netModule).setConnectTimeout(options.getConnectionTimeout());
			break;
		case MqttConnectOptions.URI_TYPE_SSL:
			...
			netModule = new SSLNetworkModule((SSLSocketFactory) factory, host, port, clientId);
			...
			break;
		case MqttConnectOptions.URI_TYPE_WS:
			...
			netModule = new WebSocketNetworkModule(factory, address, host, port, clientId);
			((WebSocketNetworkModule)netModule).setConnectTimeout(options.getConnectionTimeout());
			break;
		case MqttConnectOptions.URI_TYPE_WSS:
			...
			netModule = new WebSocketSecureNetworkModule((SSLSocketFactory) factory, address, host, port, clientId);
			...
			break;
		default:
			log.fine(CLASS_NAME,methodName, "119", new Object[] {address});
			netModule = null;
		}
		return netModule;
	}
複製代碼

能夠看出,createNetwokModule函數主要是根據serverURIType來判斷須要使用TCPNetworkModule,SSLNetworkModule,WebsocketNetworkModule,WebSocketSecureNetworkModule中的那種網絡模塊實現網絡鏈接。

如今能夠繼續往下繼續分析connectActionListener.connect()函數啦:

ConnectActionListener

// ConnectActionListener類:
public void connect() throws MqttPersistenceException {
    MqttToken token = new MqttToken(client.getClientId());
    token.setActionCallback(this);
    token.setUserContext(this);
    // 打開持久化存儲
    persistence.open(client.getClientId(), client.getServerURI());

    if (options.isCleanSession()) {
      persistence.clear();
    }
    // 設置版本
    if (options.getMqttVersion() == MqttConnectOptions.MQTT_VERSION_DEFAULT) {
      options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
    }
    
    try {
        // 開始鏈接
        comms.connect(options, token);
    }
    catch (MqttException e) {
      onFailure(token, e);
    }
  }
複製代碼

從這段代碼中能夠看出,鏈接已交給comms.connect(options, token)函數,而comms的初始化上面也提到過,ClientComms是在MqttAsyncClient初始化時完成初始化的

ClientComms

// ClientComms類:
    public void connect(MqttConnectOptions options, MqttToken token) throws MqttException {
        final String methodName = "connect";
        synchronized (conLock) {
            if (isDisconnected() && !closePending) {
                //@TRACE 214=state=CONNECTING
                log.fine(CLASS_NAME,methodName,"214");
                // 設置鏈接狀態
                conState = CONNECTING;
                conOptions = options;
                // 構建CONNECT數據包
                MqttConnect connect = new MqttConnect(client.getClientId(),
                        conOptions.getMqttVersion(),
                        conOptions.isCleanSession(),
                        conOptions.getKeepAliveInterval(),
                        conOptions.getUserName(),
                        conOptions.getPassword(),
                        conOptions.getWillMessage(),
                        conOptions.getWillDestination());    
                // 設置clientState屬性
                this.clientState.setKeepAliveSecs(conOptions.getKeepAliveInterval());
                this.clientState.setCleanSession(conOptions.isCleanSession());
                this.clientState.setMaxInflight(conOptions.getMaxInflight());
                tokenStore.open();
                ConnectBG conbg = new ConnectBG(this, token, connect, executorService);
                conbg.start();
            }else {
                ...
            }
        }
    }
複製代碼

從comms.connect函數的代碼中能夠看出,最後調用了conbg.start()函數,而ConnectBG是實現了Runnable的類,而且運行在線程池中:

// ClientComms類:
    private class ConnectBG implements Runnable {
        ...
        void start() {
            executorService.execute(this);
        }
        
        public void run() {
            Thread.currentThread().setName(threadName);
	    final String methodName = "connectBG:run";
	    MqttException mqttEx = null;
	    //@TRACE 220=>
	    log.fine(CLASS_NAME, methodName, "220");
	    
	    try {
	  	  // Reset an exception on existing delivery tokens.
	  	  // This will have been set if disconnect occured before delivery was
	  	  // fully processed.
	  	  MqttDeliveryToken[] toks = tokenStore.getOutstandingDelTokens();
	  	  for (int i=0; i<toks.length; i++) {
	  	 	 toks[i].internalTok.setException(null);
	  	  }
	    
	  	  // Save the connect token in tokenStore as failure can occur before send
	  	  tokenStore.saveToken(conToken,conPacket);
	    
	  	  // 啓動網絡模塊,發起網絡鏈接
	  	  NetworkModule networkModule = networkModules[networkModuleIndex];
	  	  networkModule.start();
	  	  // 鏈接完成後,啓動receiver,負責從broker接收消息
	  	  receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream());
	  	  receiver.start("MQTT Rec: "+getClient().getClientId(), executorService);
	  	  // 鏈接完成後,啓動sender,負責向broker發送消息
	  	  sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream());
	  	  sender.start("MQTT Snd: "+getClient().getClientId(), executorService);
	  	  // 鏈接完成後,啓動回調監聽
	  	  /**
	  	   * CommsCallback:接收器和外部API之間的橋接。此類由Receiver調用
	  	   *    ,而後將以comms爲中心的MQTT消息對象轉換爲外部API可理解的。
	  	  */
	  	  callback.start("MQTT Call: "+getClient().getClientId(), executorService);
	  	  // 向broker發送CONNECT數據包
	  	  internalSend(conPacket, conToken);
	    } catch (MqttException ex) {
	  	  //@TRACE 212=connect failed: unexpected exception
	  	  log.fine(CLASS_NAME, methodName, "212", null, ex);
	  	  mqttEx = ex;
	    } catch (Exception ex) {
	  	  //@TRACE 209=connect failed: unexpected exception
	  	  log.fine(CLASS_NAME, methodName, "209", null, ex);
	  	  mqttEx =  ExceptionHelper.createMqttException(ex);
	    }
	    
	    if (mqttEx != null) {
	  	  shutdownConnection(conToken, mqttEx);
	    }
        }
    }
複製代碼

從conbg.start()函數中能夠看出,在線程池啓動運行了ConnectBG,所以如今全部的操做來到了ConnectBG的run()函數中,run()裏啓動了網絡模塊、接收broker消息和發送消息的Runnable(CommsReceiver和CommsSender)、回調監聽。此處須要說明一下NetworkModule爲接口,實現它的子類調用start()方法,其實就是啓動Socket鏈接,而CommsReceiver、CommsSender和callback都是與ConnectBG同樣,皆是實現了Runnable的子類,運行於線程池中。最後調用internalSend方法發送CONNECT數據包:

// ClientComms類:
    void internalSend(MqttWireMessage message, MqttToken token) throws MqttException {
	......
	try {
		// Persist if needed and send the message
		this.clientState.send(message, token);
	} catch(MqttException e) {
		......
	}
    }
複製代碼

clientState負責在receiver和sender之間進行消息處理,能夠將sender看作是clientState的消費者, receiver負責接收來自broker的消息。接下來看看clientState.send(message, token)函數:

ClientState

// ClientState類:
public void send(MqttWireMessage message, MqttToken token) throws MqttException {
	final String methodName = "send";
	......
	if (token != null ) {
		try {
			token.internalTok.setMessageID(message.getMessageId());
		} catch (Exception e) {
		}
	}
		
	if (message instanceof MqttPublish) {
		......
	} else {
		//@TRACE 615=pending send key={0} message {1}
		log.fine(CLASS_NAME,methodName,"615", new Object[]{new Integer(message.getMessageId()), message});
		
		if (message instanceof MqttConnect) {
			synchronized (queueLock) {
				// Add the connect action at the head of the pending queue ensuring it jumps
				// ahead of any of other pending actions.
				tokenStore.saveToken(token, message);
				pendingFlows.insertElementAt(message,0);
				queueLock.notifyAll();
			}
		} else {
			......
		}
	}
}
複製代碼

咱們本文分析的源碼爲MQTT鏈接,所以消息確定是MqttConnect,send函數將消息體添加到pendingFlows中,等待sender的調度併發送。sender時Runnable實例,看下sender是如何調度發送的,如下是sender的run函數:

CommsSender

// CommsSender類中:
    public void run() {
        sendThread = Thread.currentThread();
        sendThread.setName(threadName);
        final String methodName = "run";
        MqttWireMessage message = null;

        try {
            runningSemaphore.acquire();
        } catch (InterruptedException e) {
            running = false;
            return;
        }

        try {
            // 輪詢不斷取消息,out爲OutputStream的Socket
            while (running && (out != null)) {
                try {
                    //從 clientState中取消息
                    message = clientState.get();
                    if (message != null) {
                        //@TRACE 802=network send key={0} msg={1}
                        log.fine(CLASS_NAME, methodName, "802", new Object[]{message.getKey(),message});
                        // mqttAck爲響應消息
                        if (message instanceof MqttAck) {
                            out.write(message);// 寫數據
                            out.flush();// 發送
                        } else {
                            MqttToken token = tokenStore.getToken(message);
                            // While quiescing the tokenstore can be cleared so need
                            // to check for null for the case where clear occurs
                            // while trying to send a message.
                            if (token != null) {
                                synchronized (token) {
                                    out.write(message);// 寫數據
                                    try {
                                        out.flush();// 發送
                                    } catch (IOException ex) {
                                        // The flush has been seen to fail on disconnect of a SSL
                                        // socket
                                        // as disconnect is in progress this should not be 
                                        // treated as an error
                                        if (!(message instanceof MqttDisconnect)) {
                                            throw ex;
                                        }
                                    }
                                    clientState.notifySent(message);
                                }
                            }
                        }
                    } else { // null message
                        //@TRACE 803=get message returned null, stopping}
                        log.fine(CLASS_NAME, methodName, "803");

                        running = false;
                    }
                } catch (MqttException me) {
                    handleRunException(message, me);
                } catch (Exception ex) {
                    handleRunException(message, ex);
                }
            } // end while
        } finally {
            running = false;
            runningSemaphore.release();
        }

        //@TRACE 805=<
        log.fine(CLASS_NAME, methodName, "805");
    }
複製代碼

能夠看出sender不斷循環從clientState獲取待發送的消息,clientState.get函數你們能夠自行分析。 MQTT鏈接的消息發送出去啦,前面說到receiver是負責接收broker發送回來的數據的,receiver也是Runnable類型,看下receiver的run函數實現:

CommsReceiver

// CommsReceiver類中:
    public void run() {
        recThread = Thread.currentThread();
        recThread.setName(threadName);
        final String methodName = "run";
        MqttToken token = null;

        try {
            runningSemaphore.acquire();
        } catch (InterruptedException e) {
            running = false;
            return;
        }
        輪詢不斷等待消息,in爲InputStream的Socket
        while (running && (in != null)) {
            try {
                //@TRACE 852=network read message
                log.fine(CLASS_NAME, methodName, "852");
                receiving = in.available() > 0;
                MqttWireMessage message = in.readMqttWireMessage();
                receiving = false;

                // mqttAck爲響應消息
                if (message instanceof MqttAck) {
                    token = tokenStore.getToken(message);
                    if (token != null) {
                        synchronized (token) {
                            // Ensure the notify processing is done under a lock on the token
                            // This ensures that the send processing can complete  before the
                            // receive processing starts! ( request and ack and ack processing
                            // can occur before request processing is complete if not!
                            clientState.notifyReceivedAck((MqttAck) message);
                        }
                    } else if (message instanceof MqttPubRec || message instanceof MqttPubComp || message instanceof MqttPubAck) {
                        ...
                    } else {
                        throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR);
                    }
                } else {
                    if (message != null) {
                        // A new message has arrived
                        clientState.notifyReceivedMsg(message);
                    }
                }
            } 
            ......
        }
    }
複製代碼

receiver收到消息後,響應消息的消息類型爲MqttAck,因爲CONACK數據包是MqttAck類型,且token不爲null,故會執行clientState.notifyReceivedAck函數.

// ClientState類:
    protected void notifyReceivedAck(MqttAck ack) throws MqttException {
        final String methodName = "notifyReceivedAck";
        this.lastInboundActivity = System.currentTimeMillis();

        // @TRACE 627=received key={0} message={1}
        log.fine(CLASS_NAME, methodName, "627", new Object[]{
                new Integer(ack.getMessageId()), ack});

        MqttToken token = tokenStore.getToken(ack);
        MqttException mex = null;

        if (token == null) {
            // @TRACE 662=no message found for ack id={0}
            log.fine(CLASS_NAME, methodName, "662", new Object[]{
                    new Integer(ack.getMessageId())});
        } else if (ack instanceof MqttPubRec) {
            // Complete the QoS 2 flow. Unlike all other
            // flows, QoS is a 2 phase flow. The second phase sends a
            // PUBREL - the operation is not complete until a PUBCOMP
            // is received
            MqttPubRel rel = new MqttPubRel((MqttPubRec) ack);
            this.send(rel, token);
        } else if (ack instanceof MqttPubAck || ack instanceof MqttPubComp) {
            // QoS 1 & 2 notify users of result before removing from
            // persistence
            notifyResult(ack, token, mex);
            // Do not remove publish / delivery token at this stage
            // do this when the persistence is removed later 
        } else if (ack instanceof MqttPingResp) {
            synchronized (pingOutstandingLock) {
                pingOutstanding = Math.max(0, pingOutstanding - 1);
                notifyResult(ack, token, mex);
                if (pingOutstanding == 0) {
                    tokenStore.removeToken(ack);
                }
            }
            //@TRACE 636=ping response received. pingOutstanding: {0}                            
            
            log.fine(CLASS_NAME, methodName, "636", new Object[]{new Integer(pingOutstanding)});
        } else if (ack instanceof MqttConnack) {
            int rc = ((MqttConnack) ack).getReturnCode();
            // 根據CONACK數據包中的返回碼判斷協議鏈接是否已經創建,0表示服務端接受鏈接,協議正常創建。
            if (rc == 0) {
                synchronized (queueLock) {
                    if (cleanSession) {
                        clearState();
                        // Add the connect token back in so that users can be  
                        // notified when connect completes.
                        tokenStore.saveToken(token, ack);
                    }
                    inFlightPubRels = 0;
                    actualInFlight = 0;
                    restoreInflightMessages();
                    connected();
                }
            } else {
                mex = ExceptionHelper.createMqttException(rc);
                throw mex;
            }

            clientComms.connectComplete((MqttConnack) ack, mex);
            notifyResult(ack, token, mex);
            tokenStore.removeToken(ack);

            // Notify the sender thread that there maybe work for it to do now
            synchronized (queueLock) {
                queueLock.notifyAll();
            }
        } else {
            notifyResult(ack, token, mex);
            releaseMessageId(ack.getMessageId());
            tokenStore.removeToken(ack);
        }

        checkQuiesceLock();
    }
    
    public void connected() {
        final String methodName = "connected";
        //@TRACE 631=connected
        log.fine(CLASS_NAME, methodName, "631");
        // 設置鏈接完成狀態
        this.connected = true;
        // 開始心跳
        pingSender.start(); //Start ping thread when client connected to server.
    }
複製代碼

notifyReceivedAck函數中,處理各類broker返回消息,而鏈接消息處理最後會到connected()鏈接完成的方法中,該方法設置鏈接完成狀態以及開始發送心跳。 至此,MQTT鏈接源碼分析已完成。

resultBundle

如今咱們回頭看一下前面說的resultBundle,前面說到resultBundle最終會被用於發送廣播觸發咱們connect、publish、subscribe等的回調監聽。咱們先取一處簡單說明一下,前面也說到MqttConnection.connect函數中IMqttActionListener listener用於監聽鏈接成功的回調。

簡單說明下listener調用過程:listener會被傳入MqttAsyncClient類裏,隨後又經過初始化ConnectActionListener類並保存於其成員變量userCallback中,最後是在ConnectActionListener裏的onSuccess和onFailure兩回調方法中調用了listener的onSuccess和onFailure兩個方法,而ConnectActionListener裏的onSuccess和onFailure兩函數通常是CommsCallback類所調用(也會被MqttTokenAndroid類的notifyComplete函數調用,notifyComplete函數被MqttAndroidClient類的simpleAction和disconnected兩方法調用,而simpleAction函數又會被鏈接、訂閱、解除訂閱、發送等調用,暫時只簡單說一下這種狀況)。上面代碼註釋中也說過CommsCallback是接收器和外部API之間的橋接。此類由Receiver調用,而後將以comms爲中心的MQTT消息對象轉換爲外部API可理解的。 CommsReceiver接收器裏輪詢會調用ClientState.notifyReceivedAck((MqttAck)message);函數,該函數裏有幾種消息會調用notifyResult(ack, token, mex);函數,notifyResult方法對調用(CommsCallback)callback.asyncOperationComplete(token);對CommsCallback裏成員變量completeQueue(Vector)進行addElement操做,而CommsCallback的run方法又是一直輪詢監聽completeQueue裏是否有元素,有則調用handleActionComplete方法--》fireActionEvent方法--》ConnectActionListener裏的onSuccess和onFailure。 大體流程參考如圖:

listener的onSuccess函數中裏調用了doAfterConnectSuccess(resultBundle);

// MqttConnection類中:
    private void doAfterConnectSuccess(final Bundle resultBundle) {
        acquireWakeLock();
        service.callbackToActivity(clientHandle, Status.OK, resultBundle);
        deliverBacklog();
        setConnectingState(false);
        disconnected = false;
        releaseWakeLock();
    }
複製代碼

鏈接成功後會調用MqttService.callbackToActivity(),resultBundle就做爲其中一個參數被傳入,接下來咱們看看這個方法的實現:

// MqttService類:
void callbackToActivity(String clientHandle, Status status, Bundle dataBundle) {
    Intent callbackIntent = new Intent(MqttServiceConstants.CALLBACK_TO_ACTIVITY);
    if (clientHandle != null) {
        callbackIntent.putExtra(MqttServiceConstants.CALLBACK_CLIENT_HANDLE, clientHandle);
    }
    callbackIntent.putExtra(MqttServiceConstants.CALLBACK_STATUS, status);
    if (dataBundle != null) {
        callbackIntent.putExtras(dataBundle);
    }
    LocalBroadcastManager.getInstance(this).sendBroadcast(callbackIntent);
}
複製代碼

callbackToActivity()方法用於發送本地廣播,廣播中攜帶resultBundle,其實包括publish、subscribe等行爲不論成功失敗都會調用此方法,發出一個指示行爲類型及狀態的本地廣播。那麼發送的廣播是在哪接收的呢?其實前面一直沒有說MqttAndroidClient類繼承自BroadCastReceiver,所以咱們查看其onReceive()方法:

// MqttAndroidClient類:
    @Override
    public void onReceive(Context context, Intent intent) {
        Bundle data = intent.getExtras();

        String handleFromIntent = data
                .getString(MqttServiceConstants.CALLBACK_CLIENT_HANDLE);

        if ((handleFromIntent == null)
                || (!handleFromIntent.equals(clientHandle))) {
            return;
        }

        String action = data.getString(MqttServiceConstants.CALLBACK_ACTION);

        if (MqttServiceConstants.CONNECT_ACTION.equals(action)) {
            connectAction(data);
        } else if (MqttServiceConstants.CONNECT_EXTENDED_ACTION.equals(action)) {
            connectExtendedAction(data);
        } else if (MqttServiceConstants.MESSAGE_ARRIVED_ACTION.equals(action)) {
            messageArrivedAction(data);
        } else if (MqttServiceConstants.SUBSCRIBE_ACTION.equals(action)) {
            subscribeAction(data);
        } else if (MqttServiceConstants.UNSUBSCRIBE_ACTION.equals(action)) {
            unSubscribeAction(data);
        } else if (MqttServiceConstants.SEND_ACTION.equals(action)) {
            sendAction(data);
        } else if (MqttServiceConstants.MESSAGE_DELIVERED_ACTION.equals(action)) {
            messageDeliveredAction(data);
        } else if (MqttServiceConstants.ON_CONNECTION_LOST_ACTION
                .equals(action)) {
            connectionLostAction(data);
        } else if (MqttServiceConstants.DISCONNECT_ACTION.equals(action)) {
            disconnected(data);
        } else if (MqttServiceConstants.TRACE_ACTION.equals(action)) {
            traceAction(data);
        } else {
            mqttService.traceError(MqttService.TAG, "Callback action doesn't exist.");
        }
    }
    
    private void messageArrivedAction(Bundle data) {
        if (callback != null) {
            String messageId = data
                    .getString(MqttServiceConstants.CALLBACK_MESSAGE_ID);
            String destinationName = data
                    .getString(MqttServiceConstants.CALLBACK_DESTINATION_NAME);

            ParcelableMqttMessage message = data
                    .getParcelable(MqttServiceConstants.CALLBACK_MESSAGE_PARCEL);
            try {
                if (messageAck == Ack.AUTO_ACK) {
                    callback.messageArrived(destinationName, message);
                    mqttService.acknowledgeMessageArrival(clientHandle, messageId);
                } else {
                    message.messageId = messageId;
                    callback.messageArrived(destinationName, message);
                }

                // let the service discard the saved message details
            } catch (Exception e) {
                // Swallow the exception
            }
        }
    }
複製代碼

data.getString(MqttServiceConstants.CALLBACK_ACTION)獲取的就是咱們前面存放在resultBundle中的action,而後根據action去調用對應的方法去回調callback的對應方法,例如:action爲MESSAGE_ARRIVED_ACTION時,調用messageArrivedAction函數,若是須要監聽action爲MqttServiceConstants.MESSAGE_ARRIVED_ACTION的行爲,則要求咱們傳入的callback必須爲MqttCallback的實現,而若是須要監聽action爲MqttServiceConstants.CONNECT_EXTENDED_ACTION的行爲,則要求咱們傳入的callback必須爲MqttCallbackExtended的實現,MqttCallbackExtended是MqttCallback的子類。這裏的callback就是咱們創建鏈接前初始化MqttAndroidClient時設置的MqttCallback對象:

// 本文最初創建MQTT鏈接部分代碼:
    // MqttConnectCallback爲MqttCallback的實現類
    mMqttCallback =new MqttConnectCallback(mContext, clientInfo);
    myClient =new MqttAndroidClient(mContext, uri, mClientId);
    myClient.setCallback(mMqttCallback);
複製代碼

至此,分析完鏈接MQTT的源碼,下一篇分析MQTT發佈消息publish。

參考連接:

blog.csdn.net/rockstore/a…

blog.csdn.net/Dovar_66/ar…

...

(注:如有什麼地方闡述有誤,敬請指正。)

相關文章
相關標籤/搜索