因爲現代軟件的複雜性,一個大型軟件系統每每會被人爲地拆分稱爲多個模塊,另外隨着移動互聯網的興起,網站的規模愈來愈大,業務功能愈來愈多,每每須要集羣和分佈式部署。模塊之間的通訊就須要進行跨節點通訊。
傳統的Java應用中節點通訊的經常使用方式:java
下面使用Netty設計私有協議node
除了鏈路層的物理鏈接外,還須要對請求和響應消息進行編解碼。 在請求和應答以外,還須要控制和管理類指令,例如鏈路創建的握手信息,鏈路檢測的心跳信息。這些功能組合到一塊兒後,就會造成私有協議。apache
具體步驟:bootstrap
相似於http協議,消息分爲消息頭和消息體。其中消息體是一個Object類型,消息頭則以下所示:數組
名稱 | 類型 | 長度 | 描述 |
length | 整型 int | 32 | 消息長度,整個消息,包括消息頭和消息體 |
sessionId | 長整型long | 64 | 集羣節點內全局惟一,由會話ID生成器生成 |
type | Byte | 8 | 0: 表示請求消息緩存 1: 業務響應消息安全 2: 業務ONE WAY消息(便是請求又是響應消息)服務器 3: 握手請求消息網絡 4: 握手應答消息session 5: 心跳請求消息 6: 心跳應答消息 |
priority | Byte | 8 | 消息優先級: 0-255 |
attachment | Map<String,Object> | 變長 | 可選字段,用於擴展消息頭 |
編碼規範:
(1) crcCode: java.nio.ByteBuffer.putInt(int value),若是採用其它緩存區實現,必須與其等價
(2) length: java.nio.ByteBuffer.putInt(int value),若是採用其它緩衝區實現,必須與其等價
(3) sessionID: java.nio.ByteBuffer.putLong(long value),若是採用其它緩衝區實現,必須與其等價
(4) type: java.nio.ByteBuffer.put(byte b),若是採用其它緩衝區實現,必須與其等價
(5) priority: java.nio.ByteBuffer.put(byte b),若是採用其它緩衝區實現,必須與其等價
(6) attachment: 若是長度爲0,表示沒有可選附件,則將長度編碼爲0,即java.nio.ByteBuffer.putInt(0),若是大於0,表示有附件須要編碼,具體規則以下:
首先對附件的個數進行編碼,java.nio.ByteBuffer.putInt(attachment.size());
而後對Key進行編碼,先編碼長度,而後再將它轉換成byte數組以後編碼內容,具體代碼以下:
String key = null; byte[] value = null; for (Map.Entry<String, Object> param: attachment:entrySet()) { key = param.getKey(); buffer.writeString(key); value = marshaller.writeObject(param.getValue()); buffer.writeBinary(value); } key = null; value = null;
(7) body的編碼: 經過JBoss Marshalling將其序列化爲byte數組,而後調用java.nio.ByteBuffer.put(byte[] src);將其寫入ByteBuffer緩衝區中。
在全部的內容都編碼完成以後更新消息頭的length字段。
解碼規範:
(1) crcCode: java.nio.ByteBuffer.getInt()獲取校驗碼字段,若是採用其它緩存區實現,必須與其等價
(2) length: java.nio.ByteBuffer.getInt()獲取Netty消息的長度,若是採用其它緩衝區實現,必須與其等價
(3) sessionID: java.nio.ByteBuffer.getLong()獲取會話ID,若是採用其它緩衝區實現,必須與其等價
(4) type: java.nio.ByteBuffer.get()獲取消息類型,若是採用其它緩衝區實現,必須與其等價
(5) priority: java.nio.ByteBuffer.get()獲取消息優先級,若是採用其它緩衝區實現,必須與其等價
(6) attachment: 它的解碼規則爲-首先建立一個新的attachment對象,調用java.nio.ByteBuffer.getInt()獲取附件的長度,若是爲0,說明附件爲空,解碼結束,解析解消息體,不然,根據長度經過for循環進行解碼。
(7) body: 使用JBoss marshaller對其進行解碼
不區分客戶端和服務端:若是A節點須要B節點的服務,可是A和B之間尚未創建物理鏈路,則由調用方主動發起鏈接,此時調用方爲客戶端,被調用方爲服務端。
使用簡單的黑白名單進行認證,實際環境中,應該使用密鑰,用戶名密碼等方式。
客戶端發送請求消息:
服務端接收到握手請求消息,若是IP校驗經過,返回握手成功應答給客戶端,應用層鏈路創建成功。握手應答消息:
鏈路成功創建後,客戶端和服務端就能夠相互發送業務消息了。
因爲採用長鏈接通訊,正常的業務運行期間,雙方經過心跳和業務消息維持鏈路,任何一方不須要主動關閉鏈接。
可是,在如下狀況下,客戶端和服務端須要關閉鏈接。
(1) 當對方宕機或者重啓時,會主動釋放鏈路,另外一方讀取到操做系統的通知信號,獲得對方REST鏈路,須要關閉鏈接,釋放自身的句柄等資源。因爲採用TCP全雙工通訊,通訊雙方都須要關閉鏈接,釋放資源;
(2) 消息在讀寫過程當中,發生了I/O異常,須要主動關閉鏈接;
(3) 心跳消息讀寫過程當中發生了I/O異常,須要主動關閉鏈接;
(4) 心跳超時,須要主動關閉鏈接;
(5) 發生編碼異常等不可恢復的錯誤時,須要主動關閉鏈接;
網絡環境是惡劣的。意外沒法避免,須要在出現意外的時候正常工做或者說是恢復,須要可靠性設計的保證。
在凌晨等業務低谷期,若是發生網絡閃斷、鏈接被Hang住等網絡問題,因爲沒有業務消息,應用進程很難發現。到了白天業務高峯期,會發生大量的網絡通訊失敗,嚴重的會致使一段時間進程內沒法處理業務消息。
爲了解決這個問題,在網絡空閒的時候採用心跳機制來檢測鏈路的互通性,一旦發現了網絡故障,當即關閉鏈路,主動重連。
設計思路:
若是鏈路中斷,等待INTERVAL時間後,由客戶端發起重連操做,若是重連失敗,間隔週期INTERVAL以後再繼續重連。
不管什麼場景下的重連失敗,客戶端必須保證自身資源被成功及時釋放
重連失敗,須要記錄異常堆棧信息,方便問題定位。
客戶端握手成功以後,鏈路處於正常狀態下,不容許客戶端重複登陸,以防止客戶端在異常狀態下反覆重連致使句柄資源被耗盡。
server在接收到握手消息後,首先進行ip合法性校驗,若是成功,則在緩存的地址表中查看客戶端是否已經登陸,若是已經登陸,則拒絕重複登陸,返回錯誤碼-1,同時關閉鏈路,而且在服務端日誌中打印錯誤信息。
爲了防止由服務端和客戶端對鏈路狀態理解不一致的問題,當服務端連續N次心跳超時以後須要主動關閉鏈路,同時清空該客戶端的緩存信息,保證後續的客戶端能夠重連。
不管是客戶端仍是服務端,在發生鏈路中斷以後,恢復鏈路以前,緩存在消息隊列的待發送的消息不能丟失。同時考慮到內存溢出風險,應該在消息緩存隊列中設置上限。
Netty協議棧須要具有必定的擴展能力,例如統一的消息攔截、接口日誌、安全、加密解密等能夠被方便地添加和刪除,推薦使用Servelt的FilterChain機制,考慮到性能因素,不推薦AOP。
無論心跳消息、握手請求和握手應答消息均可以用NettyMessage來定義,只是type不一樣而已。
import java.util.HashMap; import java.util.Map; /** * @author Lilinfeng * @version 1.0 * @date 2014年3月14日 */ public final class Header { private int crcCode = 0xabef0101; private int length;// 消息長度 private long sessionID;// 會話ID private byte type;// 消息類型 private byte priority;// 消息優先級 private Map<String, Object> attachment = new HashMap<String, Object>(); // 附件 /** * @return the crcCode */ public final int getCrcCode() { return crcCode; } /** * @param crcCode the crcCode to set */ public final void setCrcCode(int crcCode) { this.crcCode = crcCode; } /** * @return the length */ public final int getLength() { return length; } /** * @param length the length to set */ public final void setLength(int length) { this.length = length; } /** * @return the sessionID */ public final long getSessionID() { return sessionID; } /** * @param sessionID the sessionID to set */ public final void setSessionID(long sessionID) { this.sessionID = sessionID; } /** * @return the type */ public final byte getType() { return type; } /** * @param type the type to set */ public final void setType(byte type) { this.type = type; } /** * @return the priority */ public final byte getPriority() { return priority; } /** * @param priority the priority to set */ public final void setPriority(byte priority) { this.priority = priority; } /** * @return the attachment */ public final Map<String, Object> getAttachment() { return attachment; } /** * @param attachment the attachment to set */ public final void setAttachment(Map<String, Object> attachment) { this.attachment = attachment; } /* * (non-Javadoc) * * @see java.lang.Object#toString() */ @Override public String toString() { return "Header [crcCode=" + crcCode + ", length=" + length + ", sessionID=" + sessionID + ", type=" + type + ", priority=" + priority + ", attachment=" + attachment + "]"; } }
/** * @author lilinfeng * @version 1.0 * @date 2014年3月14日 */ public final class NettyMessage { private Header header; private Object body; /** * @return the header */ public final Header getHeader() { return header; } /** * @param header the header to set */ public final void setHeader(Header header) { this.header = header; } /** * @return the body */ public final Object getBody() { return body; } /** * @param body the body to set */ public final void setBody(Object body) { this.body = body; } /* * (non-Javadoc) * * @see java.lang.Object#toString() */ @Override public String toString() { return "NettyMessage [header=" + header + "]"; } }
因爲依賴於JBoss Marshalling...,添加maven依賴
<dependency> <groupId>org.jboss.marshalling</groupId> <artifactId>jboss-marshalling</artifactId> <version>1.4.10.Final</version> </dependency> <dependency> <groupId>org.jboss.marshalling</groupId> <artifactId>jboss-marshalling-serial</artifactId> <version>1.4.10.Final</version> </dependency>
import org.jboss.marshalling.*; import java.io.IOException; /** * @author Administrator * @version 1.0 * @date 2014年3月15日 */ public final class MarshallingCodecFactory { /** * 建立Jboss Marshaller * * @return * @throws IOException */ protected static Marshaller buildMarshalling() throws IOException { final MarshallerFactory marshallerFactory = Marshalling .getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); Marshaller marshaller = marshallerFactory .createMarshaller(configuration); return marshaller; } /** * 建立Jboss Unmarshaller * * @return * @throws IOException */ protected static Unmarshaller buildUnMarshalling() throws IOException { final MarshallerFactory marshallerFactory = Marshalling .getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); final Unmarshaller unmarshaller = marshallerFactory .createUnmarshaller(configuration); return unmarshaller; } }
增長JBossMarshalling序列化對象->ByteBuf工具
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandler.Sharable; import org.jboss.marshalling.Marshaller; import java.io.IOException; /** * @author Lilinfeng * @version 1.0 * @date 2014年3月14日 */ @Sharable public class MarshallingEncoder { private static final byte[] LENGTH_PLACEHOLDER = new byte[4]; Marshaller marshaller; public MarshallingEncoder() throws IOException { marshaller = MarshallingCodecFactory.buildMarshalling(); } // 使用marshall對Object進行編碼,而且寫入bytebuf... protected void encode(Object msg, ByteBuf out) throws Exception { try { //1. 獲取寫入位置 int lengthPos = out.writerIndex(); //2. 先寫入4個bytes,用於記錄Object對象編碼後長度 out.writeBytes(LENGTH_PLACEHOLDER); //3. 使用代理對象,防止marshaller寫完以後關閉byte buf ChannelBufferByteOutput output = new ChannelBufferByteOutput(out); //4. 開始使用marshaller往bytebuf中編碼 marshaller.start(output); marshaller.writeObject(msg); //5. 結束編碼 marshaller.finish(); //6. 設置對象長度 out.setInt(lengthPos, out.writerIndex() - lengthPos - 4); } finally { marshaller.close(); } } }
import io.netty.buffer.ByteBuf; import org.jboss.marshalling.ByteOutput; import java.io.IOException; /** * {@link ByteOutput} implementation which writes the data to a {@link ByteBuf} * * */ class ChannelBufferByteOutput implements ByteOutput { private final ByteBuf buffer; /** * Create a new instance which use the given {@link ByteBuf} */ public ChannelBufferByteOutput(ByteBuf buffer) { this.buffer = buffer; } @Override public void close() throws IOException { // Nothing to do } @Override public void flush() throws IOException { // nothing to do } @Override public void write(int b) throws IOException { buffer.writeByte(b); } @Override public void write(byte[] bytes) throws IOException { buffer.writeBytes(bytes); } @Override public void write(byte[] bytes, int srcIndex, int length) throws IOException { buffer.writeBytes(bytes, srcIndex, length); } /** * Return the {@link ByteBuf} which contains the written content * */ ByteBuf getBuffer() { return buffer; } }
增長JBossMarshalling反序列化對象<-ByteBuf工具
import io.netty.buffer.ByteBuf; import org.jboss.marshalling.ByteInput; import org.jboss.marshalling.Unmarshaller; import java.io.IOException; import java.io.StreamCorruptedException; /** * @author Lilinfeng * @version 1.0 * @date 2014年3月14日 */ public class MarshallingDecoder { private final Unmarshaller unmarshaller; /** * Creates a new decoder whose maximum object size is {@code 1048576} bytes. * If the size of the received object is greater than {@code 1048576} bytes, * a {@link StreamCorruptedException} will be raised. * * @throws IOException */ public MarshallingDecoder() throws IOException { unmarshaller = MarshallingCodecFactory.buildUnMarshalling(); } protected Object decode(ByteBuf in) throws Exception { //1. 讀取第一個4bytes,裏面放置的是object對象的byte長度 int objectSize = in.readInt(); ByteBuf buf = in.slice(in.readerIndex(), objectSize); //2 . 使用bytebuf的代理類 ByteInput input = new ChannelBufferByteInput(buf); try { //3. 開始解碼 unmarshaller.start(input); Object obj = unmarshaller.readObject(); unmarshaller.finish(); //4. 讀完以後設置讀取的位置 in.readerIndex(in.readerIndex() + objectSize); return obj; } finally { unmarshaller.close(); } } }
import io.netty.buffer.ByteBuf; import org.jboss.marshalling.ByteInput; import java.io.IOException; /** * {@link ByteInput} implementation which reads its data from a {@link ByteBuf} */ class ChannelBufferByteInput implements ByteInput { private final ByteBuf buffer; public ChannelBufferByteInput(ByteBuf buffer) { this.buffer = buffer; } @Override public void close() throws IOException { // nothing to do } @Override public int available() throws IOException { return buffer.readableBytes(); } @Override public int read() throws IOException { if (buffer.isReadable()) { return buffer.readByte() & 0xff; } return -1; } @Override public int read(byte[] array) throws IOException { return read(array, 0, array.length); } @Override public int read(byte[] dst, int dstIndex, int length) throws IOException { int available = available(); if (available == 0) { return -1; } length = Math.min(available, length); buffer.readBytes(dst, dstIndex, length); return length; } @Override public long skip(long bytes) throws IOException { int readable = buffer.readableBytes(); if (readable < bytes) { bytes = readable; } buffer.readerIndex((int) (buffer.readerIndex() + bytes)); return bytes; } }
下面根據上述所說的進行對消息編解碼:
import demo.protocol.netty.struct.NettyMessage; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import java.io.IOException; import java.util.Map; /** * Created by carl.yu on 2016/12/19. */ public class NettyMessageEncoder extends MessageToByteEncoder<NettyMessage> { MarshallingEncoder marshallingEncoder; public NettyMessageEncoder() throws IOException { this.marshallingEncoder = new MarshallingEncoder(); } @Override protected void encode(ChannelHandlerContext ctx, NettyMessage msg, ByteBuf sendBuf) throws Exception { if (null == msg || null == msg.getHeader()) { throw new Exception("The encode message is null"); } //---寫入crcCode--- sendBuf.writeInt((msg.getHeader().getCrcCode())); //---寫入length--- sendBuf.writeInt((msg.getHeader().getLength())); //---寫入sessionId--- sendBuf.writeLong((msg.getHeader().getSessionID())); //---寫入type--- sendBuf.writeByte((msg.getHeader().getType())); //---寫入priority--- sendBuf.writeByte((msg.getHeader().getPriority())); //---寫入附件大小--- sendBuf.writeInt((msg.getHeader().getAttachment().size())); String key = null; byte[] keyArray = null; Object value = null; for (Map.Entry<String, Object> param : msg.getHeader().getAttachment() .entrySet()) { key = param.getKey(); keyArray = key.getBytes("UTF-8"); sendBuf.writeInt(keyArray.length); sendBuf.writeBytes(keyArray); value = param.getValue(); // marshallingEncoder.encode(value, sendBuf); } // for gc key = null; keyArray = null; value = null; if (msg.getBody() != null) { marshallingEncoder.encode(msg.getBody(), sendBuf); } else sendBuf.writeInt(0); // 以前寫了crcCode 4bytes,除去crcCode和length 8bytes即爲更新以後的字節 sendBuf.setInt(4, sendBuf.readableBytes() - 8); } }
import demo.protocol.netty.struct.Header; import demo.protocol.netty.struct.NettyMessage; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import java.io.IOException; import java.util.HashMap; import java.util.Map; /** * @author Lilinfeng * @version 1.0 * @date 2014年3月15日 */ public class NettyMessageDecoder extends LengthFieldBasedFrameDecoder { MarshallingDecoder marshallingDecoder; public NettyMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) throws IOException { super(maxFrameLength, lengthFieldOffset, lengthFieldLength); marshallingDecoder = new MarshallingDecoder(); } @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { ByteBuf frame = (ByteBuf) super.decode(ctx, in); if (frame == null) { return null; } NettyMessage message = new NettyMessage(); Header header = new Header(); header.setCrcCode(frame.readInt()); header.setLength(frame.readInt()); header.setSessionID(frame.readLong()); header.setType(frame.readByte()); header.setPriority(frame.readByte()); int size = frame.readInt(); if (size > 0) { Map<String, Object> attch = new HashMap<String, Object>(size); int keySize = 0; byte[] keyArray = null; String key = null; for (int i = 0; i < size; i++) { keySize = frame.readInt(); keyArray = new byte[keySize]; frame.readBytes(keyArray); key = new String(keyArray, "UTF-8"); attch.put(key, marshallingDecoder.decode(frame)); } keyArray = null; key = null; header.setAttachment(attch); } if (frame.readableBytes() > 4) { message.setBody(marshallingDecoder.decode(frame)); } message.setHeader(header); return message; } }
關鍵在於解碼器繼承了LengthFieldBasedFrameDecoder,三個參數:
ch.pipeline().addLast( new NettyMessageDecoder(1024 * 1024, 4, 4));
第一個參數:1024*1024: 最大長度
第二個參數: 從第4個bytes開始表示是長度
第三個參數: 有4個bytes的長度表示是長度
Netty的機制大可能是基於Handler鏈。
client端在通道激活時構建login請求:
/** * @author Lilinfeng * @version 1.0 * @date 2014年3月15日 */ public class LoginAuthRespHandler extends ChannelInboundHandlerAdapter { private final static Log LOG = LogFactory.getLog(LoginAuthRespHandler.class); /** * 本地緩存 */ private Map<String, Boolean> nodeCheck = new ConcurrentHashMap<String, Boolean>(); private String[] whitekList = {"127.0.0.1", "192.168.1.104"}; /** * Calls {@link ChannelHandlerContext#fireChannelRead(Object)} to forward to * the next {@link ChannelHandler} in the {@link ChannelPipeline}. * <p> * Sub-classes may override this method to change behavior. */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { NettyMessage message = (NettyMessage) msg; // 若是是握手請求消息,處理,其它消息透傳 if (message.getHeader() != null && message.getHeader().getType() == MessageType.LOGIN_REQ .value()) { String nodeIndex = ctx.channel().remoteAddress().toString(); NettyMessage loginResp = null; // 重複登錄,拒絕 if (nodeCheck.containsKey(nodeIndex)) { loginResp = buildResponse((byte) -1); } else { InetSocketAddress address = (InetSocketAddress) ctx.channel() .remoteAddress(); String ip = address.getAddress().getHostAddress(); boolean isOK = false; for (String WIP : whitekList) { if (WIP.equals(ip)) { isOK = true; break; } } loginResp = isOK ? buildResponse((byte) 0) : buildResponse((byte) -1); if (isOK) nodeCheck.put(nodeIndex, true); } LOG.info("The login response is : " + loginResp + " body [" + loginResp.getBody() + "]"); ctx.writeAndFlush(loginResp); } else { ctx.fireChannelRead(msg); } } private NettyMessage buildResponse(byte result) { NettyMessage message = new NettyMessage(); Header header = new Header(); header.setType(MessageType.LOGIN_RESP.value()); message.setHeader(header); message.setBody(result); return message; } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); nodeCheck.remove(ctx.channel().remoteAddress().toString());// 刪除緩存 ctx.close(); ctx.fireExceptionCaught(cause); } }
server端判斷是不是login請求,並對ip進行驗證:
/** * @author Lilinfeng * @version 1.0 * @date 2014年3月15日 */ public class LoginAuthReqHandler extends ChannelInboundHandlerAdapter { private static final Log LOG = LogFactory.getLog(LoginAuthReqHandler.class); /** * Calls {@link ChannelHandlerContext#fireChannelActive()} to forward to the * next {@link ChannelHandler} in the {@link ChannelPipeline}. * <p/> * Sub-classes may override this method to change behavior. */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(buildLoginReq()); } /** * Calls {@link ChannelHandlerContext#fireChannelRead(Object)} to forward to * the next {@link ChannelHandler} in the {@link ChannelPipeline}. * <p/> * Sub-classes may override this method to change behavior. */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { NettyMessage message = (NettyMessage) msg; // 若是是握手應答消息,須要判斷是否定證成功 if (message.getHeader() != null && message.getHeader().getType() == MessageType.LOGIN_RESP .value()) { byte loginResult = (byte) message.getBody(); if (loginResult != (byte) 0) { // 握手失敗,關閉鏈接 ctx.close(); } else { LOG.info("Login is ok : " + message); ctx.fireChannelRead(msg); } } else //調用下一個channel鏈.. ctx.fireChannelRead(msg); } /** * 構建登陸請求 */ private NettyMessage buildLoginReq() { NettyMessage message = new NettyMessage(); Header header = new Header(); header.setType(MessageType.LOGIN_REQ.value()); message.setHeader(header); return message; } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); } }
握手成功以後,由客戶端主動發送心跳消息,服務端接收到心跳消息以後,返回應答,因爲心跳消息的目的是爲了檢測鏈路的可用性,所以不須要攜帶消息體。
/** * @author Lilinfeng * @version 1.0 * @date 2014年3月15日 */ public class HeartBeatReqHandler extends ChannelInboundHandlerAdapter { private static final Log LOG = LogFactory.getLog(HeartBeatReqHandler.class); //使用定時任務發送 private volatile ScheduledFuture<?> heartBeat; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { NettyMessage message = (NettyMessage) msg; // 當握手成功後,Login響應向下透傳,主動發送心跳消息 if (message.getHeader() != null && message.getHeader().getType() == MessageType.LOGIN_RESP .value()) { //NioEventLoop是一個Schedule,所以支持定時器的執行,建立心跳計時器 heartBeat = ctx.executor().scheduleAtFixedRate( new HeartBeatReqHandler.HeartBeatTask(ctx), 0, 5000, TimeUnit.MILLISECONDS); } else if (message.getHeader() != null && message.getHeader().getType() == MessageType.HEARTBEAT_RESP .value()) { LOG.info("Client receive server heart beat message : ---> " + message); } else ctx.fireChannelRead(msg); } //Ping消息任務類 private class HeartBeatTask implements Runnable { private final ChannelHandlerContext ctx; public HeartBeatTask(final ChannelHandlerContext ctx) { this.ctx = ctx; } @Override public void run() { NettyMessage heatBeat = buildHeatBeat(); LOG.info("Client send heart beat messsage to server : ---> " + heatBeat); ctx.writeAndFlush(heatBeat); } private NettyMessage buildHeatBeat() { NettyMessage message = new NettyMessage(); Header header = new Header(); header.setType(MessageType.HEARTBEAT_REQ.value()); message.setHeader(header); return message; } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); if (heartBeat != null) { heartBeat.cancel(true); heartBeat = null; } ctx.fireExceptionCaught(cause); } }
import demo.protocol.netty.MessageType; import demo.protocol.netty.struct.Header; import demo.protocol.netty.struct.NettyMessage; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; /** * @author Lilinfeng * @version 1.0 * @date 2014年3月15日 */ public class HeartBeatRespHandler extends ChannelInboundHandlerAdapter { private static final Log LOG = LogFactory.getLog(HeartBeatRespHandler.class); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { NettyMessage message = (NettyMessage) msg; // 返回心跳應答消息 if (message.getHeader() != null && message.getHeader().getType() == MessageType.HEARTBEAT_REQ .value()) { LOG.info("Receive client heart beat message : ---> " + message); NettyMessage heartBeat = buildHeatBeat(); LOG.info("Send heart beat response message to client : ---> " + heartBeat); ctx.writeAndFlush(heartBeat); } else ctx.fireChannelRead(msg); } private NettyMessage buildHeatBeat() { NettyMessage message = new NettyMessage(); Header header = new Header(); header.setType(MessageType.HEARTBEAT_RESP.value()); message.setHeader(header); return message; } }
心跳超時的機制很是簡單,直接利用Netty的ReadTimeoutHandler進行實現,當必定週期內(50s)沒有接收到任何對方消息時,須要主動關閉鏈路。若是是客戶端,則從新發起鏈接,若是是服務端,則釋放資源,清除客戶端登陸緩存信息,等待服務器端重連。
在client感知到斷連事件以後,釋放資源,從新發起鏈接,具體代碼如如下部分
首先監聽網絡斷連事件,若是Channel關閉,則執行後續的重連任務,經過Bootstrap從新發起鏈接,客戶端掛在closeFuture上監聽鏈路關閉信號,一旦關閉,則建立定時器,重連。
服務端在監聽到斷連事件後,還須要清空緩存中的登陸認證註冊信息,以保證後續客戶端能夠正常重連。
public final class NettyConstant { public static final String REMOTEIP = "127.0.0.1"; public static final int PORT = 8080; public static final int LOCAL_PORT = 12088; public static final String LOCALIP = "127.0.0.1"; }
import demo.protocol.netty.NettyConstant; import demo.protocol.netty.codec.NettyMessageDecoder; import demo.protocol.netty.codec.NettyMessageEncoder; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.timeout.ReadTimeoutHandler; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.net.InetSocketAddress; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * @author Lilinfeng * @version 1.0 * @date 2014年3月15日 */ public class NettyClient { private static final Log LOG = LogFactory.getLog(NettyClient.class); private ScheduledExecutorService executor = Executors .newScheduledThreadPool(1); EventLoopGroup group = new NioEventLoopGroup(); public void connect(int port, String host) throws Exception { // 配置客戶端NIO線程組 try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new NettyMessageDecoder(1024 * 1024, 4, 4)); ch.pipeline().addLast("MessageEncoder", new NettyMessageEncoder()); ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50)); ch.pipeline().addLast("LoginAuthHandler", new LoginAuthReqHandler()); ch.pipeline().addLast("HeartBeatHandler", new HeartBeatReqHandler()); } }); // 發起異步鏈接操做 ChannelFuture future = b.connect( new InetSocketAddress(host, port), new InetSocketAddress(NettyConstant.LOCALIP, NettyConstant.LOCAL_PORT)).sync(); // 當對應的channel關閉的時候,就會返回對應的channel。 // Returns the ChannelFuture which will be notified when this channel is closed. This method always returns the same future instance. future.channel().closeFuture().sync(); } finally { // 全部資源釋放完成以後,清空資源,再次發起重連操做 executor.execute(new Runnable() { @Override public void run() { try { TimeUnit.SECONDS.sleep(1); try { connect(NettyConstant.PORT, NettyConstant.REMOTEIP);// 發起重連操做 } catch (Exception e) { e.printStackTrace(); } } catch (InterruptedException e) { e.printStackTrace(); } } }); } } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { new NettyClient().connect(NettyConstant.PORT, NettyConstant.REMOTEIP); } }
import demo.protocol.netty.NettyConstant; import demo.protocol.netty.codec.NettyMessageDecoder; import demo.protocol.netty.codec.NettyMessageEncoder; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.io.IOException; /** * @author Lilinfeng * @version 1.0 * @date 2014年3月15日 */ public class NettyServer { private static final Log LOG = LogFactory.getLog(NettyServer.class); public void bind() throws Exception { // 配置服務端的NIO線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws IOException { ch.pipeline().addLast( new NettyMessageDecoder(1024 * 1024, 4, 4)); ch.pipeline().addLast(new NettyMessageEncoder()); ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50)); ch.pipeline().addLast(new LoginAuthRespHandler()); ch.pipeline().addLast("HeartBeatHandler", new HeartBeatRespHandler()); } }); // 綁定端口,同步等待成功 b.bind(NettyConstant.REMOTEIP, NettyConstant.PORT).sync(); LOG.info("Netty server start ok : " + (NettyConstant.REMOTEIP + " : " + NettyConstant.PORT)); } public static void main(String[] args) throws Exception { new NettyServer().bind(); } }
啓動server端,再啓動client端
2016-12-19 20:52:23 INFO HeartBeatRespHandler:44 - Receive client heart beat message : ---> NettyMessage [header=Header [crcCode=-1410399999, length=18, sessionID=0, type=5, priority=0, attachment={}]] 2016-12-19 20:52:23 INFO HeartBeatRespHandler:47 - Send heart beat response message to client : ---> NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionID=0, type=6, priority=0, attachment={}]] 2016-12-19 20:52:28 INFO HeartBeatRespHandler:44 - Receive client heart beat message : ---> NettyMessage [header=Header [crcCode=-1410399999, length=18, sessionID=0, type=5, priority=0, attachment={}]] 2016-12-19 20:52:28 INFO HeartBeatRespHandler:47 - Send heart beat response message to client : ---> NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionID=0, type=6, priority=0, attachment={}]] 2016-12-19 20:52:33 INFO HeartBeatRespHandler:44 - Receive client heart beat message : ---> NettyMessage [header=Header [crcCode=-1410399999, length=18, sessionID=0, type=5, priority=0, attachment={}]] 2016-12-19 20:52:33 INFO HeartBeatRespHandler:47 - Send heart beat response message to client : ---> NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionID=0, type=6, priority=0, attachment={}]] 2016-12-19 20:52:38 INFO HeartBeatRespHandler:44 - Receive client heart beat message : ---> NettyMessage [header=Header [crcCode=-1410399999, length=18, sessionID=0, type=5, priority=0, attachment={}]] 2016-12-19 20:52:38 INFO HeartBeatRespHandler:47 - Send heart beat response message to client : ---> NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionID=0, type=6, priority=0, attachment={}]] 2016-12-19 20:52:43 INFO HeartBeatRespHandler:44 - Receive client heart beat message : ---> NettyMessage [header=Header [crcCode=-1410399999, length=18, sessionID=0, type=5, priority=0, attachment={}]]
關閉服務端,client因爲心跳,一直報錯:
io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: /127.0.0.1:8080 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:347) at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:627) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:551) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:465) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:437) at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:873) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) at java.lang.Thread.run(Thread.java:745)
須要測試信息以下:
(1) 客戶端是否可以正常發起重連
(2) 重連以後,再也不重連
(3) 斷連期間,心跳定時器中止工做,再也不發送心跳請求消息
(4) 服務器重啓成功後,容許客戶端從新登陸
(5) 服務器重啓成功之,客戶端可以重連和握手成功
(6) 重連成功以後,雙方的心跳可以正常護法
(7) 性能指標:重連期間,客戶端能源獲得了正常回收,不會致使句柄等資源泄露
使用vituralvm或者Jconsole工具,監控斷連期間,cpu,線程,堆內存等資源佔用正常.
重連以後,能夠繼續通訊
也能夠從新啓動,且清空緩存信息,清空代碼在LoginAuthHandler中的異常捕獲部分:
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); nodeCheck.remove(ctx.channel().remoteAddress().toString());// 刪除緩存 ctx.close(); ctx.fireExceptionCaught(cause); }