閱讀優秀的代碼是一種享受,將優秀的代碼用本身的世界觀優秀地描述出來就十分痛苦了是要死一億個腦細胞的。html
這篇源碼閱讀筆記早在一年前就有了當時只是簡單的記錄一下本身的總結,最近將她從新整理一下但願能幫助有須要的人。node
隨着移動互聯網快速進入後半場,愈來愈多的企業將注意力轉移到物聯網。好比共享單車和小米的智能家居產品等都是典型的物聯網應用。算法
企業相信藉助於大數據和AI技術能夠得到不少額外的價值產生新的商業模式。海量數據須要經過接入服務才能流向後端產生後續價值,在接入服務中MQTT已成爲物聯網中非明確的標準協議國內外雲廠均有其broker實現。後端
MQTT協議是爲大量計算能力有限,且工做在低帶寬、不可靠的網絡的遠程傳感器和控制設備通信而設計的協議,它具備如下主要的幾項特性:bash
==下文中會對上述特性的實現方式進行講解==服務器
使用MQTT的程序或者設備,如環境監控傳感器、共享單車、共享充電寶等。網絡
一個程序或設備,做爲發送消息的客戶端和請求訂閱的客戶端之間的中介。session
客戶端-A 給 客戶端-B 發送消息「hello」流程以下:併發
有別於HTTP協議的請求響應模式,客戶端-A與客戶端-B不發生直接鏈接關係,他們之間的消息傳遞經過服務端Server進行轉發。 服務端Server又稱 MQTT Broker 即訂閱和發送的中間人app
在上述的客戶端-A 給 客戶端-B 發送消息「hello」流程中須要有以下動做。
下面將基於鏈接、訂閱、發佈這幾個動做進行源碼跟蹤解讀。
Session:會話即客戶端(由ClientId做爲標示)和服務端之間邏輯層面的通訊;生命週期(存在時間):會話 >= 網絡鏈接。
ClientID:客戶端惟一標識,服務端用於關聯一個Session 只能包含這些 大寫字母,小寫字母 和 數字(0-9a-zA-Z),23個字符之內 若是 ClientID 在屢次 TCP鏈接中保持一致,客戶端和服務器端會保留會話信息(Session) 同一時間內 Server 和同一個 ClientID 只能保持一個 TCP 鏈接,再次鏈接會踢掉前一個。
CleanSession:在Connect時,由客戶端設置
Keep Alive:目的是保持長鏈接的可靠性,以及雙方對彼此是否在線的確認。 客戶端在Connect的時候設置 Keep Alive 時長。若是服務端在 1.5 * KeepAlive 時間內沒有收到客戶端的報文,它必須斷開客戶端的網絡鏈接 Keep Alive 的值由具體應用指定,通常是幾分鐘。容許的最大值是 18 小時 12 分 15 秒。
Will:遺囑消息(Will Message)存儲在服務端,當網絡鏈接關閉時,服務端必須發佈這個遺囑消息,因此被形象地稱之爲遺囑,可用於通知異常斷線。 客戶端發送 DISCONNECT 關閉連接,遺囑失效並刪除 遺囑消息發佈的條件,包括: 服務端檢測到了一個 I/O 錯誤或者網絡故障 客戶端在保持鏈接(Keep Alive)的時間內未能通信 客戶端沒有先發送 DISCONNECT 報文直接關閉了網絡鏈接 因爲協議錯誤服務端關閉了網絡鏈接 相關設置項,須要在Connect時,由客戶端指定。
Will Flag :遺囑的總開關
Will QoS: 遺囑消息 QoS可取值 0、一、2,含義與消息QoS相同
Will Retain:遺囑是否保留
Will Topic:遺囑話題
Will Payload:遺囑消息內容
public void processConnect(Channel channel, MqttConnectMessage msg) {
MqttConnectPayload payload = msg.payload();
String clientId = payload.clientIdentifier();
LOG.debug("Processing CONNECT message. CId={}, username={}", clientId, payload.userName());
// 1. 判斷客戶端鏈接時發送的MQTT協議版本號,非3.1和3.1.1版本發送協議不支持響應報文並在發送完成後關閉鏈接
if (msg.variableHeader().version() != MqttVersion.MQTT_3_1.protocolLevel()
&& msg.variableHeader().version() != MqttVersion.MQTT_3_1_1.protocolLevel()) {
MqttConnAckMessage badProto = connAck(CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION);
LOG.error("MQTT protocol version is not valid. CId={}", clientId);
channel.writeAndFlush(badProto).addListener(FIRE_EXCEPTION_ON_FAILURE);
channel.close().addListener(CLOSE_ON_FAILURE);
return;
}
final boolean cleanSession = msg.variableHeader().isCleanSession();
if (clientId == null || clientId.length() == 0) {
// 2. 在客戶端配置了cleanSession=false 或者服務端不容許clientId不存在的狀況下客戶端若是未上傳clientId發送協議不支持響應報文並在發送完成後關閉鏈接
if (!cleanSession || !this.allowZeroByteClientId) {
MqttConnAckMessage badId = connAck(CONNECTION_REFUSED_IDENTIFIER_REJECTED);
channel.writeAndFlush(badId).addListener(FIRE_EXCEPTION_ON_FAILURE);
channel.close().addListener(CLOSE_ON_FAILURE);
LOG.error("The MQTT client ID cannot be empty. Username={}", payload.userName());
return;
}
// Generating client id.
clientId = UUID.randomUUID().toString().replace("-", "");
LOG.info("Client has connected with a server generated identifier. CId={}, username={}", clientId,
payload.userName());
}
// 3. 判斷用戶名和密碼是否合法
if (!login(channel, msg, clientId)) {
channel.close().addListener(CLOSE_ON_FAILURE);
return;
}
// 4. 初始化鏈接對象並將鏈接對象引用放入鏈接管理中,若是發現鏈接管理中存在相同客戶端ID的對象則關閉前一個鏈接並將新的鏈接對象放入鏈接管理中
ConnectionDescriptor descriptor = new ConnectionDescriptor(clientId, channel, cleanSession);
final ConnectionDescriptor existing = this.connectionDescriptors.addConnection(descriptor);
if (existing != null) {
LOG.info("Client ID is being used in an existing connection, force to be closed. CId={}", clientId);
existing.abort();
//return;
this.connectionDescriptors.removeConnection(existing);
this.connectionDescriptors.addConnection(descriptor);
}
// 5. 根據客戶端上傳的心跳時間調整服務端當前鏈接的心跳判斷時間(keepAlive * 1.5f)
initializeKeepAliveTimeout(channel, msg, clientId);
// 6. 遺囑消息存儲(當鏈接意外斷開時向存儲的主題發佈消息)
storeWillMessage(msg, clientId);
// 7. 發送鏈接成功響應
if (!sendAck(descriptor, msg, clientId)) {
channel.close().addListener(CLOSE_ON_FAILURE);
return;
}
m_interceptor.notifyClientConnected(msg);
if (!descriptor.assignState(SENDACK, SESSION_CREATED)) {
channel.close().addListener(CLOSE_ON_FAILURE);
return;
}
// 8. 建立當前鏈接session
final ClientSession clientSession = this.sessionsRepository.createOrLoadClientSession(clientId, cleanSession);
// 9. 當cleanSession=false 發送當前session已經存儲的消息
if (!republish(descriptor, msg, clientSession)) {
channel.close().addListener(CLOSE_ON_FAILURE);
return;
}
int flushIntervalMs = 500/* (keepAlive * 1000) / 2 */;
setupAutoFlusher(channel, flushIntervalMs);
final boolean success = descriptor.assignState(MESSAGES_REPUBLISHED, ESTABLISHED);
if (!success) {
channel.close().addListener(CLOSE_ON_FAILURE);
}
LOG.info("Connected client <{}> with login <{}>", clientId, payload.userName());
}
複製代碼
public void processSubscribe(Channel channel, MqttSubscribeMessage msg) {
String clientID = NettyUtils.clientID(channel);
int messageID = messageId(msg);
LOG.debug("Processing SUBSCRIBE message. CId={}, messageId={}", clientID, messageID);
RunningSubscription executionKey = new RunningSubscription(clientID, messageID);
SubscriptionState currentStatus = subscriptionInCourse.putIfAbsent(executionKey, SubscriptionState.VERIFIED);
if (currentStatus != null) {
LOG.warn("Client sent another SUBSCRIBE message while this one was being processed CId={}, messageId={}",
clientID, messageID);
return;
}
String username = NettyUtils.userName(channel);
// 一、訂閱的主題校驗(權限、主題path合法性)
List<MqttTopicSubscription> ackTopics = doVerify(clientID, username, msg);
MqttSubAckMessage ackMessage = doAckMessageFromValidateFilters(ackTopics, messageID);
if (!this.subscriptionInCourse.replace(executionKey, SubscriptionState.VERIFIED, SubscriptionState.STORED)) {
LOG.warn("Client sent another SUBSCRIBE message while the topic filters were being verified CId={}, " +
"messageId={}", clientID, messageID);
return;
}
LOG.debug("Creating and storing subscriptions CId={}, messageId={}, topics={}", clientID, messageID, ackTopics);
// 二、在當前session中存儲訂閱的主題
List<Subscription> newSubscriptions = doStoreSubscription(ackTopics, clientID);
// save session, persist subscriptions from session
// 三、採用全局tree結構存儲訂閱信息(主題和訂閱者信息),用於消息轉發時根據主題查找到對應的訂閱者
for (Subscription subscription : newSubscriptions) {
subscriptions.add(subscription);
}
LOG.debug("Sending SUBACK response CId={}, messageId={}", clientID, messageID);
// 四、發送訂閱迴應
channel.writeAndFlush(ackMessage).addListener(FIRE_EXCEPTION_ON_FAILURE);
// fire the persisted messages in session
// 五、掃描持久化的消息匹配到當前訂閱主題的當即向此鏈接發送消息
for (Subscription subscription : newSubscriptions) {
publishRetainedMessagesInSession(subscription, username);
}
boolean success = this.subscriptionInCourse.remove(executionKey, SubscriptionState.STORED);
if (!success) {
LOG.warn("Unable to perform the final subscription state update CId={}, messageId={}", clientID, messageID);
} else {
LOG.info("Client <{}> subscribed to topics", clientID);
}
}
複製代碼
Packet Identifier:報文標識存在報文的可變報頭部分,非零兩個字節整數 (0-65535]。
一個流程中重複:這些報文包含PacketID,並且在一次通訊流程內保持一致:PUBLISH(QoS>0時)、PUBACK、PUBREC、PUBREL、PUBCOMP、SUBSCRIBE、SUBACK、UNSUBSCIBE、UNSUBACK 。
新的不重複:客戶端每次發送一個新的這些類型的報文時都必須分配一個當前 未使用的PacketID 當客戶端處理完這個報文對應的確認後,這個報文標識符就釋放可重用。
獨立維護:客戶端和服務端彼此獨立地分配報文標識符。所以,客戶端服務端組合使用相同的報文標識符能夠實現併發的消息交換。客戶端和服務端產生的Packet Identifier一致不算異常。
Payload: 有效載荷即消息體最大容許 256MB。 Publish 的 Payload 容許爲空,在不少場合下表明將持久消息(或者遺囑消息)清空。採用UTF-8編碼。
Retain:持久消息(粘性消息)
RETAIN 標記:每一個Publish消息都須要指定的標記
每一個Topic只會保留最多一個 Retain 持久消息 客戶端訂閱帶有持久消息的Topic,會當即受到這條消息。
服務器能夠選擇丟棄持久消息,好比內存或者存儲吃緊的時候。
若是客戶端想要刪除某個Topic 上面的持久消息,能夠向這個Topic發送一個Payload爲空的持久消息 遺囑消息(Will)的Retain持久機制同理。
QoS :服務等級(消息可靠性)
public void processPublish(Channel channel, MqttPublishMessage msg) {
final MqttQoS qos = msg.fixedHeader().qosLevel();
final String clientId = NettyUtils.clientID(channel);
LOG.info("Processing PUBLISH message. CId={}, topic={}, messageId={}, qos={}", clientId,
msg.variableHeader().topicName(), msg.variableHeader().messageId(), qos);
switch (qos) {
case AT_MOST_ONCE:
this.qos0PublishHandler.receivedPublishQos0(channel, msg);
break;
case AT_LEAST_ONCE:
this.qos1PublishHandler.receivedPublishQos1(channel, msg);
break;
case EXACTLY_ONCE:
this.qos2PublishHandler.receivedPublishQos2(channel, msg);
break;
default:
LOG.error("Unknown QoS-Type:{}", qos);
break;
}
}
複製代碼
從上述代碼的switch語句中能夠看出會根據消息的Qos級別分別進行處理
sequenceDiagram
ClientA->>ServerBroker: 發送消息
ServerBroker->>ClientB: 發送消息
複製代碼
void receivedPublishQos0(Channel channel, MqttPublishMessage msg) {
// verify if topic can be write
final Topic topic = new Topic(msg.variableHeader().topicName());
String clientID = NettyUtils.clientID(channel);
String username = NettyUtils.userName(channel);
// 1. 權限判斷
if (!m_authorizator.canWrite(topic, username, clientID)) {
LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
return;
}
// route message to subscribers
IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);
toStoreMsg.setClientID(clientID);
// 2. 向全部該主題的訂閱者發佈消息
this.publisher.publish2Subscribers(toStoreMsg, topic);
if (msg.fixedHeader().isRetain()) {
// 3. QoS == 0 && retain => clean old retained
m_messagesStore.cleanRetained(topic);
}
m_interceptor.notifyTopicPublished(msg, clientID, username);
}
複製代碼
sequenceDiagram
ClientA->>ServerBroker: 1.發送消息PUBLISH
ServerBroker->>ServerBroker: 1.1存儲消息
ServerBroker->>ClientA: 1.2發送消息迴應PUBACK
ServerBroker->>ClientB: 2.發送消息
ClientB->>ServerBroker: 2.1發送消息迴應PUBACK
ServerBroker->>ServerBroker: 2.2刪除消息
複製代碼
1.發送消息PUBLISH
void receivedPublishQos1(Channel channel, MqttPublishMessage msg) {
// verify if topic can be write
final Topic topic = new Topic(msg.variableHeader().topicName());
topic.getTokens();
if (!topic.isValid()) {
LOG.warn("Invalid topic format, force close the connection");
channel.close().addListener(CLOSE_ON_FAILURE);
return;
}
String clientID = NettyUtils.clientID(channel);
String username = NettyUtils.userName(channel);
// 1. 權限判斷
if (!m_authorizator.canWrite(topic, username, clientID)) {
LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
return;
}
final int messageID = msg.variableHeader().messageId();
// route message to subscribers
IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);
toStoreMsg.setClientID(clientID);
// 2. 向全部該主題的訂閱者發佈消息(每一個session中存儲即將要發送的消息)
this.publisher.publish2Subscribers(toStoreMsg, topic, messageID);
// 3. 發送Ack迴應
sendPubAck(clientID, messageID);
// 4. retain = true => 存儲消息
if (msg.fixedHeader().isRetain()) {
if (!msg.payload().isReadable()) {
m_messagesStore.cleanRetained(topic);
} else {
// before wasn't stored m_messagesStore.storeRetained(topic, toStoreMsg); } } m_interceptor.notifyTopicPublished(msg, clientID, username); } 複製代碼
2.1發送消息迴應PUBACK
服務端Server接收到PUBACK消息後將執行:
public void processPubAck(Channel channel, MqttPubAckMessage msg) {
String clientID = NettyUtils.clientID(channel);
int messageID = msg.variableHeader().messageId();
String username = NettyUtils.userName(channel);
LOG.trace("retrieving inflight for messageID <{}>", messageID);
ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
StoredMessage inflightMsg = targetSession.inFlightAcknowledged(messageID);
String topic = inflightMsg.getTopic();
InterceptAcknowledgedMessage wrapped = new InterceptAcknowledgedMessage(inflightMsg, topic, username,
messageID);
m_interceptor.notifyMessageAcknowledged(wrapped);
}
複製代碼
sequenceDiagram
ClientA->>ServerBroker: 1.發送消息PUBLISH
ServerBroker->>ServerBroker: 1.1存儲消息
ServerBroker->>ClientA: 1.2發送消息迴應Rec
ClientA->>ServerBroker: 2.發送消息Rel
ServerBroker->>ServerBroker: 2.1刪除消息
ServerBroker->>ServerBroker: 2.2存儲消息到發送列隊
ServerBroker->>ClientB: 2.3發送消息
ServerBroker->>ClientA: 2.4發送消息迴應Comp
ClientB->>ServerBroker: 3.發送消息迴應Rec
ServerBroker->>ServerBroker: 3.1刪除2.2中存儲的消息(一次確認)
ServerBroker->>ServerBroker: 3.2存儲消息
ServerBroker->>ClientB: 3.3發送消息Rel
ClientB->>ServerBroker: 3.4發送消息迴應Comp
ServerBroker->>ServerBroker: 3.5刪除消息(二次確認)
複製代碼
1.發送消息PUBLISH
void receivedPublishQos2(Channel channel, MqttPublishMessage msg) {
final Topic topic = new Topic(msg.variableHeader().topicName());
// check if the topic can be wrote
String clientID = NettyUtils.clientID(channel);
String username = NettyUtils.userName(channel);
// 1. 權限判斷
if (!m_authorizator.canWrite(topic, username, clientID)) {
LOG.error("MQTT client is not authorized to publish on topic. CId={}, topic={}", clientID, topic);
return;
}
final int messageID = msg.variableHeader().messageId();
// 2. 存儲消息
IMessagesStore.StoredMessage toStoreMsg = asStoredMessage(msg);
toStoreMsg.setClientID(clientID);
LOG.info("Sending publish message to subscribers CId={}, topic={}, messageId={}", clientID, topic, messageID);
if (LOG.isTraceEnabled()) {
LOG.trace("payload={}, subs Tree={}", payload2Str(toStoreMsg.getPayload()), subscriptions.dumpTree());
}
this.sessionsRepository.sessionForClient(clientID).markAsInboundInflight(messageID, toStoreMsg);
// 3. 發送Rec迴應
sendPubRec(clientID, messageID);
// Next the client will send us a pub rel
// NB publish to subscribers for QoS 2 happen upon PUBREL from publisher
// if (msg.fixedHeader().isRetain()) {
// if (msg.payload().readableBytes() == 0) {
// m_messagesStore.cleanRetained(topic);
// } else {
// m_messagesStore.storeRetained(topic, toStoreMsg);
// }
// }
//TODO this should happen on PUB_REL, else we notify false positive
m_interceptor.notifyTopicPublished(msg, clientID, username);
}
複製代碼
2.發送消息Rel
void processPubRel(Channel channel, MqttMessage msg) {
String clientID = NettyUtils.clientID(channel);
int messageID = messageId(msg);
LOG.info("Processing PUBREL message. CId={}, messageId={}", clientID, messageID);
ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
// 1. 刪除消息
IMessagesStore.StoredMessage evt = targetSession.inboundInflight(messageID);
if (evt == null) {
LOG.warn("Can't find inbound inflight message for CId={}, messageId={}", clientID, messageID);
throw new IllegalArgumentException("Can't find inbound inflight message");
}
final Topic topic = new Topic(evt.getTopic());
// 2. 轉發消息
this.publisher.publish2Subscribers(evt, topic, messageID);
if (evt.isRetained()) {
if (evt.getPayload().readableBytes() == 0) {
m_messagesStore.cleanRetained(topic);
} else {
m_messagesStore.storeRetained(topic, evt);
}
}
//TODO here we should notify to the listeners
//m_interceptor.notifyTopicPublished(msg, clientID, username);
// 3.發送Comp 迴應
sendPubComp(clientID, messageID);
}
複製代碼
3.發送消息迴應Rec
public void processPubRec(Channel channel, MqttMessage msg) {
String clientID = NettyUtils.clientID(channel);
int messageID = messageId(msg);
LOG.debug("Processing PUBREC message. CId={}, messageId={}", clientID, messageID);
ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
// remove from the inflight and move to the QoS2 second phase queue
// 1. 刪除消息
StoredMessage ackedMsg = targetSession.inFlightAcknowledged(messageID);
// 2. 存儲消息(分別存儲在secondPhaseStore和outboundInflightMap)
targetSession.moveInFlightToSecondPhaseAckWaiting(messageID, ackedMsg);
// once received a PUBREC reply with a PUBREL(messageID)
LOG.debug("Processing PUBREC message. CId={}, messageId={}", clientID, messageID);
// 3. 發送PUBREL
MqttFixedHeader pubRelHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, AT_LEAST_ONCE, false, 0);
MqttMessage pubRelMessage = new MqttMessage(pubRelHeader, from(messageID));
channel.writeAndFlush(pubRelMessage).addListener(FIRE_EXCEPTION_ON_FAILURE);
}
複製代碼
3.4發送消息迴應Comp
public void processPubComp(Channel channel, MqttMessage msg) {
String clientID = NettyUtils.clientID(channel);
int messageID = messageId(msg);
LOG.debug("Processing PUBCOMP message. CId={}, messageId={}", clientID, messageID);
// once received the PUBCOMP then remove the message from the temp memory
ClientSession targetSession = this.sessionsRepository.sessionForClient(clientID);
// 1. 刪除消息
StoredMessage inflightMsg = targetSession.completeReleasedPublish(messageID);
String username = NettyUtils.userName(channel);
String topic = inflightMsg.getTopic();
final InterceptAcknowledgedMessage interceptAckMsg = new InterceptAcknowledgedMessage(inflightMsg, topic,
username, messageID);
m_interceptor.notifyMessageAcknowledged(interceptAckMsg);
}
複製代碼
Topic 話題 和 TopicFilter 話題過濾器
Pub-Sub消息模型的核心機制 UTF-8 編碼字符串,不能超過 65535 字節。層級數量沒有限制 不能包含任何的下文中提到的特殊符號(/、+、#),必須至少包含一個字符
區分大小寫,能夠包含空格,不能包含空字符 (Unicode U+0000)
在收部或尾部增長 斜槓 「/」,會產生不一樣的Topic和TopicFilter。舉例:
只包含斜槓 「/」 的 Topic 或 TopicFilter 是合法的
TopicFilter中的特殊符號
層級分隔符 / 用於分割主題的每一個層級,爲主題名提供一個分層結構
主題層級分隔符能夠出如今 Topic 或 TopicFilter 的任何位置
特例:相鄰的主題層次分隔符表示一個零長度的主題層級
單層通配符 +
只能用於單個主題層級匹配的通配符。例如,「a/b/+」 匹配 「a/b/c1」 和 「a/b/c2」 ,可是不匹配 「a/b/c/d」
能夠匹配 任意層級,包括第一個和最後一個層級。
例如,「+」 是有效的,「sport/+/player1」 也是有效的。 能夠在多個層級中使用它,也能夠和多層通配符一塊兒使用。
例如,「+/tennis/#」 是有效的。只能匹配本級不能匹配上級。
例如,「sport/+」 不匹配 「sport」 可是卻匹配「sport/」,「/finance」 匹配 「+/+」 和 「/+」 ,可是不匹配 「+」。
多層通配符 #
用於匹配主題中任意層級的通配符 匹配包含自己的層級和子層級。
例如 「a/b/c/#" 能夠匹配 「a/b/c」、「a/b/c/d」 和 「a/b/c/d/e」 必須是最後的結尾。
例如 「sport/tennis/#/ranking」是無效的
「#」是有效的,會收到全部的應用消息。 (服務器端應將此類 TopicFilter禁掉 )
以$開頭的,服務器保留
服務端不能將 $ 字符開頭的 Topic 匹配通配符 (#或+) 開頭的 TopicFilter
服務端應該阻止客戶端使用這種 Topic 與其它客戶端交換消息。
服務端實現能夠將 $ 開頭的主題名用做其餘目的。
開頭的 Topic,就不會收到對應的消息
若是客戶端想同時接受以 「 開頭主題的消息,它須要同時 訂閱 「#」 和 「$SYS/#」
這4個主題會存儲成以下結構:
@Override
public void add(Subscription newSubscription) {
Action res;
do {
res = insert(newSubscription.clientId, newSubscription.topicFilter, this.root, newSubscription.topicFilter);
} while (res == Action.REPEAT);
}
private Action insert(String clientId, Topic topic, final INode inode, Topic fullpath) {
Token token = topic.headToken();
if (!topic.isEmpty() && inode.mainNode().anyChildrenMatch(token)) {
Topic remainingTopic = topic.exceptHeadToken();
INode nextInode = inode.mainNode().childOf(token);
return insert(clientId, remainingTopic, nextInode, fullpath);
} else {
if (topic.isEmpty()) {
return insertSubscription(clientId, fullpath, inode);
} else {
return createNodeAndInsertSubscription(clientId, topic, inode, fullpath);
}
}
}
複製代碼
public void removeSubscription(Topic topic, String clientID) {
Action res;
do {
res = remove(clientID, topic, this.root, NO_PARENT);
} while (res == Action.REPEAT);
}
private Action remove(String clientId, Topic topic, INode inode, INode iParent) {
Token token = topic.headToken();
if (!topic.isEmpty() && (inode.mainNode().anyChildrenMatch(token))) {
Topic remainingTopic = topic.exceptHeadToken();
INode nextInode = inode.mainNode().childOf(token);
return remove(clientId, remainingTopic, nextInode, inode);
} else {
final CNode cnode = inode.mainNode();
if (cnode instanceof TNode) {
// this inode is a tomb, has no clients and should be cleaned up
// Because we implemented cleanTomb below, this should be rare, but possible
// Consider calling cleanTomb here too
return Action.OK;
}
if (cnode.containsOnly(clientId) && topic.isEmpty() && cnode.allChildren().isEmpty()) {
// last client to leave this node, AND there are no downstream children, remove via TNode tomb
if (inode == this.root) {
return inode.compareAndSet(cnode, inode.mainNode().copy()) ? Action.OK : Action.REPEAT;
}
TNode tnode = new TNode();
return inode.compareAndSet(cnode, tnode) ? cleanTomb(inode, iParent) : Action.REPEAT;
} else if (cnode.contains(clientId) && topic.isEmpty()) {
CNode updatedCnode = cnode.copy();
updatedCnode.removeSubscriptionsFor(clientId);
return inode.compareAndSet(cnode, updatedCnode) ? Action.OK : Action.REPEAT;
} else {
//someone else already removed
return Action.OK;
}
}
}
複製代碼
Set<Subscription> recursiveMatch(Topic topic, INode inode) {
CNode cnode = inode.mainNode();
if (Token.MULTI.equals(cnode.token)) {
return cnode.subscriptions;
}
if (topic.isEmpty()) {
return Collections.emptySet();
}
if (cnode instanceof TNode) {
return Collections.emptySet();
}
final Token token = topic.headToken();
if (!(Token.SINGLE.equals(cnode.token) || cnode.token.equals(token) || ROOT.equals(cnode.token))) {
return Collections.emptySet();
}
Topic remainingTopic = (ROOT.equals(cnode.token)) ? topic : topic.exceptHeadToken();
Set<Subscription> subscriptions = new HashSet<>();
if (remainingTopic.isEmpty()) {
subscriptions.addAll(cnode.subscriptions);
}
for (INode subInode : cnode.allChildren()) {
subscriptions.addAll(recursiveMatch(remainingTopic, subInode));
}
return subscriptions;
}
複製代碼
相關參考