本篇demo實現的功能是基於netty的心跳機制和長鏈接以及重連機制,最關鍵的就是經過netty中的 IdleStateHandler
的超時機制來實現心跳和重連 ,而後經過org.msgpack
編碼器來實現跨平臺數據傳輸,html
實現的功能就是經過Scanner來輸入消息獲得服務端的迴應,超過設定的超時時間就觸發超時事件來進行心跳傳輸,若是服務端宕機客戶端就會一直髮起重連。java
1、運行效果編程
服務端:bootstrap
客戶端:服務器
在maven pom文件添加依賴:網絡
<!-- 解碼and編碼器 --> <!-- https://mvnrepository.com/artifact/org.msgpack/msgpack --> <dependency> <groupId>org.msgpack</groupId> <artifactId>msgpack</artifactId> <version>0.6.12</version> </dependency> <!-- netty 核心依賴 --> <!-- https://mvnrepository.com/artifact/io.netty/netty-all --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.33.Final</version> </dependency>
導入以上依賴 ↓ 建立配置模型model(模型類) , TypeData(參數配置類) ↓ 建立解碼and編碼器MsgPckDecode(解碼器) ,MsgPckEncode(編碼器) ↓ 建立各自的控制器 AbstractClientChannelInboundHandleAdapter,AbstractServerChannelInboundHandleAdapter ↓ 建立客戶端及客戶端控制器Client(客戶端啓動類) , ClientHandler(客戶端控制器) ↓ 建立服務端以及控制器Server(客戶端啓動類) , ServerHandler(客戶端控制器) ps:本demo使用了msgpack , It’s like JSON. but fast and small.
package com.zxh.demo.model; import java.io.Serializable; import org.msgpack.annotation.Message; /** * 消息類型分離器 * @author Administrator * */ @Message public class Model implements Serializable{ private static final long serialVersionUID = 1L; //類型 private int type; //內容 private String body; public String getBody() { return body; } public void setBody(String body) { this.body = body; } public int getType() { return type; } public void setType(int type) { this.type = type; } @Override public String toString() { return "Model [type=" + type + ", body=" + body + "]"; } }
編寫一個配置類接口,用於控制心跳包和應用消息的處理
package com.zxh.demo.model; /** * 配置項 * @author Administrator * */ public interface TypeData { byte PING = 1; byte PONG = 2; //顧客 byte CUSTOMER = 3; }
建立MsgPckDecode(解碼器)框架
package com.zxh.demo.model; import java.util.List; import org.msgpack.MessagePack; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; /** * 解碼器 * @author Administrator * */ public class MsgPckDecode extends MessageToMessageDecoder<ByteBuf>{ @Override protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { final byte[] array; final int length = msg.readableBytes(); array = new byte[length]; msg.getBytes(msg.readerIndex(), array, 0, length); MessagePack pack = new MessagePack(); out.add(pack.read(array, Model.class)); } }
建立MsgPckEncode(編碼器)
package com.zxh.demo.model; import org.msgpack.MessagePack; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; /** * 編碼器 * @author Administrator * */ public class MsgPckEncode extends MessageToByteEncoder<Object>{ @Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf buf) throws Exception { // TODO Auto-generated method stub MessagePack pack = new MessagePack(); byte[] write = pack.write(msg); buf.writeBytes(write); } }
建立client客戶端:
package com.zxh.demo.client; import java.util.Scanner; import java.util.concurrent.TimeUnit; import com.zxh.demo.model.Model; import com.zxh.demo.model.MsgPckDecode; import com.zxh.demo.model.MsgPckEncode; import com.zxh.demo.model.TypeData; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.timeout.IdleStateHandler; public class Client { private NioEventLoopGroup worker = new NioEventLoopGroup(); private Channel channel; private Bootstrap bootstrap; public static void main(String[] args) { Client client = new Client(); client.start(); client.sendData(); } private void start() { bootstrap = new Bootstrap(); bootstrap.group(worker) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { // TODO Auto-generated method stub ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new IdleStateHandler(0,0,5)); pipeline.addLast(new MsgPckDecode()); pipeline.addLast(new MsgPckEncode()); pipeline.addLast(new ClientHandler(Client.this)); } }); doConnect(); } /** * 鏈接服務端 and 重連 */ protected void doConnect() { if (channel != null && channel.isActive()){ return; } ChannelFuture connect = bootstrap.connect("127.0.0.1", 8081); //實現監聽通道鏈接的方法 connect.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if(channelFuture.isSuccess()){ channel = channelFuture.channel(); System.out.println("鏈接服務端成功"); }else{ System.out.println("每隔2s重連...."); channelFuture.channel().eventLoop().schedule(new Runnable() { @Override public void run() { doConnect(); } },2,TimeUnit.SECONDS); } } }); } /** * 向服務端發送消息 */ private void sendData() { Scanner sc= new Scanner(System.in); for (int i = 0; i < 1000; i++) { if(channel != null && channel.isActive()){ //獲取一個鍵盤掃描器 String nextLine = sc.nextLine(); Model model = new Model(); model.setType(TypeData.CUSTOMER); model.setBody(nextLine); channel.writeAndFlush(model); } } } }
建立Server服務端:
package com.zxh.demo.server; import com.zxh.demo.model.MsgPckDecode; import com.zxh.demo.model.MsgPckEncode; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.timeout.IdleStateHandler; public class Server { public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(4); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .localAddress(8081) .childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { // TODO Auto-generated method stub ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new IdleStateHandler(10,0,0)); pipeline.addLast(new MsgPckDecode()); pipeline.addLast(new MsgPckEncode()); pipeline.addLast(new ServerHandler()); } }); System.out.println("start server by port 8081 --"); ChannelFuture sync = serverBootstrap.bind().sync(); sync.channel().closeFuture().sync(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }finally{ //優雅的關閉資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
先運行服務端,而後再啓動客戶端 會根據設置的端口鏈接服務端,在客戶端輸入消息就會獲得服務端的迴應,若是超過5秒沒有進行讀寫就會觸發IdleStateHandler
類超時事件 來進行心跳包的傳輸 ,服務端未檢測到客戶端的讀寫或者心跳就會主動關閉channel通道異步
3、項目結構圖socket
所謂的心跳, 即在 TCP 長鏈接中, 客戶端和服務器之間按期發送的一種特殊的數據包, 通知對方本身還在線, 以確保 TCP 鏈接的有效性.由於網絡的不可靠性, 有可能在 TCP 保持長鏈接的過程當中, 因爲某些突發狀況, 例如網線被拔出, 忽然掉電等, 會形成服務器和客戶端的鏈接中斷. 在這些突發狀況下, 若是剛好服務器和客戶端之間沒有交互的話, 那麼它們是不能在短期內發現對方已經掉線的. 爲了解決這個問題, 咱們就須要引入 心跳 機制. 心跳機制的工做原理是: 在服務器和客戶端之間必定時間內沒有數據交互時, 即處於 idle 狀態時, 客戶端或服務器會發送一個特殊的數據包給對方, 當接收方收到這個數據報文後, 也當即發送一個特殊的數據報文, 迴應發送方, 此即一個 PING-PONG 交互. 天然地, 當某一端收到心跳消息後, 就知道了對方仍然在線, 這就確保 TCP 鏈接的有效性maven