手把手教你 Netty 實現自定義協議!

關於協議,使用最爲普遍的是HTTP協議,可是在一些服務交互領域,其使用則相對較少,主要緣由有三方面:


HTTP協議會攜帶諸如header和cookie等信息,其自己對字節的利用率也較低,這使得HTTP協議比較臃腫,在承載相同信息的狀況下,HTTP協議將須要發送更多的數據包;
HTTP協議是基於TCP的短鏈接,其在每次請求和響應的時候都須要進行三次握手和四次揮手,因爲服務的交互設計通常都要求可以承載高併發的請求,於是HTTP協議這種頻繁的握手和揮手動做會極大的影響服務之間交互的效率;
服務之間每每有一些根據其自身業務特性所獨有的需求,而HTTP協議沒法很好的服務於這些業務需求。
基於上面的緣由,通常的服務之間進行交互時都會使用自定義協議,常見的框架,諸如dubbo,kafka,zookeeper都實現了符合其自身業務需求的協議,本文主要講解如何使用Netty實現一款自定義的協議。
bootstrap

1. 協議規定


所謂協議,其本質其實就是定義了一個將數據轉換爲字節,或者將字節轉換爲數據的一個規範。一款自定義協議,其通常包含兩個部分:消息頭和消息體。
消息頭的長度通常是固定的,或者說是可肯定的,其定義了這次消息的一些公有信息,好比當前服務的版本,消息的sessionId,消息的類型等等;消息體則主要是這次消息所須要發送的內容,通常在消息頭的最後必定的字節中保存了當前消息的消息體的長度。下面是咱們爲當前自定義協議所作的一些規定:服務器

上述協議定義中,咱們除了定義經常使用的請求和響應消息類型之外,還定義了Ping和Pong消息。Ping和Pong消息的做用通常是,在服務處於閒置狀態達到必定時長,好比2s時,客戶端服務會向服務端發送一個Ping消息,則會返回一個Pong消息,這樣才表示客戶端與服務端的鏈接是無缺的。
若是服務端沒有返回相應的消息,客戶端就會關閉與服務端的鏈接或者是從新創建與服務端的鏈接。這樣的優勢在於能夠防止忽然會產生的客戶端與服務端的大量交互。
cookie

2. 協議實現


經過上面的定義其實咱們能夠發現,所謂協議,就是定義了一個規範,基於這個規範,咱們能夠將消息轉換爲相應的字節流,而後經由TCP傳輸到目標服務,目標服務則也基於該規範將字節流轉換爲相應的消息,這樣就達到了相互交流的目的。
這裏面最重要的主要是如何基於該規範將消息轉換爲字節流或者將字節流轉換爲消息。
這一方面,Netty爲咱們提供了ByteToMessageDecoder和MessageToByteEncoder用於進行消息和字節流的相互轉換。首先咱們定義了以下消息實體:session


public class Message {
 private int magicNumber;
 private byte mainVersion;
 private byte subVersion;
 private byte modifyVersion;
 private String sessionId;

 private MessageTypeEnum messageType;
 private Map<String, String> attachments = new HashMap<>();
 private String body;

 public Map<String, String> getAttachments() {
   return Collections.unmodifiableMap(attachments);
 }

 public void setAttachments(Map<String, String> attachments) {
   this.attachments.clear();
   if (null != attachments) {
     this.attachments.putAll(attachments);
   }
 }

 public void addAttachment(String key, String value) {
   attachments.put(key, value);
 }

 // getter and setter...
}


上述消息中,咱們將協議中所規定的各個字段都進行了定義,而且定義了一個標誌消息類型的枚舉MessageTypeEnum,以下是該枚舉的源碼:併發


public enum MessageTypeEnum {
 REQUEST((byte)1), RESPONSE((byte)2), PING((byte)3), PONG((byte)4), EMPTY((byte)5);

 private byte type;

 MessageTypeEnum(byte type) {
   this.type = type;
 }

 public int getType() {
   return type;
 }

 public static MessageTypeEnum get(byte type) {
   for (MessageTypeEnum value : values()) {
     if (value.type == type) {
       return value;
     }
   }

   throw new RuntimeException("unsupported type: " + type);
 }
}


上述主要是定義了描述自定義協議相關的實體屬性,對於消息的編碼,本質就是依據上述協議方式將消息實體轉換爲字節流,以下是轉換字節流的代碼:app


public class MessageEncoder extends MessageToByteEncoder<Message> {

 @Override
 protected void encode(ChannelHandlerContext ctx, Message message, ByteBuf out) {
   // 這裏會判斷消息類型是否是EMPTY類型,若是是EMPTY類型,則表示當前消息不須要寫入到管道中
   if (message.getMessageType() != MessageTypeEnum.EMPTY) {
     out.writeInt(Constants.MAGIC_NUMBER);// 寫入當前的魔數
     out.writeByte(Constants.MAIN_VERSION);// 寫入當前的主版本號
     out.writeByte(Constants.SUB_VERSION);// 寫入當前的次版本號
     out.writeByte(Constants.MODIFY_VERSION);// 寫入當前的修訂版本號
     if (!StringUtils.hasText(message.getSessionId())) {
       // 生成一個sessionId,並將其寫入到字節序列中
       String sessionId = SessionIdGenerator.generate();
       message.setSessionId(sessionId);
       out.writeCharSequence(sessionId, Charset.defaultCharset());
     }

     out.writeByte(message.getMessageType().getType());// 寫入當前消息的類型
     out.writeShort(message.getAttachments().size());// 寫入當前消息的附加參數數量
     message.getAttachments().forEach((key, value) -> {
       Charset charset = Charset.defaultCharset();
       out.writeInt(key.length());// 寫入鍵的長度
       out.writeCharSequence(key, charset);// 寫入鍵數據
       out.writeInt(value.length());// 希爾值的長度
       out.writeCharSequence(value, charset);// 寫入值數據
     });

     if (null == message.getBody()) {
       out.writeInt(0);// 若是消息體爲空,則寫入0,表示消息體長度爲0
     } else {
       out.writeInt(message.getBody().length());
       out.writeCharSequence(message.getBody(), Charset.defaultCharset());
     }
   }
 }
}


對於消息的解碼,其過程與上面的消息編碼方式基本一致,主要是基於協議所規定的將字節流數據轉換爲消息實體數據。以下是其轉換過程:框架


public class MessageDecoder extends ByteToMessageDecoder {

 @Override
 protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
   Message message = new Message();
   message.setMagicNumber(byteBuf.readInt());  // 讀取魔數
   message.setMainVersion(byteBuf.readByte()); // 讀取主版本號
   message.setSubVersion(byteBuf.readByte()); // 讀取次版本號
   message.setModifyVersion(byteBuf.readByte());// 讀取修訂版本號
   CharSequence sessionId = byteBuf.readCharSequence(
       Constants.SESSION_ID_LENGTH, Charset.defaultCharset());// 讀取sessionId
   message.setSessionId((String)sessionId);

   message.setMessageType(MessageTypeEnum.get(byteBuf.readByte()));// 讀取當前的消息類型
   short attachmentSize = byteBuf.readShort();// 讀取附件長度
   for (short i = 0; i < attachmentSize; i++) {
     int keyLength = byteBuf.readInt();// 讀取鍵長度和數據
     CharSequence key = byteBuf.readCharSequence(keyLength, Charset.defaultCharset());
     int valueLength = byteBuf.readInt();// 讀取值長度和數據
     CharSequence value = byteBuf.readCharSequence(valueLength, Charset.defaultCharset());
     message.addAttachment(key.toString(), value.toString());
   }

   int bodyLength = byteBuf.readInt();// 讀取消息體長度和數據
   CharSequence body = byteBuf.readCharSequence(bodyLength, Charset.defaultCharset());
   message.setBody(body.toString());
   out.add(message);
 }
}


如此,咱們自定義消息與字節流的相互轉換工做已經完成。對於消息的處理,主要是要根據消息的不一樣類型,對消息進行相應的處理,好比對於request類型消息,要寫入響應數據,對於ping消息,要寫入pong消息做爲迴應。
下面咱們經過定義Netty handler的方式實現對消息的處理:dom


// 服務端消息處理器
public class ServerMessageHandler extends SimpleChannelInboundHandler<Message> {

 // 獲取一個消息處理器工廠類實例
 private MessageResolverFactory resolverFactory = MessageResolverFactory.getInstance();

 @Override
 protected void channelRead0(ChannelHandlerContext ctx, Message message) throws Exception {
   Resolver resolver = resolverFactory.getMessageResolver(message);// 獲取消息處理器
   Message result = resolver.resolve(message);// 對消息進行處理並獲取響應數據
   ctx.writeAndFlush(result);// 將響應數據寫入處處理器中
 }

 @Override
 public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
   resolverFactory.registerResolver(new RequestMessageResolver());// 註冊request消息處理器
   resolverFactory.registerResolver(new ResponseMessageResolver());// 註冊response消息處理器
   resolverFactory.registerResolver(new PingMessageResolver());// 註冊ping消息處理器
   resolverFactory.registerResolver(new PongMessageResolver());// 註冊pong消息處理器
 }
}
// 客戶端消息處理器
public class ClientMessageHandler extends ServerMessageHandler {

 // 建立一個線程,模擬用戶發送消息
 private ExecutorService executor = Executors.newSingleThreadExecutor();

 @Override
 public void channelActive(ChannelHandlerContext ctx) throws Exception {
   // 對於客戶端,在創建鏈接以後,在一個獨立線程中模擬用戶發送數據給服務端
   executor.execute(new MessageSender(ctx));
 }

 /**
  * 這裏userEventTriggered()主要是在一些用戶事件觸發時被調用,這裏咱們定義的事件是進行心跳檢測的
  * ping和pong消息,當前觸發器會在指定的觸發器指定的時間返回內若是客戶端沒有被讀取消息或者沒有寫入
  * 消息到管道,則會觸發當前方法
  */

 @Override
 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
   if (evt instanceof IdleStateEvent) {
     IdleStateEvent event = (IdleStateEvent) evt;
     if (event.state() == IdleState.READER_IDLE) {
       // 必定時間內,當前服務沒有發生讀取事件,也即沒有消息發送到當前服務來時,
       // 其會發送一個Ping消息到服務器,以等待其響應Pong消息
       Message message = new Message();
       message.setMessageType(MessageTypeEnum.PING);
       ctx.writeAndFlush(message);
     } else if (event.state() == IdleState.WRITER_IDLE) {
       // 若是當前服務在指定時間內沒有寫入消息到管道,則關閉當前管道
       ctx.close();
     }
   }
 }

 private static final class MessageSender implements Runnable {

   private static final AtomicLong counter = new AtomicLong(1);
   private volatile ChannelHandlerContext ctx;

   public MessageSender(ChannelHandlerContext ctx) {
     this.ctx = ctx;
   }

   @Override
   public void run() {
     try {
       while (true) {
         // 模擬隨機發送消息的過程
         TimeUnit.SECONDS.sleep(new Random().nextInt(3));
         Message message = new Message();
         message.setMessageType(MessageTypeEnum.REQUEST);
         message.setBody("this is my " + counter.getAndIncrement() + " message.");
         message.addAttachment("name", "xufeng");
         ctx.writeAndFlush(message);
       }
     } catch (InterruptedException e) {
       e.printStackTrace();
     }
   }
 }
}


上述代碼中,因爲客戶端和服務端須要處理的消息類型是徹底同樣的,於是客戶端處理類繼承了服務端處理類。可是對於客戶端而言,其還須要定時向服務端發送心跳消息,用於檢測客戶端與服務器的鏈接是否健在,於是客戶端還會實現userEventTriggered()方法,在該方法中定時向服務器發送心跳消息。
userEventTriggered()方法主要是在客戶端被閒置必定時間後,其會根據其讀取或者寫入消息的限制時長來選擇性的觸發讀取或寫入事件。
上述實現中,咱們看到,對於具體類型消息的處理,咱們是經過一個工廠類來獲取對應的消息處理器,而後處理相應的消息,下面咱們該工廠類的代碼:ide


public final class MessageResolverFactory {

 // 建立一個工廠類實例
 private static final MessageResolverFactory resolverFactory = new MessageResolverFactory();
 private static final List<Resolver> resolvers = new CopyOnWriteArrayList<>();

 private MessageResolverFactory() {}

 // 使用單例模式實例化當前工廠類實例
 public static MessageResolverFactory getInstance() {
   return resolverFactory;
 }

 public void registerResolver(Resolver resolver) {
   resolvers.add(resolver);
 }

 // 根據解碼後的消息,在工廠類處理器中查找能夠處理當前消息的處理器
 public Resolver getMessageResolver(Message message) {
   for (Resolver resolver : resolvers) {
     if (resolver.support(message)) {
       return resolver;
     }
   }

   throw new RuntimeException("cannot find resolver, message type: " + message.getMessageType());
 }

}


上述工廠類比較簡單,主要就是經過單例模式獲取一個工廠類實例,而後提供一個根據具體消息來查找其對應的處理器的方法。下面咱們來看看各個消息處理器的代碼:高併發


// request類型的消息
public class RequestMessageResolver implements Resolver {

 private static final AtomicInteger counter = new AtomicInteger(1);

 @Override
 public boolean support(Message message) {
   return message.getMessageType() == MessageTypeEnum.REQUEST;
 }

 @Override
 public Message resolve(Message message) {
   // 接收到request消息以後,對消息進行處理,這裏主要是將其打印出來
   int index = counter.getAndIncrement();
   System.out.println("[trx: " + message.getSessionId() + "]"
       + index + ". receive request: " + message.getBody());
   System.out.println("[trx: " + message.getSessionId() + "]"
       + index + ". attachments: " + message.getAttachments());

   // 處理完成後,生成一個響應消息返回
   Message response = new Message();
   response.setMessageType(MessageTypeEnum.RESPONSE);
   response.setBody("nice to meet you too!");
   response.addAttachment("name", "xufeng");
   response.addAttachment("hometown", "wuhan");
   return response;
 }
}
// 響應消息處理器
public class ResponseMessageResolver implements Resolver {

 private static final AtomicInteger counter = new AtomicInteger(1);

 @Override
 public boolean support(Message message) {
   return message.getMessageType() == MessageTypeEnum.RESPONSE;
 }

 @Override
 public Message resolve(Message message) {
   // 接收到對方服務的響應消息以後,對響應消息進行處理,這裏主要是將其打印出來
   int index = counter.getAndIncrement();
   System.out.println("[trx: " + message.getSessionId() + "]"
       + index + ". receive response: " + message.getBody());
   System.out.println("[trx: " + message.getSessionId() + "]"
       + index + ". attachments: " + message.getAttachments());

   // 響應消息不須要向對方服務再發送響應,於是這裏寫入一個空消息
   Message empty = new Message();
   empty.setMessageType(MessageTypeEnum.EMPTY);
   return empty;
 }
}
// ping消息處理器
public class PingMessageResolver implements Resolver {

 @Override
 public boolean support(Message message) {
   return message.getMessageType() == MessageTypeEnum.PING;
 }

 @Override
 public Message resolve(Message message) {
   // 接收到ping消息後,返回一個pong消息返回
   System.out.println("receive ping message: " + System.currentTimeMillis());
   Message pong = new Message();
   pong.setMessageType(MessageTypeEnum.PONG);
   return pong;
 }
}
// pong消息處理器
public class PongMessageResolver implements Resolver {

 @Override
 public boolean support(Message message) {
   return message.getMessageType() == MessageTypeEnum.PONG;
 }

 @Override
 public Message resolve(Message message) {
   // 接收到pong消息後,不須要進行處理,直接返回一個空的message
   System.out.println("receive pong message: " + System.currentTimeMillis());
   Message empty = new Message();
   empty.setMessageType(MessageTypeEnum.EMPTY);
   return empty;
 }
}


 如此,對於自定義協議的消息處理過程已經完成,下面則是使用用Netty實現的客戶端與服務端代碼:


// 服務端
public class Server {

 public static void main(String[] args) {
   EventLoopGroup bossGroup = new NioEventLoopGroup();
   EventLoopGroup workerGroup = new NioEventLoopGroup();
   try {
     ServerBootstrap bootstrap = new ServerBootstrap();
     bootstrap.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG, 1024)
         .handler(new LoggingHandler(LogLevel.INFO))
         .childHandler(new ChannelInitializer<SocketChannel>()
{
           @Override
           protected void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline pipeline = ch.pipeline();
             // 添加用於處理粘包和拆包問題的處理器
             pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
             pipeline.addLast(new LengthFieldPrepender(4));
             // 添加自定義協議消息的編碼和解碼處理器
             pipeline.addLast(new MessageEncoder());
             pipeline.addLast(new MessageDecoder());
             // 添加具體的消息處理器
             pipeline.addLast(new ServerMessageHandler());
           }
         });

     ChannelFuture future = bootstrap.bind(8585).sync();
     future.channel().closeFuture().sync();
   } catch (InterruptedException e) {
     e.printStackTrace();
   } finally {
     bossGroup.shutdownGracefully();
     workerGroup.shutdownGracefully();
   }
 }
}
public class Client {
 public static void main(String[] args) {
   NioEventLoopGroup group = new NioEventLoopGroup();
   Bootstrap bootstrap = new Bootstrap();
   try {
     bootstrap.group(group)
         .channel(NioSocketChannel.class)
         .option(ChannelOption.TCP_NODELAY, Boolean.TRUE)
         .handler(new ChannelInitializer<SocketChannel>()
{
           @Override
           protected void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline pipeline = ch.pipeline();
             // 添加用於解決粘包和拆包問題的處理器
             pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
             pipeline.addLast(new LengthFieldPrepender(4));
             // 添加用於進行心跳檢測的處理器
             pipeline.addLast(new IdleStateHandler(1, 2, 0));
             // 添加用於根據自定義協議將消息與字節流進行相互轉換的處理器
             pipeline.addLast(new MessageEncoder());
             pipeline.addLast(new MessageDecoder());
             // 添加客戶端消息處理器
             pipeline.addLast(new ClientMessageHandler());
           }
         });

     ChannelFuture future = bootstrap.connect("127.0.0.1", 8585).sync();
     future.channel().closeFuture().sync();
   } catch (InterruptedException e) {
     e.printStackTrace();
   } finally {
     group.shutdownGracefully();
   }
 }
}


運行上述代碼以後,咱們能夠看到客戶端和服務器分別打印了以下數據:


// 客戶端
receive pong message: 1555123429356
[trx: d05024d2]1. receive response: nice to meet you too!
[trx: d05024d2]1. attachments: {hometown=wuhan, name=xufeng}
[trx: 66ee1438]2. receive response: nice to meet you too!
// 服務器
receive ping message: 1555123432279
[trx: f582444f]4. receive request: this is my 4 message.
[trx: f582444f]4. attachments: {name=xufeng}
相關文章
相關標籤/搜索