在moquette使用中發如今設備頻繁上下線和兩個設備ClientId同樣相互頂替鏈接的狀況下,InterceptHandler的onConnect和onConnectionLost的方法調用沒有前後順序,若是在這兩個方法裏面來記錄設備上下線狀態,會形成狀態不對。異步
io.moquette.spi.impl.ProtocolProcessor中的processConnect(Channel channel, MqttConnectMessage msg)部分代碼以下ide
ConnectionDescriptor descriptor = new ConnectionDescriptor(clientId, channel, cleanSession); final ConnectionDescriptor existing = this.connectionDescriptors.addConnection(descriptor); if (existing != null) { LOG.info("Client ID is being used in an existing connection, force to be closed. CId={}", clientId); existing.abort(); this.connectionDescriptors.removeConnection(existing); this.connectionDescriptors.addConnection(descriptor); } initializeKeepAliveTimeout(channel, msg, clientId); storeWillMessage(msg, clientId); if (!sendAck(descriptor, msg, clientId)) { channel.close().addListener(CLOSE_ON_FAILURE); return; } m_interceptor.notifyClientConnected(msg);
能夠看到existing.abort();後會m_interceptor.notifyClientConnected(msg); 先斷開原來的鏈接,而後接着通知上線。因爲Netty自己就是異步的,再加上InterceptHandler相關方法的調用都是在線程池中進行的,所以nterceptHandler的onConnect和onConnectionLost的方法調用前後順序是沒法保證的。性能
在ChannelHandler鏈中添加一個handler,專門處理設備上線事件,對於相同ClientId的鏈接已經存在時,鏈接斷開和鏈接事件強制加上時序。this
@Sharable public class AbrotExistConnectionMqttHandler extends ChannelInboundHandlerAdapter { private static final Logger LOG = LoggerFactory.getLogger(AbrotExistConnectionMqttHandler.class); private final ProtocolProcessor m_processor; private static final ReentrantLock[] locks = new ReentrantLock[8]; static { for (int i = 0; i < locks.length; i++) { locks[i] = new ReentrantLock(); } } public AbrotExistConnectionMqttHandler(ProtocolProcessor m_processor) { this.m_processor = m_processor; } @Override public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception { MqttMessage msg = (MqttMessage) message; MqttMessageType messageType = msg.fixedHeader().messageType(); LOG.debug("Processing MQTT message, type: {}", messageType); if (messageType != MqttMessageType.CONNECT) { super.channelRead(ctx, message); return; } MqttConnectMessage connectMessage = (MqttConnectMessage) msg; String clientId = connectMessage.payload().clientIdentifier(); /** * 經過鎖和sleep來解決設備互頂出現的設備上線和下線回調時序錯亂的問題 * 目前解決的方式經過sleep不是太好 * 解決了多個鏈接互相頂替出現的問題(有一個鏈接先鏈接的狀況) * */ ReentrantLock lock = locks[Math.abs(clientId.hashCode()) % locks.length]; lock.lock(); try { if (!m_processor.isConnected(clientId)) { super.channelRead(ctx, message); return; } m_processor.abortIfExist(clientId); Thread.sleep(50); super.channelRead(ctx, message); Thread.sleep(30); } catch (Exception ex) { ex.printStackTrace(); super.channelRead(ctx, message); } finally { lock.unlock(); } } }
解釋:
1.經過ReentrantLock lock = locks[Math.abs(clientId.hashCode()) % locks.length];來保證相同的ClientId的鏈接都會得到同一個鎖
2.經過兩次Thread.sleep(50);將斷開鏈接和處理設備上線變成前後順序關係。
3.由於相互頂替的狀況並很少見,所以兩個Thread.sleep()也能夠接受,在性能上並不會形成多大影響。線程