Netty實踐(三):實際場景下的數據通訊

數據通訊的場景:長鏈接 OR 短鏈接java

在實際場景中,咱們如何使用Netty進行通訊呢?大體有3種方式:
bootstrap

第一種,使用長鏈接通道不斷開的形式進行通訊,也就是服務器和客戶端的通道一直處於開啓的狀態。若是服務器性能足夠好,而且咱們的客戶端數量也比較少的狀況下,是適合使用長鏈接的通道。數組

第二種,採用短鏈接方式,一次性批量提交數據,也就是咱們會把數據保存在本地臨時緩衝區或者臨時表裏。當達到數量時,就進行批量提交;或者經過定時任務輪詢提交。這種狀況是有弊端的,就是沒法作到實時傳輸。若是應用程序對實時性要求不高,能夠考慮使用。服務器

第三種,採用一種特殊的長鏈接。特殊在哪裏呢?在指定的某一時間以內,服務器與某臺客戶端沒有任何通訊,則斷開鏈接,若是斷開鏈接後,客戶端又須要向服務器發送請求,那麼再次創建鏈接。這裏有點CachedThreadPool的味道。框架

本篇博客將採用Netty來實現第三種方式的數據通訊,接下來咱們一塊兒來看看吧~socket



Netty數據通訊代碼實例ide


請求消息對象工具

package day3;

import java.io.Serializable;

public class Request implements Serializable{

   private static final long  SerialVersionUID = 1L;
   
   private String id ;
   private String name ;
   private String requestMessage ;
   
   public String getId() {
      return id;
   }
   public void setId(String id) {
      this.id = id;
   }
   public String getName() {
      return name;
   }
   public void setName(String name) {
      this.name = name;
   }
   public String getRequestMessage() {
      return requestMessage;
   }
   public void setRequestMessage(String requestMessage) {
      this.requestMessage = requestMessage;
   }


}


響應消息對象oop

package day3;

import java.io.Serializable;

public class Response implements Serializable{
   
   private static final long serialVersionUID = 1L;
   
   private String id;
   private String name;
   private String responseMessage;
   
   public String getId() {
      return id;
   }
   public void setId(String id) {
      this.id = id;
   }
   public String getName() {
      return name;
   }
   public void setName(String name) {
      this.name = name;
   }
   public String getResponseMessage() {
      return responseMessage;
   }
   public void setResponseMessage(String responseMessage) {
      this.responseMessage = responseMessage;
   }
   

}


編解碼處理器
性能

package day3;

import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;

import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

/**
 * Marshalling工廠
 */
public final class MarshallingCodeCFactory {

    /**
     * 建立Jboss Marshalling×××MarshallingDecoder
     * @return MarshallingDecoder
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
       //首先經過Marshalling工具類的精通方法獲取Marshalling實例對象 參數serial標識建立的是java序列化工廠對象。
      final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
      //建立了MarshallingConfiguration對象,配置了版本號爲5 
      final MarshallingConfiguration configuration = new MarshallingConfiguration();
      configuration.setVersion(5);
      //根據marshallerFactory和configuration建立provider
      UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
      //構建Netty的MarshallingDecoder對象,倆個參數分別爲provider和單個消息序列化後的最大長度
      MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
      return decoder;
    }

    /**
     * 建立Jboss Marshalling編碼器MarshallingEncoder
     * @return MarshallingEncoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
      final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
      final MarshallingConfiguration configuration = new MarshallingConfiguration();
      configuration.setVersion(5);
      MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
      //構建Netty的MarshallingEncoder對象,MarshallingEncoder用於實現序列化接口的POJO對象序列化爲二進制數組
      MarshallingEncoder encoder = new MarshallingEncoder(provider);
      return encoder;
    }
}

注意,在上一篇博客《Netty實踐(二):TCP拆包、粘包問題》中,咱們是本身繼承ByteToMessageDecoder、MessageToByteEncoder來實現ByteBuff與消息對象的轉化的,其實這是有點麻煩的。在實際中,咱們徹底能夠利用相關序列化框架(JBoss Marshlling/Protobuf/Kryo/MessagePack)來幫助咱們快速完成編解碼,這裏我使用的是JBoss Marshalling(jboss-marshalling-1.3.0.CR9.jar+jboss-marshalling-serial-1.3.0.CR9.jar)。具體來講,客戶端和服務端交互的消息對象只須要實現JDK默認的序列化接口,同時利用JBoss Marshalling 生成編碼器和×××,用於後續Client/Server端便可。


Client Handler

package day3;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

public class ClientHandler extends ChannelHandlerAdapter{
   
   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      try {
         Response resp = (Response)msg;
         System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());       
      } finally {
         ReferenceCountUtil.release(msg);
      }
   }

   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      ctx.close();
   }
   
}

在這裏能夠清楚的看到,咱們直接將Object轉化成了自定義消息響應對象,可見JBoss Marshalling與Netty結合後,編解碼是如此簡單。



Client

package day3;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;

import java.util.concurrent.TimeUnit;


/**
 *
 */
public class Client {
   
   private static class SingletonHolder {
      static final Client instance = new Client();
   }
   
   public static Client getInstance(){
      return SingletonHolder.instance;
   }
   
   private EventLoopGroup group;
   private Bootstrap b;

   //經過ChannelFuture實現讀寫操做
   private ChannelFuture cf ;
   
   private Client(){
         group = new NioEventLoopGroup();
         b = new Bootstrap();
         b.group(group)
          .channel(NioSocketChannel.class)
          .handler(new LoggingHandler(LogLevel.INFO))
          .handler(new ChannelInitializer<SocketChannel>() {
               @Override
               protected void initChannel(SocketChannel sc) throws Exception {

                  sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                  sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                  //超時handler(當服務器端與客戶端在指定時間以上沒有任何進行通訊,則會關閉相應的通道,主要爲減少服務端資源佔用)
                  sc.pipeline().addLast(new ReadTimeoutHandler(3)); 
                  sc.pipeline().addLast(new ClientHandler());
               }
          });
   }
   
   public void connect(){
      try {
         this.cf = b.connect("127.0.0.1", 8765).sync();
         System.out.println("遠程服務器已經鏈接, 能夠進行數據交換..");            
      } catch (Exception e) {
         e.printStackTrace();
      }
   }

   //這裏是通道關閉,再次創建鏈接的核心代碼
   public ChannelFuture getChannelFuture(){
      
      if(this.cf == null){
         this.connect();
      }
      if(!this.cf.channel().isActive()){
         this.connect();
      }
      
      return this.cf;
   }
   
   public static void main(String[] args) throws Exception{

      final Client c = Client.getInstance();

      //注意client好像沒有調用connect()方法進行鏈接,可是實際上在下面的代碼中作了
      ChannelFuture cf = c.getChannelFuture();

      for(int i = 1; i <= 3; i++ ){
         Request request = new Request();
         request.setId("" + i);
         request.setName("pro" + i);
         request.setRequestMessage("數據信息" + i);
         cf.channel().writeAndFlush(request);
         TimeUnit.SECONDS.sleep(4);
      }

      cf.channel().closeFuture().sync();
      
      //通道關閉後,經過另外一個線程模擬客戶端再次創建鏈接發送請求
      new Thread(new Runnable() {
         @Override
         public void run() {
            try {
               System.out.println("進入子線程...");
               ChannelFuture cf = c.getChannelFuture();
               System.out.println(cf.channel().isActive());
               System.out.println(cf.channel().isOpen());
               
               //再次發送數據
               Request request = new Request();
               request.setId("" + 4);
               request.setName("pro" + 4);
               request.setRequestMessage("數據信息" + 4);
               cf.channel().writeAndFlush(request);               
               cf.channel().closeFuture().sync();
               System.out.println("子線程結束...");

            } catch (InterruptedException e) {
               e.printStackTrace();
            }
         }
      }).start();
      
      System.out.println("斷開鏈接,主線程結束..");
      
   }
   
   
   
}

這裏對Client進行了初步的封裝,採用靜態內部類實現單例。

Client的Handler不單單有Marshalling的編×××,還加入了Netty自帶的ReadTimeoutHandler,這是客戶端與服務端一段時間沒有通訊就斷開鏈接的依據。從這裏也看到Netty的強大之處了,經過提供一些預約義的Handler讓你的代碼變得簡單,只須要專一於業務實現便可。客戶端超時斷開通道後,如何再次創建鏈接進行通訊呢?經過getChannelFuture()你會知道。

客戶端代碼模擬了一個線程通訊超時,關閉通道後,另外一個線程與服務器端再次通訊。



Server Handler

package day3;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class ServerHandler extends ChannelHandlerAdapter{

   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      Request request = (Request)msg;
      System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getRequestMessage());
      Response response = new Response();
      response.setId(request.getId());
      response.setName("response" + request.getId());
      response.setResponseMessage("響應內容" + request.getId());
      ctx.writeAndFlush(response);//.addListener(ChannelFutureListener.CLOSE);
   }

   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      ctx.close();
   }

   
   
}


Server

package day3;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;

public class Server {

   public static void main(String[] args) throws Exception{
      
      EventLoopGroup pGroup = new NioEventLoopGroup();
      EventLoopGroup cGroup = new NioEventLoopGroup();
      
      ServerBootstrap b = new ServerBootstrap();
      b.group(pGroup, cGroup)
       .channel(NioServerSocketChannel.class)
       .option(ChannelOption.SO_BACKLOG, 1024)
       //設置日誌
       .handler(new LoggingHandler(LogLevel.INFO))
       .childHandler(new ChannelInitializer<SocketChannel>() {
         protected void initChannel(SocketChannel sc) throws Exception {
            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
            sc.pipeline().addLast(new ReadTimeoutHandler(3));
            sc.pipeline().addLast(new ServerHandler());
         }
      });
      
      ChannelFuture cf = b.bind(8765).sync();
      
      cf.channel().closeFuture().sync();
      pGroup.shutdownGracefully();
      cGroup.shutdownGracefully();
      
   }
}



運行結果

wKiom1h63b_ROhaOAAB6W0wcmzc404.png

wKiom1h63c3ALRKyAAA7IlAsS98780.png


說明:因爲客戶端一開始是發送3條消息給服務端,可是每條消息發送間隔4S,因爲超時設置爲3S,因而發送第一條消息後,通道便斷開鏈接。接下來,客戶端又啓動了一個線程再次與服務端通訊。



到這裏,這篇博客就結束了,對你有用嗎?

下週咱們再來看Netty在心跳檢測方面的應用,^_^

相關文章
相關標籤/搜索