MQTT協議筆記之mqtt.io項目TCP協議支持

前言

MQTT定義了物聯網傳輸協議,其標準傾向於原始TCP實現。構建於TCP的上層協議堆棧,諸如HTTP等,在空間上多了一些處理路徑,稍微耗費了CPU和內存,雖看似微乎其微,但對不少處理能力不足的嵌入式設備而言,選擇原始的TCP倒是最好的選擇。java

但單純TCP不是全部物件聯網的最佳選擇,提供構建與TCP基礎之上的傳統的HTTP通訊支持,尤爲是瀏覽器、性能富裕的桌面涉及領域,仍是企業最 可信賴、最可控的傳輸方式之一。支持多種多樣的鏈接通道,讓目前全部一切皆可聯網,除了原始TCP Socket,還要支持構建於其之上的HTTP、HTML5 Websocket,就頗有必要。git

mqtt.io,Pub/Sub中間件,也能夠稱之爲推送服務器,涵蓋全部主流桌面系統、瀏覽器平臺,而且傾斜 於移動互聯網,以及物聯網的廣闊適應天地。使用一句英文歸納可能更爲合適:"Make everything connect」,讓全部物件均可鏈接。其業務目標,可用下圖歸納:github

mqtt.io致力於作下一代支持全部主流桌面平臺、全部主流瀏覽器、全部可聯網物件均可以聯網的PUB/SUB消息推送系統。瀏覽器

構建此係統,在於下降傳統企業各自分散的推送系統,統一運營,統一管理,節省人員、運維開支。服務器

注意事項

  1. mqtt.io是一個項目名稱,沒有官網,http://www.mqtt.io,和這個項目沒有一毛錢關係。
  2. 項目地址:https://github.com/yongboy/mqtt.io
  3. 項目名稱啓發於 http://socket.io http://netty.io 等知名framework。
  4. 目前只實現QoS 0基本特性,實現概覽,後期會根據反饋,作出一些調整

依賴

  1. netty 4,目前JAVA IO界明星
  2. mqtt-library 二進制和MQTT對象的轉換,這種苦活累活都是它來作,真心讓人喜歡。

數據流轉

解碼器

用於轉換二進制流到JAVA對象的過程:app

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
package io.mqtt.handler.coder;
 
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
 
import java.io.ByteArrayInputStream;
import java.util.List;
 
import org.meqantt.message.Message;
import org.meqantt.message.MessageInputStream;
 
public class MqttMessageNewDecoder extends MessageToMessageDecoder<ByteBuf> {
 
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf buf,
List<Object> out) throws Exception {
if (buf.readableBytes() < 2) {
return;
}
buf .markReaderIndex();
buf .readByte(); // read away header
int msgLength = 0;
int multiplier = 1;
int digit;
int lengthSize = 0;
do {
lengthSize ++;
digit = buf.readByte();
msgLength += (digit & 0x7f) * multiplier;
multiplier *= 128;
if ((digit & 0x80) > 0 && !buf.isReadable()) {
buf .resetReaderIndex();
return;
}
} while ((digit & 0x80) > 0);
if (buf.readableBytes() < msgLength) {
buf .resetReaderIndex();
return;
}
byte[] data = new byte[1 + lengthSize + msgLength];
buf .resetReaderIndex();
buf .readBytes(data);
MessageInputStream mis = new MessageInputStream(
new ByteArrayInputStream(data));
Message msg = mis.readMessage();
mis .close();
 
out .add(msg);
}
}

 

編碼器

對全部要寫入網卡緩衝區的JAVA對象轉換成二進制:運維

12345678910111213141516171819202122232425
package io.mqtt.handler.coder;
 
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
 
import java.util.List;
 
import org.meqantt.message.Message;
 
@ Sharable
public class MqttMessageNewEncoder extends MessageToMessageEncoder<Object> {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg,
List<Object> out) throws Exception {
if (!(msg instanceof Message)) {
return;
}
 
byte[] data = ((Message) msg).toBytes();
 
out .add(Unpooled.wrappedBuffer(data));
}
}

 

藉助於mqtt-library項目,編解碼不復雜。socket

MQTT的消息處理

 

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
package io.mqtt.handler;
 
import io.mqtt.processer.ConnectProcesser;
import io.mqtt.processer.DisConnectProcesser;
import io.mqtt.processer.PingReqProcesser;
import io.mqtt.processer.Processer;
import io.mqtt.processer.PublishProcesser;
import io.mqtt.processer.SubscribeProcesser;
import io.mqtt.processer.UnsubscribeProcesser;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.ReadTimeoutException;
 
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
 
import org.meqantt.message.ConnAckMessage;
import org.meqantt.message.ConnAckMessage.ConnectionStatus;
import org.meqantt.message.DisconnectMessage;
import org.meqantt.message.Message;
import org.meqantt.message.Message.Type;
import org.meqantt.message.PingRespMessage;
 
public class MqttMessageHandler extends ChannelInboundHandlerAdapter {
private static PingRespMessage PINGRESP = new PingRespMessage();
 
private static final Map<Message.Type, Processer> processers;
static {
Map<Message.Type, Processer> map = new HashMap<Message.Type, Processer>(
6);
 
map .put(Type.CONNECT, new ConnectProcesser());
map .put(Type.PUBLISH, new PublishProcesser());
map .put(Type.SUBSCRIBE, new SubscribeProcesser());
map .put(Type.UNSUBSCRIBE, new UnsubscribeProcesser());
map .put(Type.PINGREQ, new PingReqProcesser());
map .put(Type.DISCONNECT, new DisConnectProcesser());
 
processers = Collections.unmodifiableMap(map);
}
 
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable e)
throws Exception {
try {
if (e.getCause() instanceof ReadTimeoutException) {
ctx .write(PINGRESP).addListener(
ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
ctx .channel().close();
}
} catch (Throwable t) {
t .printStackTrace();
ctx .channel().close();
}
 
e .printStackTrace();
}
 
@Override
public void channelRead(ChannelHandlerContext ctx, Object obj)
throws Exception {
Message msg = (Message) obj;
Processer p = processers.get(msg.getType());
if (p == null) {
return;
}
Message rmsg = p.proc(msg, ctx);
if (rmsg == null) {
return;
}
 
if (rmsg instanceof ConnAckMessage
&& ((ConnAckMessage) rmsg).getStatus() != ConnectionStatus.ACCEPTED) {
ctx .write(rmsg).addListener(ChannelFutureListener.CLOSE);
} else if (rmsg instanceof DisconnectMessage) {
ctx .write(rmsg).addListener(ChannelFutureListener.CLOSE);
} else {
ctx .write(rmsg).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
}
 
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx .flush();
}
}
view raw MqttMessageHandler.java hosted with ❤ by  GitHub

 

更具體的能夠查看項目。ide

小結

簡單介紹了一個簡單的不能再簡單的MQTT Server,只具備最基本的QoS 0類型的消息訂閱等。性能

後面,對HTML 5 Websocket,會在現有基礎代碼之上,不作多大改動,增長對MQTT Over WebSocket的支持。

相關文章
相關標籤/搜索