前言java
對於應用層通信協議而言,目前流行的協議雖然能夠很好地支持業務的快速迭代,可是不能否認存在安全性、可拓展性等問題。在消息隊列或者微服務框架中,利用自定義協議提升通信效率很常見的現象。是否你也曾想自定義協議但無從入手而苦惱,跟着小棧一塊兒動手實現一個自定義協議吧!安全
請求響應協議設計bash
通用設計:考慮多協議通信,利用版本號以及協議類型使得協議能夠平滑引入新協議拓展;已有的協議升級則利用協議版本供拓展。
服務器
請求-響應設計:區分請求或響應類型,引入消息類型標識REQUEST(byte)0、RESPONCE(byte)1;消息主體需利用編碼器序列化框架進行編碼解碼,引入編碼類型;在請求響應過程當中,利用消息id做爲惟一標識,利用超時時間來檢測服務器處理時效,超時直接不返回;引入body_length來標識消息主體的長度,使得協議可變長。app
協議處理實現框架
編碼ide
在UniqueEncoder構建可變儲存ChannelBuffer,讀取協議類型並在協議工廠查找協議對業務對象編碼。函數
public class UniqueEncoder extends OneToOneEncoder {
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
ChannelBuffer channelBuffer = ChannelBuffers.dynamicBuffer();
int type = 0;
if (msg instanceof ResponceWrapper) {
type = ((ResponceWrapper) msg).getProtocolType();
} else if (msg instanceof RequestWrapper) {
type = ((RequestWrapper) msg).getProtocolType();
}
return ProtocolFactory.getProtocol(type).encode(msg, channelBuffer);
}
}複製代碼
那編碼究竟是怎麼處理的呢?微服務
能夠清晰看到,ProcessorProtocol類前面明確地說明了協議頭、協議主體的結構信息,其實跟剛開始貼的圖是一致的。其實這個方即可以清晰地根聽說明對業務編碼,能夠養成這個好的習慣。編碼過程先校驗業務對象類型,是請求 OR 響應類型,並調用ChannelBuffer將協議相關信息依次寫入,再也不贅述。能夠看到寫入長度是5 * 1B + 3 * 4B + body主體。ui
/** * PROTOCOL HEADER * VERSION (1B) version * TYPE (1B) type * PROCESSOR PROTOCOL * VERSION (1B) * TYPE (1B) REQUEST RESPONCE * CODEC (1B) serialize/deserialize * ID (4B) msg id * TIMEOUT (4B) timeout * BODYLENGHT (4B) body length * BODY CONTEXT (BODYLENGTH) * @author duxuan */
public class ProcessorProtocol implements Protocol {
private final int type = 1;
private final int version = 1;
private final int CUSTOMPROTOCOL_HEADER_LENGTH = 3 * 1 + 3 * 4;
public static final byte REQUEST = (byte) 1;
public static final byte RESPONCE = (byte) 0;
/** * @param msg 消息實體 * @param channelBuffer 接收編碼數據 * @return * @throws Exception */
@Override
public ChannelBuffer encode(Object msg, ChannelBuffer channelBuffer) throws Exception {
if (!(msg instanceof RequestWrapper ||
msg instanceof ResponceWrapper)) {
throw new Exception();
}
int processType = REQUEST;
int codec = 0;
int id = 0;
int timeout = 0;
int bodyLength = 0;
byte[] body = new byte[0];
if (msg instanceof RequestWrapper) {
RequestWrapper requestWrapper = (RequestWrapper) msg;
processType = REQUEST;
codec = requestWrapper.getCodecType();
id = requestWrapper.getId();
timeout = requestWrapper.getTimeout();
body = Codecs.getEncoder(codec).encode(requestWrapper.getMsg());
} else if (msg instanceof ResponceWrapper) {
processType = RESPONCE;
ResponceWrapper responceWrapper = (ResponceWrapper) msg;
codec = responceWrapper.getCodecType();
id = responceWrapper.getId();
body = Codecs.getEncoder(codec).encode(responceWrapper.getBody());
}
bodyLength = body.length;
/** * 5 * 1B */
// default version
channelBuffer.writeByte(1);
channelBuffer.writeByte(type);
channelBuffer.writeByte(version);
channelBuffer.writeByte(processType);
channelBuffer.writeByte(codec);
/** * 3 * 4B */
channelBuffer.writeInt(id);
channelBuffer.writeInt(timeout);
channelBuffer.writeInt(bodyLength);
/** * body */
channelBuffer.writeBytes(body);
return channelBuffer;
}
}複製代碼
解碼
利用FrameDecoder實現無感知粘包拆包處理,而UniqueDecoder只須要負責協議解析便可,如解析失敗,則重置讀指針originPos;解析成功則返回已解析對象。
(粘包拆包如何處理?未了解的同窗可點擊查看)
public class UniqueDecoder extends FrameDecoder {
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
final int originPos = buffer.readerIndex();
if (buffer.readableBytes() < 2) {
return null;
}
int version = buffer.readByte();
if (version == 1) {
int type = buffer.readByte();
// 協議支持 協議解析+body
Protocol protocol = ProtocolFactory.getProtocol(type);
if (protocol == null) {
throw new Exception("UnSupport protocol");
}
// reset
protocol.decode(buffer, null, originPos);
} else {
throw new Exception("UnSupport version:" + version);
}
return null;
}
}複製代碼
能夠看到在UniqueDecoder中讀取通信版本version、協議類型type,並查找已註冊到協議工廠的協議來負責解析,若是查找不到,則拋出異常。
/**
* PROTOCOL HEADER
* VERSION (1B) version
* TYPE (1B) type
* PROCESSOR PROTOCOL
* VERSION (1B)
* TYPE (1B) REQUEST RESPONCE
* CODEC (1B) serialize/deserialize
* ID (4B) msg id
* TIMEOUT (4B) timeout
* BODYLENGHT (4B) body length
* BODY CONTEXT (BODYLENGTH)
* @author duxuan
*/
public class ProcessorProtocol implements Protocol {
private final int type = 1;
private final int version = 1;
private final int CUSTOMPROTOCOL_HEADER_LENGTH = 3 * 1 + 3 * 4;
public static final byte REQUEST = (byte) 1;
public static final byte RESPONCE = (byte) 0;
/**
* @param channelBuffer 已讀取過通信版本以及協議類型(2B)
* @param errorObject 設置解碼失敗返回的類型
* @param originPos 已記錄解碼前的readerIndex,用於讀取失敗重置
* @return * @throws Exception
*/
@Override
public Object decode(ChannelBuffer channelBuffer, Object errorObject, final int originPos) throws Exception {
if (channelBuffer.readableBytes() < CUSTOMPROTOCOL_HEADER_LENGTH) {
channelBuffer.readerIndex(originPos);
return errorObject;
}
int version = channelBuffer.readByte();
if (version == 1) {
int type = channelBuffer.readByte();
int codec = channelBuffer.readByte();
int msgId = channelBuffer.readInt();
int timeout = channelBuffer.readInt();
int bodyLength = channelBuffer.readInt();
byte[] body = new byte[bodyLength];
channelBuffer.readBytes(body);
// decode
Decoder decoder = Codecs.getDecoder(codec);
if (decoder == null) {
throw new Exception("could not support codec decoder");
}
if (type == REQUEST) {
RequestWrapper requestWrapper = new RequestWrapper(body, msgId, timeout, codec, type);
return requestWrapper;
} else if (type == RESPONCE) {
ResponceWrapper responceWrapper = new ResponceWrapper(body, msgId, codec, type);
return responceWrapper;
}
}else {
throw new Exception("could not support processorProtocol version");
}
return null;
}
} 複製代碼
解碼時校驗可讀字節數是否小於協議頭長度,若是小於,重置readerIndex;不然依次讀取協議版本、請求或者響應類型、編碼類型、消息id、超時時間、消息主體長度、消息主體,並根據編碼類型調用序列化框架將消息解碼成業務對象供業務處理器使用。
經常使用序列化框架介紹
Kryo | 速度快,序列化後體積小 | 跨語言支持較複雜 |
Hessian | 默認支持跨語言 | 較慢 |
Protostuff | 速度快,基於protobuf | 需靜態編譯 |
Protostuff-Runtime | 無需靜態編譯,但序列化前需預先傳入schema | 不支持無默認構造函數的類,反序列化時需用戶本身初始化序列化後的對象,其只負責將該對象進行賦值 |
Java | 使用方便,可序列化全部類 | 速度慢,佔空間 |
序列化處理能夠根據須要實現並註冊到協議編解碼器便可!
能夠掃描關注路上小棧或者加wx(arron1126912882)備註掘金歡迎交流!