數據通訊的場景:長鏈接 OR 短鏈接java
在實際場景中,咱們如何使用Netty進行通訊呢?大體有3種方式:
bootstrap
第一種,使用長鏈接通道不斷開的形式進行通訊,也就是服務器和客戶端的通道一直處於開啓的狀態。若是服務器性能足夠好,而且咱們的客戶端數量也比較少的狀況下,是適合使用長鏈接的通道。數組
第二種,採用短鏈接方式,一次性批量提交數據,也就是咱們會把數據保存在本地臨時緩衝區或者臨時表裏。當達到數量時,就進行批量提交;或者經過定時任務輪詢提交。這種狀況是有弊端的,就是沒法作到實時傳輸。若是應用程序對實時性要求不高,能夠考慮使用。服務器
第三種,採用一種特殊的長鏈接。特殊在哪裏呢?在指定的某一時間以內,服務器與某臺客戶端沒有任何通訊,則斷開鏈接,若是斷開鏈接後,客戶端又須要向服務器發送請求,那麼再次創建鏈接。這裏有點CachedThreadPool的味道。框架
本篇博客將採用Netty來實現第三種方式的數據通訊,接下來咱們一塊兒來看看吧~socket
Netty數據通訊代碼實例ide
請求消息對象工具
package day3; import java.io.Serializable; public class Request implements Serializable{ private static final long SerialVersionUID = 1L; private String id ; private String name ; private String requestMessage ; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getRequestMessage() { return requestMessage; } public void setRequestMessage(String requestMessage) { this.requestMessage = requestMessage; } }
響應消息對象oop
package day3; import java.io.Serializable; public class Response implements Serializable{ private static final long serialVersionUID = 1L; private String id; private String name; private String responseMessage; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getResponseMessage() { return responseMessage; } public void setResponseMessage(String responseMessage) { this.responseMessage = responseMessage; } }
編解碼處理器
性能
package day3; import io.netty.handler.codec.marshalling.DefaultMarshallerProvider; import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider; import io.netty.handler.codec.marshalling.MarshallerProvider; import io.netty.handler.codec.marshalling.MarshallingDecoder; import io.netty.handler.codec.marshalling.MarshallingEncoder; import io.netty.handler.codec.marshalling.UnmarshallerProvider; import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.Marshalling; import org.jboss.marshalling.MarshallingConfiguration; /** * Marshalling工廠 */ public final class MarshallingCodeCFactory { /** * 建立Jboss Marshalling×××MarshallingDecoder * @return MarshallingDecoder */ public static MarshallingDecoder buildMarshallingDecoder() { //首先經過Marshalling工具類的精通方法獲取Marshalling實例對象 參數serial標識建立的是java序列化工廠對象。 final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); //建立了MarshallingConfiguration對象,配置了版本號爲5 final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); //根據marshallerFactory和configuration建立provider UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); //構建Netty的MarshallingDecoder對象,倆個參數分別爲provider和單個消息序列化後的最大長度 MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024); return decoder; } /** * 建立Jboss Marshalling編碼器MarshallingEncoder * @return MarshallingEncoder */ public static MarshallingEncoder buildMarshallingEncoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); //構建Netty的MarshallingEncoder對象,MarshallingEncoder用於實現序列化接口的POJO對象序列化爲二進制數組 MarshallingEncoder encoder = new MarshallingEncoder(provider); return encoder; } }
注意,在上一篇博客《Netty實踐(二):TCP拆包、粘包問題》中,咱們是本身繼承ByteToMessageDecoder、MessageToByteEncoder來實現ByteBuff與消息對象的轉化的,其實這是有點麻煩的。在實際中,咱們徹底能夠利用相關序列化框架(JBoss Marshlling/Protobuf/Kryo/MessagePack)來幫助咱們快速完成編解碼,這裏我使用的是JBoss Marshalling(jboss-marshalling-1.3.0.CR9.jar+jboss-marshalling-serial-1.3.0.CR9.jar)。具體來講,客戶端和服務端交互的消息對象只須要實現JDK默認的序列化接口,同時利用JBoss Marshalling 生成編碼器和×××,用於後續Client/Server端便可。
Client Handler
package day3; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; public class ClientHandler extends ChannelHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { Response resp = (Response)msg; System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage()); } finally { ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
在這裏能夠清楚的看到,咱們直接將Object轉化成了自定義消息響應對象,可見JBoss Marshalling與Netty結合後,編解碼是如此簡單。
Client
package day3; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; import java.util.concurrent.TimeUnit; /** * */ public class Client { private static class SingletonHolder { static final Client instance = new Client(); } public static Client getInstance(){ return SingletonHolder.instance; } private EventLoopGroup group; private Bootstrap b; //經過ChannelFuture實現讀寫操做 private ChannelFuture cf ; private Client(){ group = new NioEventLoopGroup(); b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); //超時handler(當服務器端與客戶端在指定時間以上沒有任何進行通訊,則會關閉相應的通道,主要爲減少服務端資源佔用) sc.pipeline().addLast(new ReadTimeoutHandler(3)); sc.pipeline().addLast(new ClientHandler()); } }); } public void connect(){ try { this.cf = b.connect("127.0.0.1", 8765).sync(); System.out.println("遠程服務器已經鏈接, 能夠進行數據交換.."); } catch (Exception e) { e.printStackTrace(); } } //這裏是通道關閉,再次創建鏈接的核心代碼 public ChannelFuture getChannelFuture(){ if(this.cf == null){ this.connect(); } if(!this.cf.channel().isActive()){ this.connect(); } return this.cf; } public static void main(String[] args) throws Exception{ final Client c = Client.getInstance(); //注意client好像沒有調用connect()方法進行鏈接,可是實際上在下面的代碼中作了 ChannelFuture cf = c.getChannelFuture(); for(int i = 1; i <= 3; i++ ){ Request request = new Request(); request.setId("" + i); request.setName("pro" + i); request.setRequestMessage("數據信息" + i); cf.channel().writeAndFlush(request); TimeUnit.SECONDS.sleep(4); } cf.channel().closeFuture().sync(); //通道關閉後,經過另外一個線程模擬客戶端再次創建鏈接發送請求 new Thread(new Runnable() { @Override public void run() { try { System.out.println("進入子線程..."); ChannelFuture cf = c.getChannelFuture(); System.out.println(cf.channel().isActive()); System.out.println(cf.channel().isOpen()); //再次發送數據 Request request = new Request(); request.setId("" + 4); request.setName("pro" + 4); request.setRequestMessage("數據信息" + 4); cf.channel().writeAndFlush(request); cf.channel().closeFuture().sync(); System.out.println("子線程結束..."); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); System.out.println("斷開鏈接,主線程結束.."); } }
這裏對Client進行了初步的封裝,採用靜態內部類實現單例。
Client的Handler不單單有Marshalling的編×××,還加入了Netty自帶的ReadTimeoutHandler,這是客戶端與服務端一段時間沒有通訊就斷開鏈接的依據。從這裏也看到Netty的強大之處了,經過提供一些預約義的Handler讓你的代碼變得簡單,只須要專一於業務實現便可。客戶端超時斷開通道後,如何再次創建鏈接進行通訊呢?經過getChannelFuture()你會知道。
客戶端代碼模擬了一個線程通訊超時,關閉通道後,另外一個線程與服務器端再次通訊。
Server Handler
package day3; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class ServerHandler extends ChannelHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Request request = (Request)msg; System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getRequestMessage()); Response response = new Response(); response.setId(request.getId()); response.setName("response" + request.getId()); response.setResponseMessage("響應內容" + request.getId()); ctx.writeAndFlush(response);//.addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
Server
package day3; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; public class Server { public static void main(String[] args) throws Exception{ EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) //設置日誌 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ReadTimeoutHandler(3)); sc.pipeline().addLast(new ServerHandler()); } }); ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
運行結果
說明:因爲客戶端一開始是發送3條消息給服務端,可是每條消息發送間隔4S,因爲超時設置爲3S,因而發送第一條消息後,通道便斷開鏈接。接下來,客戶端又啓動了一個線程再次與服務端通訊。
到這裏,這篇博客就結束了,對你有用嗎?
下週咱們再來看Netty在心跳檢測方面的應用,^_^