此篇文章繼上一篇物聯網協議之MQTT源碼分析(一)而寫的第二篇MQTT發佈消息以及接收Broker消息的源碼分析,想看MQTT鏈接的小夥伴能夠去看我上一篇哦。bash
MQTT發佈消息是由MqttAndroidClient類的publish函數執行的,咱們來看看這個函數:socket
// MqttAndroidClient類:
@Override
public IMqttDeliveryToken publish(String topic, byte[] payload, int qos,
boolean retained, Object userContext,
IMqttActionListener callback)
throws MqttException, MqttPersistenceException {
// 將消息內容、qos消息等級、retained消息是否保留封裝成MqttMessage
MqttMessage message = new MqttMessage(payload);
message.setQos(qos);
message.setRetained(retained);
// 每一條消息都有本身的token
MqttDeliveryTokenAndroid token = new MqttDeliveryTokenAndroid(
this, userContext, callback, message);
String activityToken = storeToken(token);
IMqttDeliveryToken internalToken = mqttService.publish(clientHandle,
topic, payload, qos, retained, null, activityToken);
token.setDelegate(internalToken);
return token;
}
複製代碼
從上面代碼能夠看出,發佈消息須要topic消息主題、payload消息內容、callback回調監聽等,經由mqttService.publish繼續執行發佈操做:async
// MqttService類:MQTT惟一組件
public IMqttDeliveryToken publish(String clientHandle, String topic,
byte[] payload, int qos, boolean retained,
String invocationContext, String activityToken)
throws MqttPersistenceException, MqttException {
MqttConnection client = getConnection(clientHandle);
return client.publish(topic, payload, qos, retained, invocationContext,
activityToken);
}
複製代碼
MqttConnection在上一篇中講解過,MQTT的鏈接會初始化一個MqttConnection,並保存在一個Map集合connections中,並經過getConnection(clientHandle)方法獲取。很明顯咱們要接着看client.publish函數啦:ide
// MqttConnection類:
public IMqttDeliveryToken publish(String topic, byte[] payload, int qos,
boolean retained, String invocationContext,
String activityToken) {
// 用於發佈消息,是否發佈成功的回調
final Bundle resultBundle = new Bundle();
resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION,
MqttServiceConstants.SEND_ACTION);
resultBundle.putString(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN,
activityToken);
resultBundle.putString(
MqttServiceConstants.CALLBACK_INVOCATION_CONTEXT,
invocationContext);
IMqttDeliveryToken sendToken = null;
if ((myClient != null) && (myClient.isConnected())) {
// 攜帶resultBundle數據,用於監聽回調發布消息是否成功
IMqttActionListener listener = new MqttConnectionListener(
resultBundle);
try {
MqttMessage message = new MqttMessage(payload);
message.setQos(qos);
message.setRetained(retained);
sendToken = myClient.publish(topic, payload, qos, retained,
invocationContext, listener);
storeSendDetails(topic, message, sendToken, invocationContext,
activityToken);
} catch (Exception e) {
handleException(resultBundle, e);
}
} else {
resultBundle.putString(MqttServiceConstants.CALLBACK_ERROR_MESSAGE,
NOT_CONNECTED);
service.traceError(MqttServiceConstants.SEND_ACTION, NOT_CONNECTED);
service.callbackToActivity(clientHandle, Status.ERROR, resultBundle);
}
return sendToken;
}
複製代碼
這段代碼中很明顯能夠看出發佈的操做又交給了myClient.publish方法,那myClient是誰呢?上一篇文章中講過myClient是MqttAsyncClient,是在MQTT鏈接時在MqttConnection類的connect方法中初始化的,詳情請看上一篇。函數
// MqttAsyncClient類:
public IMqttDeliveryToken publish(String topic, byte[] payload, int qos
, boolean retained,Object userContext,
IMqttActionListener callback) throws MqttException,MqttPersistenceException {
MqttMessage message = new MqttMessage(payload);
message.setQos(qos);
message.setRetained(retained);
return this.publish(topic, message, userContext, callback);
}
public IMqttDeliveryToken publish(String topic, MqttMessage message
, Object userContext,
IMqttActionListener callback) throws MqttException,MqttPersistenceException {
final String methodName = "publish";
// @TRACE 111=< topic={0} message={1}userContext={1} callback={2}
log.fine(CLASS_NAME, methodName, "111", new Object[]{topic, userContext, callback});
// Checks if a topic is valid when publishing a message.
MqttTopic.validate(topic, false/* wildcards NOT allowed */);
MqttDeliveryToken token = new MqttDeliveryToken(getClientId());
token.setActionCallback(callback);
token.setUserContext(userContext);
token.setMessage(message);
token.internalTok.setTopics(new String[]{topic});
MqttPublish pubMsg = new MqttPublish(topic, message);
comms.sendNoWait(pubMsg, token);
// @TRACE 112=<
log.fine(CLASS_NAME, methodName, "112");
return token;
}
複製代碼
從這段代碼中能夠看到,如今把把topic和message封裝成了MqttPublish類型的消息,並繼續由comms.sendNoWait執行,comms是ClientComms,ClientComms是在初始化MqttAsyncClient的構造方法中初始化的,詳情看上一篇。源碼分析
// ClientComms類:
public void sendNoWait(MqttWireMessage message, MqttToken token) throws MqttException {
final String methodName = "sendNoWait";
// 判斷狀態或者消息類型
if (isConnected() ||
(!isConnected() && message instanceof MqttConnect) ||
(isDisconnecting() && message instanceof MqttDisconnect)) {
if (disconnectedMessageBuffer != null && disconnectedMessageBuffer.getMessageCount() != 0) {
//@TRACE 507=Client Connected, Offline Buffer available, but not empty. Adding
// message to buffer. message={0}
log.fine(CLASS_NAME, methodName, "507", new Object[]{message.getKey()});
if (disconnectedMessageBuffer.isPersistBuffer()) {
this.clientState.persistBufferedMessage(message);
}
disconnectedMessageBuffer.putMessage(message, token);
} else {
// 如今不是disconnect所以,邏輯走這裏
this.internalSend(message, token);
}
} else if (disconnectedMessageBuffer != null) {
//@TRACE 508=Offline Buffer available. Adding message to buffer. message={0}
log.fine(CLASS_NAME, methodName, "508", new Object[]{message.getKey()});
if (disconnectedMessageBuffer.isPersistBuffer()) {
this.clientState.persistBufferedMessage(message);
}
disconnectedMessageBuffer.putMessage(message, token);
} else {
//@TRACE 208=failed: not connected
log.fine(CLASS_NAME, methodName, "208");
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
}
}
void internalSend(MqttWireMessage message, MqttToken token) throws MqttException {
final String methodName = "internalSend";
...
try {
// Persist if needed and send the message
this.clientState.send(message, token);
} catch (MqttException e) {
// 注意此處代碼***
if (message instanceof MqttPublish) {
this.clientState.undo((MqttPublish) message);
}
throw e;
}
}
複製代碼
comms.sendNoWait方法中又調用了本類中的internalSend方法,而且在internalSend方法中又調用了clientState.send(message, token)方法繼續發佈。ClientState對象是在ClientComms初始化的構造方法中初始化的。此處須要注意一下catch裏的代碼,下面會具體說明。post
// ClientState類:
public void send(MqttWireMessage message, MqttToken token) throws MqttException {
final String methodName = "send";
...
if (message instanceof MqttPublish) {
synchronized (queueLock) {
/**
* 注意這裏:actualInFlight實際飛行中>maxInflight最大飛行中
* maxInflight:是咱們在本身代碼中經過鏈接選項MqttConnectOptions.setMaxInflight();設置的,默認大小爲10
*/
if (actualInFlight >= this.maxInflight) {
//@TRACE 613= sending {0} msgs at max inflight window
log.fine(CLASS_NAME, methodName, "613",
new Object[]{new Integer(actualInFlight)});
throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);
}
MqttMessage innerMessage = ((MqttPublish) message).getMessage();
//@TRACE 628=pending publish key={0} qos={1} message={2}
log.fine(CLASS_NAME, methodName, "628",
new Object[]{new Integer(message.getMessageId()),
new Integer(innerMessage.getQos()), message});
/**
* 根據本身設置的qos等級,來決定是否須要恢復消息
* 這裏須要說明一下qos等級區別:
* qos==0,至多發送一次,不進行重試,Broker不會返回確認消息。
* qos==1,至少發送一次,確保消息到達Broker,Broker須要返回確認消息PUBACK
* qos==2,Broker確定會收到消息,且只收到一次,qos==1可能會發送重複消息
*/
switch (innerMessage.getQos()) {
case 2:
outboundQoS2.put(new Integer(message.getMessageId()), message);
persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
break;
case 1:
outboundQoS1.put(new Integer(message.getMessageId()), message);
persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
break;
}
tokenStore.saveToken(token, message);
pendingMessages.addElement(message);
queueLock.notifyAll();
}
} else {
...
}
}
複製代碼
這段代碼中咱們發現了一個可能須要咱們本身設置的屬性maxInflight,若是實際發送中的消息大於maxInflight約束的最大的話就會拋出MqttException異常,那麼這個異常catch裏是怎麼處理的呢,這就要往回看一步代碼啦,上面已經提示過須要注意ClientComms類中internalSend方法中的catch裏的代碼:ui
if (message instanceof MqttPublish) {
this.clientState.undo((MqttPublish) message);
}
複製代碼
能夠很明確的看出若消息類型是MqttPublish,則執行clientState.undo((MqttPublish) message)方法,咱們前面說過消息已經在MqttAsyncClient類的publish方法中把topic和message封裝成了MqttPublish類型的消息,所以此處會執行undo方法:this
// ClientState類:
protected void undo(MqttPublish message) throws MqttPersistenceException {
final String methodName = "undo";
synchronized (queueLock) {
//@TRACE 618=key={0} QoS={1}
log.fine(CLASS_NAME, methodName, "618",
new Object[]{new Integer(message.getMessageId()),
new Integer(message.getMessage().getQos())});
if (message.getMessage().getQos() == 1) {
outboundQoS1.remove(new Integer(message.getMessageId()));
} else {
outboundQoS2.remove(new Integer(message.getMessageId()));
}
pendingMessages.removeElement(message);
persistence.remove(getSendPersistenceKey(message));
tokenStore.removeToken(message);
if (message.getMessage().getQos() > 0) {
//Free this message Id so it can be used again
releaseMessageId(message.getMessageId());
message.setMessageId(0);
}
checkQuiesceLock();
}
}
複製代碼
代碼已經很明顯了,就是把大於maxInflight這部分消息remove移除掉,所以在實際操做中要注意本身的Mqtt消息的發佈會不會在短期內達到maxInflight默認的10的峯值,若能達到,則須要手動設置一個適合本身項目的範圍閥值啦。
咱們繼續說clientState.send(message, token)方法裏的邏輯,代碼中註釋中也說明了Mqtt會根據qos等級來決定消息到達機制
qos等級
根據qos等級,若qos等於1和2,則講消息分別加入Hashtable類型的outboundQoS1和outboundQoS2中,已在後續邏輯中確保消息發送成功併到達。
注:qos等級優先級沒有maxInflight高,從代碼中能夠看出,會先判斷maxInflight再區分qos等級
代碼的最後講消息添加進Vector類型的pendingMessages裏,在上一篇中咱們能夠了解到MQTT的發射器是輪詢檢查pendingMessages裏是否存在數據,若存在則經過socket的OutputStream發送出去。而且會經過接收器接收從Broker發送回來的數據。
發送咱們就不看源碼啦,接收咱們再看一下源碼,經過源碼看一看數據是怎麼回到咱們本身的回調裏的:
// CommsReceiver類中:
public void run() {
recThread = Thread.currentThread();
recThread.setName(threadName);
final String methodName = "run";
MqttToken token = null;
try {
runningSemaphore.acquire();
} catch (InterruptedException e) {
running = false;
return;
}
while (running && (in != null)) {
try {
//@TRACE 852=network read message
log.fine(CLASS_NAME, methodName, "852");
receiving = in.available() > 0;
MqttWireMessage message = in.readMqttWireMessage();
receiving = false;
// 消息是否屬於Mqtt確認類型
if (message instanceof MqttAck) {
token = tokenStore.getToken(message);
// token通常不會爲空,前面已經保存過
if (token != null) {
synchronized (token) {
// ...
clientState.notifyReceivedAck((MqttAck) message);
}
}
...
} finally {
receiving = false;
runningSemaphore.release();
}
}
}
複製代碼
從代碼中能夠看出,Broker返回來的數據交給了clientState.notifyReceivedAck方法:
// ClientState類:
protected void notifyReceivedAck(MqttAck ack) throws MqttException {
final String methodName = "notifyReceivedAck";
...
MqttToken token = tokenStore.getToken(ack);
MqttException mex = null;
if (token == null) {
...
} else if (ack instanceof MqttPubRec) {
// qos==2 是返回
MqttPubRel rel = new MqttPubRel((MqttPubRec) ack);
this.send(rel, token);
} else if (ack instanceof MqttPubAck || ack instanceof MqttPubComp) {
// qos==1/2 消息移除前通知的結果
notifyResult(ack, token, mex);
// Do not remove publish / delivery token at this stage
// do this when the persistence is removed later
} else if (ack instanceof MqttPingResp) {
// 鏈接心跳數據消息
...
} else if (ack instanceof MqttConnack) {
// MQTT鏈接消息
...
} else {
notifyResult(ack, token, mex);
releaseMessageId(ack.getMessageId());
tokenStore.removeToken(ack);
}
checkQuiesceLock();
}
複製代碼
從上面註釋可知,發佈的消息qos==0,返回結果是直接走else,而qos==1/2,確認消息也最終會走到notifyResult(ack, token, mex)方法中:
protected void notifyResult(MqttWireMessage ack, MqttToken token, MqttException ex) {
final String methodName = "notifyResult";
// 取消阻止等待令牌的任何線程,並保存ack
token.internalTok.markComplete(ack, ex);
// 通知此令牌已收到響應消息,設置已完成狀態,並經過isComplete()獲取狀態
token.internalTok.notifyComplete();
// 讓用戶知道異步操做已完成,而後刪除令牌
if (ack != null && ack instanceof MqttAck && !(ack instanceof MqttPubRec)) {
//@TRACE 648=key{0}, msg={1}, excep={2}
log.fine(CLASS_NAME, methodName, "648", new Object[]{token.internalTok.getKey(), ack,ex});
// CommsCallback類
callback.asyncOperationComplete(token);
}
// 有些狀況下,因爲操做失敗,所以沒有確認
if (ack == null) {
//@TRACE 649=key={0},excep={1}
log.fine(CLASS_NAME, methodName, "649", new Object[]{token.internalTok.getKey(), ex});
callback.asyncOperationComplete(token);
}
}
// Token類:
protected void markComplete(MqttWireMessage msg, MqttException ex) {
final String methodName = "markComplete";
//@TRACE 404=>key={0} response={1} excep={2}
log.fine(CLASS_NAME, methodName, "404", new Object[]{getKey(), msg, ex});
synchronized (responseLock) {
// ACK means that everything was OK, so mark the message for garbage collection.
if (msg instanceof MqttAck) {
this.message = null;
}
this.pendingComplete = true;
// 將消息保存在response成員變量中,並經過getWireMessage()方法獲取消息msg
this.response = msg;
this.exception = ex;
}
}
// Token類:
protected void notifyComplete() {
...
synchronized (responseLock) {
...
if (exception == null && pendingComplete) {
// 設置已完成,並經過isComplete()獲取狀態
completed = true;
pendingComplete = false;
} else {
pendingComplete = false;
}
responseLock.notifyAll();
}
...
}
複製代碼
此時已將MqttWireMessage消息保存到token中,異步操做已完成,調用回調監聽CommsCallback裏的asyncOperationComplete方法:
// CommsCallback類:
public void asyncOperationComplete(MqttToken token) {
final String methodName = "asyncOperationComplete";
if (running) {
// invoke callbacks on callback thread
completeQueue.addElement(token);
synchronized (workAvailable) {
// @TRACE 715=new workAvailable. key={0}
log.fine(CLASS_NAME, methodName, "715", new Object[]{token.internalTok.getKey()});
workAvailable.notifyAll();
}
} else {
// invoke async callback on invokers thread
try {
handleActionComplete(token);
} catch (Throwable ex) {
// Users code could throw an Error or Exception e.g. in the case
// of class NoClassDefFoundError
// @TRACE 719=callback threw ex:
log.fine(CLASS_NAME, methodName, "719", null, ex);
// Shutdown likely already in progress but no harm to confirm
clientComms.shutdownConnection(null, new MqttException(ex));
}
}
}
複製代碼
CommsCallback是Mqtt鏈接就已經開始一直運行,所以running爲true,因此如今已經將token添加進了completeQueue完成隊列中,CommsCallback跟發射器同樣,一直輪詢等待數據,所以此時completeQueue已有數據,此時CommsCallback的run函數則會有接下來的操做:
// CommsCallback類:
public void run() {
...
while (running) {
try {
...
if (running) {
// Check for deliveryComplete callbacks...
MqttToken token = null;
synchronized (completeQueue) {
// completeQueue不爲空
if (!completeQueue.isEmpty()) {
// 獲取第一個token
token = (MqttToken) completeQueue.elementAt(0);
completeQueue.removeElementAt(0);
}
}
if (null != token) {
// token不爲null,執行handleActionComplete
handleActionComplete(token);
}
...
}
if (quiescing) {
clientState.checkQuiesceLock();
}
} catch (Throwable ex) {
...
} finally {
...
}
}
}
private void handleActionComplete(MqttToken token)
throws MqttException {
final String methodName = "handleActionComplete";
synchronized (token) {
// 由上面已經,isComplete()已設置爲true
if (token.isComplete()) {
// Finish by doing any post processing such as delete
// from persistent store but only do so if the action
// is complete
clientState.notifyComplete(token);
}
// 取消阻止任何服務員,若是待完成,如今設置完成
token.internalTok.notifyComplete();
if (!token.internalTok.isNotified()) {
...
// 如今調用異步操做完成回調
fireActionEvent(token);
}
...
}
}
複製代碼
run中調用了handleActionComplete函數,接着後調用了clientState.notifyComplete()方法和fireActionEvent(token)方法,先看notifyComplete():
// ClientState類:
protected void notifyComplete(MqttToken token) throws MqttException {
final String methodName = "notifyComplete";
// 獲取保存到Token中的Broker返回的消息,上面有說明
MqttWireMessage message = token.internalTok.getWireMessage();
if (message != null && message instanceof MqttAck) {
...
MqttAck ack = (MqttAck) message;
if (ack instanceof MqttPubAck) {
// qos==1,用戶通知如今從持久性中刪除
persistence.remove(getSendPersistenceKey(message));
persistence.remove(getSendBufferedPersistenceKey(message));
outboundQoS1.remove(new Integer(ack.getMessageId()));
decrementInFlight();
releaseMessageId(message.getMessageId());
tokenStore.removeToken(message);
// @TRACE 650=removed Qos 1 publish. key={0}
log.fine(CLASS_NAME, methodName, "650",
new Object[]{new Integer(ack.getMessageId())});
} else if (ack instanceof MqttPubComp) {
...
}
checkQuiesceLock();
}
}
複製代碼
再來看fireActionEvent(token)方法:
// CommsCallback類:
public void fireActionEvent(MqttToken token) {
final String methodName = "fireActionEvent";
if (token != null) {
IMqttActionListener asyncCB = token.getActionCallback();
if (asyncCB != null) {
if (token.getException() == null) {
...
asyncCB.onSuccess(token);
} else {
...
asyncCB.onFailure(token, token.getException());
}
}
}
}
複製代碼
從這段代碼中終於能看到回調onSuccess和onFailure的方法啦,那asyncCB是誰呢?
// MqttToken類:
public IMqttActionListener getActionCallback() {
return internalTok.getActionCallback();
}
// Token類:
public IMqttActionListener getActionCallback() {
return callback;
}
複製代碼
看到這,一臉懵逼,這究竟是誰呢,其實咱們能夠直接看這個回調設置方法,看看是從哪設置進來的就能夠啦:
// Token類:
public void setActionCallback(IMqttActionListener listener) {
this.callback = listener;
}
// MqttToken類:
public void setActionCallback(IMqttActionListener listener) {
internalTok.setActionCallback(listener);
}
// ConnectActionListener類:
public void connect() throws MqttPersistenceException {
// 初始化MqttToken
MqttToken token = new MqttToken(client.getClientId());
// 將此類設置成回調類
token.setActionCallback(this);
token.setUserContext(this);
...
}
複製代碼
其實早在MQTT鏈接時,就已經將此callback設置好,所以asyncCB就是ConnectActionListener,因此此時就已經走到了ConnectActionListener類裏的onSuccess和onFailure的方法中,咱們只挑一個onSuccess看一看:
// ConnectActionListener類:
public void onSuccess(IMqttToken token) {
if (originalMqttVersion == MqttConnectOptions.MQTT_VERSION_DEFAULT) {
options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_DEFAULT);
}
// 此時將Broker的數據保存進了userToken裏
userToken.internalTok.markComplete(token.getResponse(), null);
userToken.internalTok.notifyComplete();
userToken.internalTok.setClient(this.client);
comms.notifyConnect();
if (userCallback != null) {
userToken.setUserContext(userContext);
userCallback.onSuccess(userToken);
}
if (mqttCallbackExtended != null) {
String serverURI =
comms.getNetworkModules()[comms.getNetworkModuleIndex()].getServerURI();
mqttCallbackExtended.connectComplete(reconnect, serverURI);
}
}
複製代碼
這裏的userCallback又是誰呢?上一篇其實說過的,userCallback其實就是MqttConnection.connect函數中IMqttActionListener listener,因此此時又來到了MqttConnection類裏connect方法裏的listener監聽回調內:
// MqttConnection類:
public void connect(MqttConnectOptions options, String invocationContext,
String activityToken) {
...
service.traceDebug(TAG, "Connecting {" + serverURI + "} as {" + clientId + "}");
final Bundle resultBundle = new Bundle();
resultBundle.putString(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN,
activityToken);
resultBundle.putString(
MqttServiceConstants.CALLBACK_INVOCATION_CONTEXT,
invocationContext);
resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION,
MqttServiceConstants.CONNECT_ACTION);
try {
...
// 此時邏輯已經來到這裏
IMqttActionListener listener = new MqttConnectionListener(
resultBundle) {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
// 執行以下代碼:
doAfterConnectSuccess(resultBundle);
service.traceDebug(TAG, "connect success!");
}
@Override
public void onFailure(IMqttToken asyncActionToken,
Throwable exception) {
resultBundle.putString(
MqttServiceConstants.CALLBACK_ERROR_MESSAGE,
exception.getLocalizedMessage());
resultBundle.putSerializable(
MqttServiceConstants.CALLBACK_EXCEPTION, exception);
service.traceError(TAG,
"connect fail, call connect to reconnect.reason:"
+ exception.getMessage());
doAfterConnectFail(resultBundle);
}
};
if (myClient != null) {
if (isConnecting) {
...
} else {
service.traceDebug(TAG, "myClient != null and the client is not connected");
service.traceDebug(TAG, "Do Real connect!");
setConnectingState(true);
myClient.connect(connectOptions, invocationContext, listener);
}
}
// if myClient is null, then create a new connection
else {
...
myClient.connect(connectOptions, invocationContext, listener);
}
} catch (Exception e) {
...
}
}
複製代碼
由這段代碼以及註釋能夠知道,如今以及執行到了MqttConnection類裏的doAfterConnectSuccess方法裏:
// MqttConnection類:
private void doAfterConnectSuccess(final Bundle resultBundle) {
// 獲取喚醒鎖
acquireWakeLock();
service.callbackToActivity(clientHandle, Status.OK, resultBundle);
deliverBacklog();
setConnectingState(false);
disconnected = false;
// 釋放喚醒鎖
releaseWakeLock();
}
private void deliverBacklog() {
Iterator<StoredMessage> backlog = service.messageStore
.getAllArrivedMessages(clientHandle);
while (backlog.hasNext()) {
StoredMessage msgArrived = backlog.next();
Bundle resultBundle = messageToBundle(msgArrived.getMessageId(),
msgArrived.getTopic(), msgArrived.getMessage());
// 關注下這個action,下面會用到
resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION,
MqttServiceConstants.MESSAGE_ARRIVED_ACTION);
service.callbackToActivity(clientHandle, Status.OK, resultBundle);
}
}
複製代碼
能夠看到這個函數中調用了幾個方法中的其中兩個service.callbackToActivity(clientHandle, Status.OK, resultBundle);和deliverBacklog();,deliverBacklog()方法最後也是調用的service.callbackToActivity方法。因此直接看service.callbackToActivity:
// MqttService類:
void callbackToActivity(String clientHandle, Status status,
Bundle dataBundle) {
// 發送廣播
Intent callbackIntent = new Intent(
MqttServiceConstants.CALLBACK_TO_ACTIVITY);
if (clientHandle != null) {
callbackIntent.putExtra(
MqttServiceConstants.CALLBACK_CLIENT_HANDLE, clientHandle);
}
callbackIntent.putExtra(MqttServiceConstants.CALLBACK_STATUS, status);
if (dataBundle != null) {
callbackIntent.putExtras(dataBundle);
}
LocalBroadcastManager.getInstance(this).sendBroadcast(callbackIntent);
}
複製代碼
service.callbackToActivity方法其實就是發送廣播,那誰來接收廣播呢?其實接收廣播的就在最開始的MqttAndroidClient,MqttAndroidClient繼承自BroadcastReceiver,因此說MqttAndroidClient自己就是一個廣播接收者,因此咱們來看它的onReceive方法:
// MqttAndroidClient類:
@Override
public void onReceive(Context context, Intent intent) {
Bundle data = intent.getExtras();
String handleFromIntent = data
.getString(MqttServiceConstants.CALLBACK_CLIENT_HANDLE);
if ((handleFromIntent == null)
|| (!handleFromIntent.equals(clientHandle))) {
return;
}
String action = data.getString(MqttServiceConstants.CALLBACK_ACTION);
// 判斷消息的action類型
if (MqttServiceConstants.CONNECT_ACTION.equals(action)) {
connectAction(data);
} else if (MqttServiceConstants.CONNECT_EXTENDED_ACTION.equals(action)) {
connectExtendedAction(data);
} else if (MqttServiceConstants.MESSAGE_ARRIVED_ACTION.equals(action)) {
messageArrivedAction(data);
} else if (MqttServiceConstants.SUBSCRIBE_ACTION.equals(action)) {
subscribeAction(data);
} else if (MqttServiceConstants.UNSUBSCRIBE_ACTION.equals(action)) {
unSubscribeAction(data);
} else if (MqttServiceConstants.SEND_ACTION.equals(action)) {
// 發佈成功與否的回調
sendAction(data);
} else if (MqttServiceConstants.MESSAGE_DELIVERED_ACTION.equals(action)) {
messageDeliveredAction(data);
} else if (MqttServiceConstants.ON_CONNECTION_LOST_ACTION
.equals(action)) {
connectionLostAction(data);
} else if (MqttServiceConstants.DISCONNECT_ACTION.equals(action)) {
disconnected(data);
} else if (MqttServiceConstants.TRACE_ACTION.equals(action)) {
traceAction(data);
} else {
mqttService.traceError(MqttService.TAG, "Callback action doesn't exist.");
}
}
複製代碼
從代碼和註釋以及上面的deliverBacklog方法中能夠知道,咱們如今須要關注的action爲MESSAGE_ARRIVED_ACTION,因此就能夠調用方法messageArrivedAction(data):
// MqttAndroidClient類:
private void messageArrivedAction(Bundle data) {
if (callback != null) {
String messageId = data
.getString(MqttServiceConstants.CALLBACK_MESSAGE_ID);
String destinationName = data
.getString(MqttServiceConstants.CALLBACK_DESTINATION_NAME);
ParcelableMqttMessage message = data
.getParcelable(MqttServiceConstants.CALLBACK_MESSAGE_PARCEL);
try {
if (messageAck == Ack.AUTO_ACK) {
callback.messageArrived(destinationName, message);
mqttService.acknowledgeMessageArrival(clientHandle, messageId);
} else {
message.messageId = messageId;
callback.messageArrived(destinationName, message);
}
// let the service discard the saved message details
} catch (Exception e) {
// Swallow the exception
}
}
}
@Override
public void setCallback(MqttCallback callback) {
this.callback = callback;
}
複製代碼
在messageArrivedAction方法中能夠看到,咱們最後調用了callback回調了messageArrived方法,那麼 callback經過上面下部分代碼能夠知道,其實這個callback就是咱們上一篇文章中所說的咱們初始化MqttAndroidClient後,經過方法setCallback設置的咱們本身定義的實現MqttCallback接口的回調類。
再看下sendAction(data)方法:
private void sendAction(Bundle data) {
IMqttToken token = getMqttToken(data);
// remove on delivery
simpleAction(token, data);
}
private void simpleAction(IMqttToken token, Bundle data) {
if (token != null) {
Status status = (Status) data
.getSerializable(MqttServiceConstants.CALLBACK_STATUS);
if (status == Status.OK) {
// 若是發佈成功回調此方法
((MqttTokenAndroid) token).notifyComplete();
} else {
Exception exceptionThrown =
(Exception) data.getSerializable(MqttServiceConstants.CALLBACK_EXCEPTION);
// 發佈失敗回調
((MqttTokenAndroid) token).notifyFailure(exceptionThrown);
}
} else {
if (mqttService != null) {
mqttService.traceError(MqttService.TAG, "simpleAction : token is null");
}
}
}
複製代碼
接下來再看一看發佈成功回調的MqttTokenAndroid的notifyComplete函數:
// MqttTokenAndroid類:
void notifyComplete() {
synchronized (waitObject) {
isComplete = true;
waitObject.notifyAll();
if (listener != null) {
listener.onSuccess(this);
}
}
}
複製代碼
這裏又調用了listener.onSuccess(this)方法,那麼這個listener是誰?其實listener就是咱們調用MqttAndroidClient類的publish發佈的最後一個參數,即咱們自定義的監聽發佈消息是否發佈成功的回調類。上面在MqttConnection類的publish方法中封裝過MqttServiceConstants.SEND_ACTION的Bundle數據,而此數據是被MqttConnection類裏的MqttConnectionListener攜帶。因此MqttConnectionListener裏的onSuccess被調用時就會調用service.callbackToActivity,繼而到sendBroadcast發送廣播,最後調用sendAction方法,回調自定義的IMqttActionListener的實現類。而MqttConnectionListener裏的onSuccess是在CommsCallback類裏的fireActionEvent方法中,往上走就到CommsCallback類的了handleActionComplete和run()函數。
如今看是否是有點懵畢竟上面有兩個 監聽Broker返回的消息,一個是用來監聽Broker發給客戶端數據的監聽,另外一個是客戶端發佈消息是否發佈成功的監聽而已。二者都是使用MqttActionListener,不過前者在MqttActionListener監聽回調裏最後調用的是自定義的MqttCallback回調而已。而且二者監聽的位置不同,前者是在 MqttConnection類的connect時就已確認下來的,對於一個MQTT鏈接只會有一個,因此這個是一直用來監聽數據的;然後者監聽發佈消息是否成功是每一個publish都須要傳入的,並在MqttConnection類裏的publish初始化。這麼講是否是就清晰一些啦。
哈哈,到此MQTT的publish發佈以及接收Broker數據的源碼分析也看完啦。
(注:如有什麼地方闡述有誤,敬請指正。)