筆者在上一章對鏈接報文進行了相關的講解。這一章筆者想寫一個鏈接報文的例子來加深理解。原本這一章也應該在上一章出現的。但是筆者怕太長了。很差方便閱。因此決定分倆章來。正如筆者上一章所講的。筆者會用Netty通訊框架進行編寫。主要由於Netty已經爲咱們集成了相關MQTT功能。html
開發環境java
開發工具:intellij idea.(之前我一直在eclipse。最近新版的老報錯。因此就放棄了)apache
Netty包:netty-all-4.1.16.Final.jar。下載網站:http://netty.io/downloads.html服務器
JDK:JAVA 8session
第三包:commons-lang3-3.6.jar。下載網站:http://commons.apache.org/proper/commons-lang/download_lang.cgiapp
MQTT編寫框架
在這裏筆者並不打包把客戶端的代碼一塊兒編寫出。事實上關於客戶端的開源的代碼是很是多的。筆者這裏只會略微的編寫一下服務端的代碼。固然這裏代碼只是爲方更瞭解MQTT協議。並不是企業級的編蜜棗這一點但願讀者見諒。爲了實現鏈接報文。筆者定義了三個類。dom
Main類:用於啓動服務。eclipse
BrokerHandler類:處理接受來的信息。ide
BrokerSessionHelper:用於發送信息給客戶。
Main類的源碼
1 public static void main(String[] args) throws Exception { 2 EventLoopGroup bossGroup = new NioEventLoopGroup(); 3 EventLoopGroup workerGroup = new NioEventLoopGroup(); 4 5 Runtime.getRuntime().addShutdownHook(new Thread() { 6 public void run() { 7 workerGroup.shutdownGracefully(); 8 bossGroup.shutdownGracefully(); 9 } 10 }); 11 12 13 ServerBootstrap b = new ServerBootstrap(); 14 b.group(bossGroup, workerGroup) 15 .channel(NioServerSocketChannel.class) 16 .handler(new LoggingHandler(LogLevel.INFO)) 17 .childHandler(new ChannelInitializer<SocketChannel>() { 18 @Override 19 public void initChannel(SocketChannel ch) throws Exception { 20 21 ChannelPipeline p = ch.pipeline(); 22 23 p.addFirst("idleHandler", new IdleStateHandler(0, 0, 120)); 24 p.addLast("encoder", MqttEncoder.INSTANCE); 25 p.addLast("decoder", new MqttDecoder()); 26 p.addLast("logicHandler", new BrokerHandler(65535)); 27 28 } 29 }) 30 .option(ChannelOption.SO_BACKLOG, 511) 31 .childOption(ChannelOption.SO_KEEPALIVE, true); 32 33 ChannelFuture f = b.bind("0.0.0.0", 1883).sync(); 34 35 f.channel().closeFuture().sync(); 36 }
上面的一、2倆行表是Netty裏面倆個線程組。事實上也就是Reactor線程組。bossGroup 用於處理接受來自客戶端的鏈接。workerGroup 用於處理接受客戶端的讀取信息。13行的ServerBootstrap能夠理解爲啓動服務的一個引導類。主要關鍵是他的group方法。這樣子就能夠把倆個線程組關係在一塊兒了。重點就在17行這裏。childHandler用於處理IO事件。好比讀取客戶端進行。而後本身編碼。大家能夠看到24行的MqttEncoder.INSTANCE和25行的MqttDecoder吧。他們就是用於處理MQTT協議傳來的信息進行處理。而26行BrokerHandler類就是筆者來處理每個報文對應的響應。筆者就不在過多的說了。若是大家不懂的話,能夠去看一下Netty框架的知識在過看的話會比較好。
BrokerHandler類的源碼
1 public class BrokerHandler extends SimpleChannelInboundHandler<MqttMessage> { 2 private MqttVersion version; 3 private String clientId; 4 private String userName; 5 private String brokerId; 6 private boolean connected; 7 private boolean cleanSession; 8 private int keepAlive; 9 private int keepAliveMax; 10 private MqttPublishMessage willMessage; 11 12 public BrokerHandler(int keepAliveMax) { 13 14 this.keepAliveMax = keepAliveMax; 15 } 16 17 @Override 18 @SuppressWarnings("ThrowableResultOfMethodCallIgnored") 19 protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception { 20 21 if (msg.decoderResult().isFailure()) { 22 23 Throwable cause = msg.decoderResult().cause(); 24 25 if (cause instanceof MqttUnacceptableProtocolVersionException) { 26 27 BrokerSessionHelper.sendMessage( 28 ctx, 29 MqttMessageFactory.newMessage( 30 new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), 31 new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false), 32 null), 33 "INVALID", 34 null, 35 true); 36 37 } else if (cause instanceof MqttIdentifierRejectedException) { 38 39 BrokerSessionHelper.sendMessage( 40 ctx, 41 MqttMessageFactory.newMessage( 42 new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), 43 new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), 44 null), 45 "INVALID", 46 null, 47 true); 48 } 49 50 ctx.close(); 51 52 return; 53 } 54 55 switch (msg.fixedHeader().messageType()) { 56 case CONNECT: 57 onConnect(ctx, (MqttConnectMessage) msg); 58 break; 59 case PUBLISH: 60 onPublish(ctx, (MqttPublishMessage) msg); 61 break; 62 case PUBACK: 63 onPubAck(ctx, msg); 64 break; 65 case PUBREC: 66 onPubRec(ctx, msg); 67 break; 68 case PUBREL: 69 onPubRel(ctx, msg); 70 break; 71 case PUBCOMP: 72 onPubComp(ctx, msg); 73 break; 74 case SUBSCRIBE: 75 onSubscribe(ctx, (MqttSubscribeMessage) msg); 76 break; 77 case UNSUBSCRIBE: 78 onUnsubscribe(ctx, (MqttUnsubscribeMessage) msg); 79 break; 80 case PINGREQ: 81 onPingReq(ctx); 82 break; 83 case DISCONNECT: 84 onDisconnect(ctx); 85 break; 86 } 87 88 } 89 90 private void onConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) { 91 92 this.version = MqttVersion.fromProtocolNameAndLevel(msg.variableHeader().name(), (byte) msg.variableHeader().version()); 93 this.clientId = msg.payload().clientIdentifier(); 94 this.cleanSession = msg.variableHeader().isCleanSession(); 95 96 if (msg.variableHeader().keepAliveTimeSeconds() > 0 && msg.variableHeader().keepAliveTimeSeconds() <= this.keepAliveMax) { 97 this.keepAlive = msg.variableHeader().keepAliveTimeSeconds(); 98 } 99 100 //MQTT 3.1以後可能存在爲空的客戶ID。因此要進行處理。若是客戶ID是空,並且還在保存處理相關的信息。這樣子是不行。 101 //必須有客戶ID咱們才能存保相關信息。 102 if (StringUtils.isBlank(this.clientId)) { 103 if (!this.cleanSession) { 104 105 BrokerSessionHelper.sendMessage( 106 ctx, 107 MqttMessageFactory.newMessage( 108 new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), 109 new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), 110 null), 111 "INVALID", 112 null, 113 true); 114 115 ctx.close(); 116 117 return; 118 119 } else { 120 this.clientId = java.util.UUID.randomUUID().toString(); 121 } 122 } 123 124 //有可能發送倆次的鏈接包。若是已經存在鏈接就是關閉當前的鏈接。 125 if (this.connected) { 126 ctx.close(); 127 return; 128 } 129 130 131 boolean userNameFlag = msg.variableHeader().hasUserName(); 132 boolean passwordFlag = msg.variableHeader().hasPassword(); 133 this.userName = msg.payload().userName(); 134 135 String password = "" ; 136 if( msg.payload().passwordInBytes() != null && msg.payload().passwordInBytes().length > 0) 137 password = new String(msg.payload().passwordInBytes()); 138 139 boolean mistake = false; 140 141 //若是有用戶名標示,那麼就必須有密碼標示。 142 //當有用戶名標的時候,用戶不能爲空。 143 //當有密碼標示的時候,密碼不能爲空。 144 if (userNameFlag) { 145 if (StringUtils.isBlank(this.userName)) 146 mistake = true; 147 } else { 148 if (StringUtils.isNotBlank(this.userName) || passwordFlag) mistake = true; 149 } 150 151 152 if (passwordFlag) { 153 154 if (StringUtils.isBlank(password)) mistake = true; 155 } else { 156 if (StringUtils.isNotBlank(password)) mistake = true; 157 } 158 159 if (mistake) { 160 BrokerSessionHelper.sendMessage( 161 ctx, 162 MqttMessageFactory.newMessage( 163 new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), 164 new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false), 165 null), 166 this.clientId, 167 null, 168 true); 169 ctx.close(); 170 return; 171 } 172 173 BrokerSessionHelper.sendMessage( 174 ctx, 175 MqttMessageFactory.newMessage( 176 new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), 177 new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, !this.cleanSession), 178 null), 179 this.clientId, 180 null, 181 true); 182 183 ChannelHandlerContext lastSession = BrokerSessionHelper.removeSession(this.clientId); 184 if (lastSession != null) { 185 lastSession.close(); 186 } 187 188 String willTopic = msg.payload().willTopic(); 189 String willMessage = ""; 190 if(msg.payload().willMessageInBytes() != null && msg.payload().willMessageInBytes().length > 0) 191 willMessage = new String(msg.payload().willMessageInBytes()); 192 193 if (msg.variableHeader().isWillFlag() && StringUtils.isNotEmpty(willTopic) && StringUtils.isNotEmpty(willMessage)) { 194 195 this.willMessage = (MqttPublishMessage) MqttMessageFactory.newMessage( 196 new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.valueOf(msg.variableHeader().willQos()), msg.variableHeader().isWillRetain(), 0), 197 new MqttPublishVariableHeader(willTopic, 0), 198 Unpooled.wrappedBuffer(willMessage.getBytes()) 199 ); 200 } 201 202 this.connected = true; 203 BrokerSessionHelper.saveSession(this.clientId, ctx); 204 } 205 206 private void onSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) { 207 } 208 209 private void onUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) { 210 } 211 212 private void onPingReq(ChannelHandlerContext ctx) { 213 } 214 215 private void onDisconnect(ChannelHandlerContext ctx) { 216 217 if (!this.connected) { 218 ctx.close(); 219 return; 220 } 221 222 BrokerSessionHelper.removeSession(this.clientId, ctx); 223 224 this.willMessage = null; 225 226 this.connected = false; 227 228 ctx.close(); 229 230 } 231 232 private void onPubComp(ChannelHandlerContext ctx, MqttMessage msg) { 233 234 } 235 236 private void onPubRel(ChannelHandlerContext ctx, MqttMessage msg) { 237 } 238 239 private void onPubRec(ChannelHandlerContext ctx, MqttMessage msg) { 240 } 241 242 private void onPubAck(ChannelHandlerContext ctx, MqttMessage msg) { 243 } 244 245 private void onPublish(ChannelHandlerContext ctx, MqttPublishMessage msg) { 246 } 247 }
19 行中的channelRead0方法中有倆個參數。一個爲ChannelHandlerContext(通首的上下文)。一個是MqttMessage(客戶端來的MQTT報文)。咱們接下來動做都是跟MqttMessage來作相關的邏輯處理。這一點從55行就能夠看出來。咱們能夠判斷他是什麼類型的報文。筆者這裏只實現鏈接報文的處理。21行的代碼msg.decoderResult().isFailure()是用來判斷傳過來的報文是否是正確的。事實上是Netty框架幫咱們作了第一層的驗證。23行就是得到發生的異常。
從第99行onConnect方法開始就是處理鏈接報文的處理。筆者這裏只作下面相關的處理。
1.驗證保持鏈接(Keep Alive)的有效性。代碼以下
1 if (msg.variableHeader().keepAliveTimeSeconds() > 0 && msg.variableHeader().keepAliveTimeSeconds() <= this.keepAliveMax) { 2 this.keepAlive = msg.variableHeader().keepAliveTimeSeconds(); 3 }
2.驗證客戶ID爲空的時候,還要求保存會話狀。這是不合理的。由於個人會話狀態是跟根客戶ID來保存。不然的話,隨更給一個。反正後面仍是清除會話狀態。那麼爲何會有空的呢?主要是在MQTT 3.1.1裏面指出客戶ID能夠爲空了。
1 if (StringUtils.isBlank(this.clientId)) { 2 if (!this.cleanSession) { 3 4 BrokerSessionHelper.sendMessage( 5 ctx, 6 MqttMessageFactory.newMessage( 7 new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), 8 new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), 9 null), 10 "INVALID", 11 null, 12 true); 13 14 ctx.close(); 15 16 return; 17 18 } else { 19 this.clientId = java.util.UUID.randomUUID().toString(); 20 } 21 }
3.判斷是不是第二次鏈接報文。若是是的話,就要斷開了。
1 if (this.connected) { 2 ctx.close(); 3 return; 4 }
4.判斷用戶和密碼是否合法性。好比上一章出講到的只有在用戶名標誌爲1的時候,密碼才能夠出現。
boolean userNameFlag = msg.variableHeader().hasUserName(); boolean passwordFlag = msg.variableHeader().hasPassword(); this.userName = msg.payload().userName(); String password = "" ; if( msg.payload().passwordInBytes() != null && msg.payload().passwordInBytes().length > 0) password = new String(msg.payload().passwordInBytes()); boolean mistake = false; //若是有用戶名標示,那麼就必須有密碼標示。 //當有用戶名標的時候,用戶不能爲空。 //當有密碼標示的時候,密碼不能爲空。 if (userNameFlag) { if (StringUtils.isBlank(this.userName)) mistake = true; } else { if (StringUtils.isNotBlank(this.userName) || passwordFlag) mistake = true; } if (passwordFlag) { if (StringUtils.isBlank(password)) mistake = true; } else { if (StringUtils.isNotBlank(password)) mistake = true; } if (mistake) { BrokerSessionHelper.sendMessage( ctx, MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false), null), this.clientId, null, true); ctx.close(); return; }
6.接受客戶端了。事實上筆者還有不少沒有作的事情。好比保存會狀態的處理。由於主要是爲學習因此就沒有講出來。在加上會話狀態存保就要思考保存在哪裏。同時還有一個就是用戶的合法性驗證沒有處理。
1 BrokerSessionHelper.sendMessage( 2 ctx, 3 MqttMessageFactory.newMessage( 4 new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), 5 new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, !this.cleanSession), 6 null), 7 this.clientId, 8 null, 9 true);
7.處理當前報文的遺囑。
1 String willTopic = msg.payload().willTopic(); 2 String willMessage = ""; 3 if(msg.payload().willMessageInBytes() != null && msg.payload().willMessageInBytes().length > 0) 4 willMessage = new String(msg.payload().willMessageInBytes()); 5 6 if (msg.variableHeader().isWillFlag() && StringUtils.isNotEmpty(willTopic) && StringUtils.isNotEmpty(willMessage)) { 7 8 this.willMessage = (MqttPublishMessage) MqttMessageFactory.newMessage( 9 new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.valueOf(msg.variableHeader().willQos()), msg.variableHeader().isWillRetain(), 0), 10 new MqttPublishVariableHeader(willTopic, 0), 11 Unpooled.wrappedBuffer(willMessage.getBytes()) 12 ); 13 }
若是你看到這個類的最後代碼的時候,會發現筆者也寫了相關的ACNNACK響應。他的內容比較簡單。你們看代碼吧。
1 private void onDisconnect(ChannelHandlerContext ctx) { 2 3 if (!this.connected) { 4 ctx.close(); 5 return; 6 } 7 8 BrokerSessionHelper.removeSession(this.clientId, ctx); 9 10 this.willMessage = null; 11 12 this.connected = false; 13 14 ctx.close(); 15 16 }
BrokerSessionHelper類的源碼
public class BrokerSessionHelper { private static final Map<String, ChannelHandlerContext> sessionRepository = new ConcurrentHashMap<>(); public static void saveSession(String clientId, ChannelHandlerContext session) { sessionRepository.put(clientId, session); } public static ChannelHandlerContext getSession(String clientId) { return sessionRepository.get(clientId); } public static ChannelHandlerContext removeSession(String clientId) { return sessionRepository.remove(clientId); } public static boolean removeSession(String clientId, ChannelHandlerContext session) { return sessionRepository.remove(clientId, session); } /** * 發送信息 * * @param msg * @param clientId * @param packetId * @param flush */ public static void sendMessage(MqttMessage msg, String clientId, Integer packetId, boolean flush) { ChannelHandlerContext ctx = getSession(clientId); if (ctx == null) { String pid = packetId == null || packetId <= 0 ? "" : String.valueOf(packetId); return; } sendMessage(ctx, msg, clientId, packetId, flush); } /** * 發送信息 * * @param ctx * @param msg * @param clientId * @param packetId * @param flush */ public static void sendMessage(ChannelHandlerContext ctx, MqttMessage msg, String clientId, Integer packetId, boolean flush) { String pid = packetId == null || packetId <= 0 ? "" : String.valueOf(packetId); ChannelFuture future = flush ? ctx.writeAndFlush(msg) : ctx.write(msg); future.addListener(f -> { if (f.isSuccess()) { } else { } }); } }
BrokerSessionHelper類就是用於存放當前服務器上相關通道信息。同時用於發送返回的相關報文。讀者們能夠進行看代碼吧。
這個時候就大家只按照之前面講的去作。就能夠抓到報文了。客戶端的話。筆者只用前面說的MQTTLens來測試。