MQTT——編寫鏈接報文

筆者在上一章對鏈接報文進行了相關的講解。這一章筆者想寫一個鏈接報文的例子來加深理解。原本這一章也應該在上一章出現的。但是筆者怕太長了。很差方便閱。因此決定分倆章來。正如筆者上一章所講的。筆者會用Netty通訊框架進行編寫。主要由於Netty已經爲咱們集成了相關MQTT功能。html

開發環境java

開發工具:intellij idea.(之前我一直在eclipse。最近新版的老報錯。因此就放棄了)apache

Netty包:netty-all-4.1.16.Final.jar。下載網站:http://netty.io/downloads.html服務器

JDK:JAVA 8session

第三包:commons-lang3-3.6.jar。下載網站:http://commons.apache.org/proper/commons-lang/download_lang.cgiapp

MQTT編寫框架

在這裏筆者並不打包把客戶端的代碼一塊兒編寫出。事實上關於客戶端的開源的代碼是很是多的。筆者這裏只會略微的編寫一下服務端的代碼。固然這裏代碼只是爲方更瞭解MQTT協議。並不是企業級的編蜜棗這一點但願讀者見諒。爲了實現鏈接報文。筆者定義了三個類。dom

Main類:用於啓動服務。eclipse

BrokerHandler類:處理接受來的信息。ide

BrokerSessionHelper:用於發送信息給客戶。

Main類的源碼

 1 public static void main(String[] args) throws Exception  {
 2         EventLoopGroup bossGroup = new NioEventLoopGroup();
 3         EventLoopGroup workerGroup = new NioEventLoopGroup();
 4 
 5         Runtime.getRuntime().addShutdownHook(new Thread() {
 6             public void run() {
 7                 workerGroup.shutdownGracefully();
 8                 bossGroup.shutdownGracefully();
 9             }
10         });
11 
12 
13         ServerBootstrap b = new ServerBootstrap();
14         b.group(bossGroup, workerGroup)
15                 .channel(NioServerSocketChannel.class)
16                 .handler(new LoggingHandler(LogLevel.INFO))
17                 .childHandler(new ChannelInitializer<SocketChannel>() {
18                     @Override
19                     public void initChannel(SocketChannel ch) throws Exception {
20 
21                         ChannelPipeline p = ch.pipeline();
22 
23                         p.addFirst("idleHandler", new IdleStateHandler(0, 0, 120));
24                         p.addLast("encoder", MqttEncoder.INSTANCE);
25                         p.addLast("decoder", new MqttDecoder());
26                         p.addLast("logicHandler", new BrokerHandler(65535));
27 
28                     }
29                 })
30                 .option(ChannelOption.SO_BACKLOG, 511)
31                 .childOption(ChannelOption.SO_KEEPALIVE, true);
32 
33         ChannelFuture f = b.bind("0.0.0.0", 1883).sync();
34 
35         f.channel().closeFuture().sync();
36     }

上面的一、2倆行表是Netty裏面倆個線程組。事實上也就是Reactor線程組。bossGroup 用於處理接受來自客戶端的鏈接。workerGroup 用於處理接受客戶端的讀取信息。13行的ServerBootstrap能夠理解爲啓動服務的一個引導類。主要關鍵是他的group方法。這樣子就能夠把倆個線程組關係在一塊兒了。重點就在17行這裏。childHandler用於處理IO事件。好比讀取客戶端進行。而後本身編碼。大家能夠看到24行的MqttEncoder.INSTANCE和25行的MqttDecoder吧。他們就是用於處理MQTT協議傳來的信息進行處理。而26行BrokerHandler類就是筆者來處理每個報文對應的響應。筆者就不在過多的說了。若是大家不懂的話,能夠去看一下Netty框架的知識在過看的話會比較好。

BrokerHandler類的源碼

  1 public class BrokerHandler extends SimpleChannelInboundHandler<MqttMessage> {
  2     private MqttVersion version;
  3     private String clientId;
  4     private String userName;
  5     private String brokerId;
  6     private boolean connected;
  7     private boolean cleanSession;
  8     private int keepAlive;
  9     private int keepAliveMax;
 10     private MqttPublishMessage willMessage;
 11 
 12     public BrokerHandler(int keepAliveMax) {
 13 
 14         this.keepAliveMax = keepAliveMax;
 15     }
 16 
 17     @Override
 18     @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
 19     protected void channelRead0(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
 20 
 21         if (msg.decoderResult().isFailure()) {
 22 
 23             Throwable cause = msg.decoderResult().cause();
 24 
 25             if (cause instanceof MqttUnacceptableProtocolVersionException) {
 26 
 27                 BrokerSessionHelper.sendMessage(
 28                         ctx,
 29                         MqttMessageFactory.newMessage(
 30                                 new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
 31                                 new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false),
 32                                 null),
 33                         "INVALID",
 34                         null,
 35                         true);
 36 
 37             } else if (cause instanceof MqttIdentifierRejectedException) {
 38 
 39                 BrokerSessionHelper.sendMessage(
 40                         ctx,
 41                         MqttMessageFactory.newMessage(
 42                                 new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
 43                                 new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false),
 44                                 null),
 45                         "INVALID",
 46                         null,
 47                         true);
 48             }
 49 
 50             ctx.close();
 51 
 52             return;
 53         }
 54 
 55         switch (msg.fixedHeader().messageType()) {
 56             case CONNECT:
 57                 onConnect(ctx, (MqttConnectMessage) msg);
 58                 break;
 59             case PUBLISH:
 60                 onPublish(ctx, (MqttPublishMessage) msg);
 61                 break;
 62             case PUBACK:
 63                 onPubAck(ctx, msg);
 64                 break;
 65             case PUBREC:
 66                 onPubRec(ctx, msg);
 67                 break;
 68             case PUBREL:
 69                 onPubRel(ctx, msg);
 70                 break;
 71             case PUBCOMP:
 72                 onPubComp(ctx, msg);
 73                 break;
 74             case SUBSCRIBE:
 75                 onSubscribe(ctx, (MqttSubscribeMessage) msg);
 76                 break;
 77             case UNSUBSCRIBE:
 78                 onUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
 79                 break;
 80             case PINGREQ:
 81                 onPingReq(ctx);
 82                 break;
 83             case DISCONNECT:
 84                 onDisconnect(ctx);
 85                 break;
 86         }
 87 
 88     }
 89 
 90     private void onConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
 91 
 92         this.version = MqttVersion.fromProtocolNameAndLevel(msg.variableHeader().name(), (byte) msg.variableHeader().version());
 93         this.clientId = msg.payload().clientIdentifier();
 94         this.cleanSession = msg.variableHeader().isCleanSession();
 95 
 96         if (msg.variableHeader().keepAliveTimeSeconds() > 0 && msg.variableHeader().keepAliveTimeSeconds() <= this.keepAliveMax) {
 97             this.keepAlive = msg.variableHeader().keepAliveTimeSeconds();
 98         }
 99 
100         //MQTT 3.1以後可能存在爲空的客戶ID。因此要進行處理。若是客戶ID是空,並且還在保存處理相關的信息。這樣子是不行。
101         //必須有客戶ID咱們才能存保相關信息。
102         if (StringUtils.isBlank(this.clientId)) {
103             if (!this.cleanSession) {
104 
105                 BrokerSessionHelper.sendMessage(
106                         ctx,
107                         MqttMessageFactory.newMessage(
108                                 new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
109                                 new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false),
110                                 null),
111                         "INVALID",
112                         null,
113                         true);
114 
115                 ctx.close();
116 
117                 return;
118 
119             } else {
120                 this.clientId =  java.util.UUID.randomUUID().toString();
121             }
122         }
123 
124         //有可能發送倆次的鏈接包。若是已經存在鏈接就是關閉當前的鏈接。
125         if (this.connected) {
126             ctx.close();
127             return;
128         }
129 
130 
131         boolean userNameFlag = msg.variableHeader().hasUserName();
132         boolean passwordFlag = msg.variableHeader().hasPassword();
133         this.userName = msg.payload().userName();
134 
135         String password = "" ;
136         if( msg.payload().passwordInBytes() != null  && msg.payload().passwordInBytes().length > 0)
137             password =   new String(msg.payload().passwordInBytes());
138 
139         boolean mistake = false;
140 
141         //若是有用戶名標示,那麼就必須有密碼標示。
142         //當有用戶名標的時候,用戶不能爲空。
143         //當有密碼標示的時候,密碼不能爲空。
144         if (userNameFlag) {
145             if (StringUtils.isBlank(this.userName))
146                 mistake = true;
147         } else {
148             if (StringUtils.isNotBlank(this.userName) || passwordFlag) mistake = true;
149         }
150 
151 
152         if (passwordFlag) {
153 
154             if (StringUtils.isBlank(password)) mistake = true;
155         } else {
156             if (StringUtils.isNotBlank(password)) mistake = true;
157         }
158 
159         if (mistake) {
160             BrokerSessionHelper.sendMessage(
161                     ctx,
162                     MqttMessageFactory.newMessage(
163                             new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
164                             new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false),
165                             null),
166                     this.clientId,
167                     null,
168                     true);
169             ctx.close();
170             return;
171         }
172 
173         BrokerSessionHelper.sendMessage(
174                 ctx,
175                 MqttMessageFactory.newMessage(
176                         new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
177                         new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, !this.cleanSession),
178                         null),
179                 this.clientId,
180                 null,
181                 true);
182 
183         ChannelHandlerContext lastSession = BrokerSessionHelper.removeSession(this.clientId);
184         if (lastSession != null) {
185             lastSession.close();
186         }
187 
188         String willTopic = msg.payload().willTopic();
189         String willMessage = "";
190         if(msg.payload().willMessageInBytes() != null && msg.payload().willMessageInBytes().length > 0)
191             willMessage =  new String(msg.payload().willMessageInBytes());
192 
193         if (msg.variableHeader().isWillFlag() && StringUtils.isNotEmpty(willTopic) && StringUtils.isNotEmpty(willMessage)) {
194 
195             this.willMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
196                     new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.valueOf(msg.variableHeader().willQos()), msg.variableHeader().isWillRetain(), 0),
197                     new MqttPublishVariableHeader(willTopic, 0),
198                     Unpooled.wrappedBuffer(willMessage.getBytes())
199             );
200         }
201 
202         this.connected = true;
203         BrokerSessionHelper.saveSession(this.clientId, ctx);
204     }
205 
206     private void onSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) {
207     }
208     
209     private void onUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) {
210     }
211 
212     private void onPingReq(ChannelHandlerContext ctx) {
213     }
214 
215     private void onDisconnect(ChannelHandlerContext ctx) {
216 
217         if (!this.connected) {
218             ctx.close();
219             return;
220         }
221 
222         BrokerSessionHelper.removeSession(this.clientId, ctx);
223 
224         this.willMessage = null;
225 
226         this.connected = false;
227 
228         ctx.close();
229 
230     }
231 
232     private void onPubComp(ChannelHandlerContext ctx, MqttMessage msg) {
233 
234     }
235     
236     private void onPubRel(ChannelHandlerContext ctx, MqttMessage msg) {
237     }
238 
239     private void onPubRec(ChannelHandlerContext ctx, MqttMessage msg) {
240     }
241     
242     private void onPubAck(ChannelHandlerContext ctx, MqttMessage msg) {
243     }
244 
245     private void onPublish(ChannelHandlerContext ctx, MqttPublishMessage msg) {
246     }
247 }
BrokerHandler類

 19 行中的channelRead0方法中有倆個參數。一個爲ChannelHandlerContext(通首的上下文)。一個是MqttMessage(客戶端來的MQTT報文)。咱們接下來動做都是跟MqttMessage來作相關的邏輯處理。這一點從55行就能夠看出來。咱們能夠判斷他是什麼類型的報文。筆者這裏只實現鏈接報文的處理。21行的代碼msg.decoderResult().isFailure()是用來判斷傳過來的報文是否是正確的。事實上是Netty框架幫咱們作了第一層的驗證。23行就是得到發生的異常。

從第99行onConnect方法開始就是處理鏈接報文的處理。筆者這裏只作下面相關的處理。

1.驗證保持鏈接(Keep Alive)的有效性。代碼以下

1      if (msg.variableHeader().keepAliveTimeSeconds() > 0 && msg.variableHeader().keepAliveTimeSeconds() <= this.keepAliveMax) {
2             this.keepAlive = msg.variableHeader().keepAliveTimeSeconds();
3         }

2.驗證客戶ID爲空的時候,還要求保存會話狀。這是不合理的。由於個人會話狀態是跟根客戶ID來保存。不然的話,隨更給一個。反正後面仍是清除會話狀態。那麼爲何會有空的呢?主要是在MQTT 3.1.1裏面指出客戶ID能夠爲空了。

 1  if (StringUtils.isBlank(this.clientId)) {
 2             if (!this.cleanSession) {
 3 
 4                 BrokerSessionHelper.sendMessage(
 5                         ctx,
 6                         MqttMessageFactory.newMessage(
 7                                 new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
 8                                 new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false),
 9                                 null),
10                         "INVALID",
11                         null,
12                         true);
13 
14                 ctx.close();
15 
16                 return;
17 
18             } else {
19                 this.clientId =  java.util.UUID.randomUUID().toString();
20             }
21         }

3.判斷是不是第二次鏈接報文。若是是的話,就要斷開了。

1  if (this.connected) {
2             ctx.close();
3             return;
4         }

4.判斷用戶和密碼是否合法性。好比上一章出講到的只有在用戶名標誌爲1的時候,密碼才能夠出現。

   boolean userNameFlag = msg.variableHeader().hasUserName();
        boolean passwordFlag = msg.variableHeader().hasPassword();
        this.userName = msg.payload().userName();

        String password = "" ;
        if( msg.payload().passwordInBytes() != null  && msg.payload().passwordInBytes().length > 0)
            password =   new String(msg.payload().passwordInBytes());

        boolean mistake = false;

        //若是有用戶名標示,那麼就必須有密碼標示。
        //當有用戶名標的時候,用戶不能爲空。
        //當有密碼標示的時候,密碼不能爲空。
        if (userNameFlag) {
            if (StringUtils.isBlank(this.userName))
                mistake = true;
        } else {
            if (StringUtils.isNotBlank(this.userName) || passwordFlag) mistake = true;
        }


        if (passwordFlag) {

            if (StringUtils.isBlank(password)) mistake = true;
        } else {
            if (StringUtils.isNotBlank(password)) mistake = true;
        }

        if (mistake) {
            BrokerSessionHelper.sendMessage(
                    ctx,
                    MqttMessageFactory.newMessage(
                            new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
                            new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false),
                            null),
                    this.clientId,
                    null,
                    true);
            ctx.close();
            return;
        }

6.接受客戶端了。事實上筆者還有不少沒有作的事情。好比保存會狀態的處理。由於主要是爲學習因此就沒有講出來。在加上會話狀態存保就要思考保存在哪裏。同時還有一個就是用戶的合法性驗證沒有處理。

1   BrokerSessionHelper.sendMessage(
2                 ctx,
3                 MqttMessageFactory.newMessage(
4                         new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0),
5                         new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, !this.cleanSession),
6                         null),
7                 this.clientId,
8                 null,
9                 true);

7.處理當前報文的遺囑。

 1    String willTopic = msg.payload().willTopic();
 2         String willMessage = "";
 3         if(msg.payload().willMessageInBytes() != null && msg.payload().willMessageInBytes().length > 0)
 4             willMessage =  new String(msg.payload().willMessageInBytes());
 5 
 6         if (msg.variableHeader().isWillFlag() && StringUtils.isNotEmpty(willTopic) && StringUtils.isNotEmpty(willMessage)) {
 7 
 8             this.willMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
 9                     new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.valueOf(msg.variableHeader().willQos()), msg.variableHeader().isWillRetain(), 0),
10                     new MqttPublishVariableHeader(willTopic, 0),
11                     Unpooled.wrappedBuffer(willMessage.getBytes())
12             );
13         }

 若是你看到這個類的最後代碼的時候,會發現筆者也寫了相關的ACNNACK響應。他的內容比較簡單。你們看代碼吧。

 1 private void onDisconnect(ChannelHandlerContext ctx) {
 2 
 3         if (!this.connected) {
 4             ctx.close();
 5             return;
 6         }
 7 
 8         BrokerSessionHelper.removeSession(this.clientId, ctx);
 9 
10         this.willMessage = null;
11 
12         this.connected = false;
13 
14         ctx.close();
15 
16     }

 

BrokerSessionHelper類的源碼

public class BrokerSessionHelper {

    private static final Map<String, ChannelHandlerContext> sessionRepository = new ConcurrentHashMap<>();

    public static void saveSession(String clientId, ChannelHandlerContext session) {
        sessionRepository.put(clientId, session);
    }


    public static ChannelHandlerContext getSession(String clientId) {

        return sessionRepository.get(clientId);
    }

    public static ChannelHandlerContext removeSession(String clientId) {

        return sessionRepository.remove(clientId);
    }

    public  static boolean removeSession(String clientId, ChannelHandlerContext session) {
        return sessionRepository.remove(clientId, session);
    }

    /**
     * 發送信息
     *
     * @param msg
     * @param clientId
     * @param packetId
     * @param flush
     */
    public static void sendMessage(MqttMessage msg, String clientId, Integer packetId, boolean flush) {
        ChannelHandlerContext ctx = getSession(clientId);
        if (ctx == null) {
            String pid = packetId == null || packetId <= 0 ? "" : String.valueOf(packetId);
            return;
        }
        sendMessage(ctx, msg, clientId, packetId, flush);
    }


    /**
     * 發送信息
     *
     * @param ctx
     * @param msg
     * @param clientId
     * @param packetId
     * @param flush
     */
    public static void sendMessage(ChannelHandlerContext ctx, MqttMessage msg, String clientId, Integer packetId, boolean flush) {
        String pid = packetId == null || packetId <= 0 ? "" : String.valueOf(packetId);
        ChannelFuture future = flush ? ctx.writeAndFlush(msg) : ctx.write(msg);
        future.addListener(f -> {
            if (f.isSuccess()) {

            } else {

            }
        });
    }
}
View Code

BrokerSessionHelper類就是用於存放當前服務器上相關通道信息。同時用於發送返回的相關報文。讀者們能夠進行看代碼吧。

這個時候就大家只按照之前面講的去作。就能夠抓到報文了。客戶端的話。筆者只用前面說的MQTTLens來測試。

相關文章
相關標籤/搜索