Netty網絡編程實戰 - 通訊框架,私有協議、生產級報文追蹤、認證機制、自動空閒檢測、斷線自動重連

前言
bootstrap

前面咱們已經基於不一樣維度介紹關於Netty的不少知識了,包括通訊原理、框架工做機制、核心組件、應用實戰,以及不一樣場景對不一樣方案的選擇等等。那麼咱們此次就研究一下咱們項目中基於Netty端對端開發中如何搭建一個完整的應用框架,以供開發人員嵌入他們關注的各類應用部件等。緩存


實現Netty應用級框架須要考慮哪些因素服務器

image.png

不少人問,咱們在基於某種網絡通訊框架構建咱們本身的應用框架的時候,究竟須要考慮到哪些方面?咱們如何構建一個與業務解耦的應用基礎設施、定製協議格式、健壯性的機制等來支撐咱們的開發呢?你們也能夠在下方的留言討論,而就我的的理解和相關實踐經驗,我認爲至少應考慮到如下的問題:網絡

  1. 網絡通訊協議的選擇,方案的比較, 咱們應基於TCP?UDP?仍是應用層的一些成熟協議?...session

  2. 網絡I/O模型該採用何種?BIO?NIO?IO複用?AIO?仍是信號驅動IO呢?架構

  3. 底層通訊框架咱們要定製,仍是沿用已有的成熟框架?框架

  4. 咱們場景是否須要統必定製全局可複用的交互協議、報文等?異步

  5. 是否應該建設一種高效可靠的消息編解碼機制支撐快速通訊?ide

  6. 來自客戶端的鏈接、業務請求等是否須要有認證和鑑權?工具

  7. 當業務通訊發生異常了,可否方便看到和追蹤通訊報文細節(更便於咱們一步一步查找問題緣由)??

  8. 當服務端由於非預期緣由斷開或崩潰,而後發現修復重啓後是否每一個客戶端都要手動再鏈接一下??

  9. 當服務器空閒一段時間後,是否該有一種機制自動觸發心跳檢測網絡的健康情況??

   ... ...

  還有不少本文就不一一列舉。那咱們今天就以上考慮到的問題點來手寫實現一個基於Netty通訊框架的應用級框架,而後驗證咱們的問題是否能得以圓滿解決。


應用框架實戰

  在如下應用框架中咱們將給出以上問題的解決方案

  1. 網絡通訊協議的選擇,方案的比較, 咱們應基於TCP?UDP?仍是應用層的一些成熟協議? 採用TCP/IP

  2. 網絡I/O模型該採用何種?BIO?NIO?IO複用?AIO?仍是信號驅動IO呢?採用NIO非阻塞模型

  3. 底層通訊框架咱們要定製,仍是沿用已有的成熟框架? 基於Netty框架

  4. 咱們場景是否須要統必定製全局可複用的交互協議、報文等? 私有協議定義和交互約定方式

  5. 是否應該建設一種高效可靠的消息編解碼機制支撐快速通訊?採用ByteToMessage/MessageToByte/Kryo

  6. 來自客戶端的鏈接、業務請求等服務端是否須要有認證和鑑權?採用服務端白名單

  7. 當業務通訊發生異常了,可否方便看到和追蹤通訊報文細節(更便於咱們一步一步查找問題緣由)??採用Netty內置Logging機制

  8. 當服務端由於非預期緣由斷開或崩潰,而後發現修復重啓後是否每一個客戶端都要手動再鏈接一下?? 設計斷線自動嘗試重連

  9. 當服務器空閒一段時間後,是否該有一種機制自動觸發心跳檢測網絡的健康情況??採用IdleState和自動觸發機制


1、基礎設施部分

/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:Kryo序列化器單例
*/
public class KryoBuilder {
   private KryoBuilder(){}

   /**
    * 獲取單例
    *
@return
   
*/
   
public static Kryo getInstance(){
       return SingleKryo.builder;
   
}
   private static class SingleKryo{
       private static Kryo builder = new Kryo();
   
}

   /**
    * 構建kryo對象和註冊
    *
@return
   
*/
   
public static Kryo build(){
       Kryo kryo = getInstance();
       
kryo.setRegistrationRequired(false);
       
kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer());
       
kryo.register(GregorianCalendar.class, new GregorianCalendarSerializer());
       
kryo.register(InvocationHandler.class, new JdkProxySerializer());
       
kryo.register(BigDecimal.class, new DefaultSerializers.BigDecimalSerializer());
       
kryo.register(BigInteger.class, new DefaultSerializers.BigIntegerSerializer());
       
kryo.register(Pattern.class, new RegexSerializer());
       
kryo.register(BitSet.class, new BitSetSerializer());
       
kryo.register(URI.class, new URISerializer());
       
kryo.register(UUID.class, new UUIDSerializer());
       
UnmodifiableCollectionsSerializer.registerSerializers(kryo);
       
SynchronizedCollectionsSerializer.registerSerializers(kryo);

       
kryo.register(HashMap.class);
       
kryo.register(ArrayList.class);
       
kryo.register(LinkedList.class);
       
kryo.register(HashSet.class);
       
kryo.register(TreeSet.class);
       
kryo.register(Hashtable.class);
       
kryo.register(Date.class);
       
kryo.register(Calendar.class);
       
kryo.register(ConcurrentHashMap.class);
       
kryo.register(SimpleDateFormat.class);
       
kryo.register(GregorianCalendar.class);
       
kryo.register(Vector.class);
       
kryo.register(BitSet.class);
       
kryo.register(StringBuffer.class);
       
kryo.register(StringBuilder.class);
       
kryo.register(Object.class);
       
kryo.register(Object[].class);
       
kryo.register(String[].class);
       
kryo.register(byte[].class);
       
kryo.register(char[].class);
       
kryo.register(int[].class);
       
kryo.register(float[].class);
       
kryo.register(double[].class);

       return
kryo;
   
}
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:Kryo序列化器
*/
public class KryoSerializer {
   private static final Kryo kryo = KryoBuilder.build();

   
/**
    * 序列化
    *
    *
@param object
   
* @param buf
   
*/
   
public static void serialize(Object object, ByteBuf buf) {
       ByteArrayOutputStream stream = new ByteArrayOutputStream();
       try
{
           Output out = new Output(stream);
           
kryo.writeClassAndObject(out, object);
           
out.flush();
           
out.close();

           byte
[] bytes = stream.toByteArray();
           
stream.flush();
           
stream.close();
           
/**
            * 寫入buffer
            */
           
buf.writeBytes(bytes);
       
} catch (Exception e) {
           e.printStackTrace();
       
}
   }

   /**
    * 反序列化
    *
@param buf 數據緩衝
    *
@return
   
*/
   
public static Object deserialize(ByteBuf buf) {
       try(ByteBufInputStream stream = new ByteBufInputStream(buf)) {
           Input input = new Input(stream);
           return
kryo.readClassAndObject(input);
       
} catch (Exception e) {
           e.printStackTrace();
       
}
       return null;
   
}
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:Kryo編碼器
*/
public class KryoEncoder extends MessageToByteEncoder<Message> {
   /**
    * 編碼實現
    *
@param channelHandlerContext 處理器上下文
    *
@param message 報文
    *
@param byteBuf 對端數據緩衝
    *
@throws Exception
    */
   
@Override
   
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
       KryoSerializer.serialize(message, byteBuf);
       
channelHandlerContext.flush();
   
}
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:Kryo解碼器
*/
public class KryoDecoder extends ByteToMessageDecoder {
   /**
    * 解碼實現
    *
@param channelHandlerContext 處理器上下文
    *
@param byteBuf 對端緩衝
    *
@param list 反序列化列表
    *
@throws Exception
    */
   
@Override
   
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
       Object object = KryoSerializer.deserialize(byteBuf);
       
list.add(object);
   
}
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:系統快捷工具類
*/
public final class Utility {
   /**
    * 構建報文
    *
@param sessionId 會話Id
    *
@param msg 主體
    *
@return
   
*/
   
public static Message buildMessage(int sessionId, Object msg, byte type){
       //獲取校驗碼
       
final int OFFSET = 9;
       int
seed = sessionId+(sessionId > OFFSET ? sessionId : sessionId+OFFSET);
       
String crc = CRC.getCRC16(seed);
       
MessageHeader header = new MessageHeader();
       
header.setCrc(crc);
       
header.setLength(calcBufferLen(msg));
       
header.setSessionId(sessionId);
       
header.setType(type);

       
Message message = new Message();
       
message.setHeader(header);
       
message.setBody(msg);

       return
message;
   
}

   /**
    * 是否IP認證經過
    *
@return
   
*/
   
public static boolean isIPPassed(String ip){
       for (String p : Constant.WHITELIST){
           if(ip.equals(p)){
               return true;
           
}
       }
       return false;
   
}

   /**
    * 計算報文長度
    *
@param msg 報文對象
    *
@return int
    */
   
private static int calcBufferLen(Object msg){
       try(ByteArrayOutputStream stream = new ByteArrayOutputStream();
           
ObjectOutputStream output = new ObjectOutputStream(stream)){
           output.writeObject(msg);
           return
stream.toByteArray().length;
       
}catch (IOException e){
           e.printStackTrace();
       
}
       return 0;
   
}
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:私有報文
*/
public final class Message {//<T extends Object>
   
/**
    * 報文頭
    */
   
private MessageHeader header;
   
/**
    * 報文主體
    */
   
private Object body;
   public
MessageHeader getHeader() {
       return header;
   
}

   public void setHeader(MessageHeader header) {
       this.header = header;
   
}

   public Object getBody() {
       return body;
   
}

   public void setBody(Object body) {
       this.body = body;
   
}

   @Override
   
public String toString() {
       return "Message [header=" + this.header + "][body="+this.body+"]";
   
}
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:報文頭
*/
public final class MessageHeader {
   /**
    * CRC校驗碼
    */
   
private String crc;
   
/**
    * 會話id
    */
   
private int sessionId;
   
/**
    * 報文長度
    */
   
private int length;
   
/**
    * 報文類型碼
    */
   
private byte type;
   
/**
    * 報文優先級
    */
   
private int priority;
   
/**
    * 報文附件
    */
   
private Map<String,Object> attachment = new HashMap<>();

   public
String getCrc() {
       return crc;
   
}

   public void setCrc(String crc) {this.crc = crc;}

   public int getSessionId() {
       return sessionId;
   
}

   public void setSessionId(int sessionId) {
       this.sessionId = sessionId;
   
}

   public int getLength() {
       return length;
   
}

   public void setLength(int length) {
       this.length = length;
   
}

   public byte getType() {
       return type;
   
}

   public void setType(byte type) {
       this.type = type;
   
}

   public int getPriority() {
       return priority;
   
}

   public void setPriority(int priority) {
       this.priority = priority;
   
}

   public Map<String, Object> getAttachment() {
       return attachment;
   
}

   public void setAttachment(Map<String, Object> attachment) {
       this.attachment = attachment;
   
}
   @Override
   
public String toString() {
       return "MessageHeader [crc=" + this.crc + ", length=" + this.length
               
+ ", sessionId=" + this.sessionId + ", type=" + this.type + ", priority="
               
+ this.priority + ", attachment=" + this.attachment + "]";
   
}
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:數據報文類型
*/
public enum MessageType {
   /**
    * 認證請求
    */
   
AUTH_REQUEST((byte)0),
   
/**
    * 認證應答
    */
   
AUTH_REPLY((byte)1),
   
/**
    * 心跳請求
    */
   
HEARTBEAT_REQUEST((byte)2),
   
/**
    * 心跳應答
    */
   
HEARTBEAT_REPLY((byte)3),
   
/**
    * 普通請求
    */
   
REQUEST((byte)4),
   
/**
    * 普通應答
    */
   
REPLY((byte)5);

   public byte
getValue() {
       return value;
   
}

   private final byte value;
   
MessageType(byte b) {
       this.value = b;
   
};
}

2、客戶端部分

/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:框架客戶端啓動器類
*/
public class ClientStarter {
   /**
    * 日誌處理
    */
   
private static final Log log = LogFactory.getLog(ClientStarter.class);

   
/**
    * 客戶端啓動
    *
@param args
   
*/
   
public static void main(String[] args) throws Exception {
       ClientTask client = new ClientTask(Constant.SERV_HOST, Constant.SERV_PORT);
       new
Thread(client).start();
       while
(!client.isConnected()){
           synchronized (client){
               client.wait();
           
}
       }
       log.info("與服務器鏈接已創建,準備通訊...");
       
/**
        * 採用在控制檯適時輸入消息主體的方式,發送報文
        */
       
Scanner scanner = new Scanner(System.in);
       for
(;;){
           String body = scanner.next();
           if
(null != body && !"".equals(body)){
               if(!body.equalsIgnoreCase("exit")){
                   client.send(body);
               
}else{
                   client.close();
                   
/**
                    * 等待鏈接正常關閉通知
                    */
                   
while (client.isConnected()){
                       synchronized(client){
                           client.wait();
                       
}
                   }
                   scanner.close();
                   
System.exit(0);//提示正常退出
               
}
           }
       }
   }
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:客戶端封裝
*/
public class ClientTask implements Runnable {
   private final String host;
   private final int
port;
   public
ClientTask(String host, int port) {
       this.host = host;
       this
.port = port;
   
}
   /**
    * 日誌處理
    */
   
private static final Log log = LogFactory.getLog(ClientTask.class);
   
/**
    * 報文計數器
    */
   
public final static AtomicInteger counter = new AtomicInteger(0);
   
/**
    * 這裏用1個後臺線程,定時執行檢測客戶端鏈接是否斷開,若非用戶斷開則自動嘗試重連
    */
   
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
   
/**
    * 客戶端鏈接通道
    */
   
private Channel channel;
   
/**
    * 工做線程池組
    */
   
private EventLoopGroup group = new NioEventLoopGroup();
   
/**
    * 是否意外關閉:出異常或網絡斷開(區別於人爲主動關閉)
    */
   
private volatile boolean except_closed = true;
   
/**
    * 是否鏈接成功
    */
   
private volatile boolean connected = false;
   private static
Object _obj = new Object();

   
/**
    * 是否鏈接
    *
@return
   
*/
   
public boolean isConnected(){
       return this.connected;
   
}

   /**
    * 執行客戶端
    */
   
@Override
   
public void run() {
       try {
           this.connect();
       
} catch (Exception e) {
           e.printStackTrace();
       
}
   }
   /**
    * 鏈接服務器端
    */
   
public void connect() throws Exception{
       try {
           Bootstrap bootstrap = new Bootstrap();
           
bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    //設置TCP底層保溫發送不延遲
                   
.option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializerExt());

           
//(1)異步鏈接服務器端,等待發送和接收報文
           
ChannelFuture future = bootstrap.connect(new InetSocketAddress(this.host, this.port)).sync();
           this
.channel = future.sync().channel();
           
//(2)通知其餘等待線程,鏈接已創建
           
synchronized(this){
               this.connected = true;
               this
.notifyAll();
           
}
           this.channel.closeFuture().sync();
       
} finally {
           //檢測並執行重鏈接
           
this.reconnect();
       
}
   }

   /**
    * 關閉鏈接:非正常關閉
    */
   
public void close(){
       this.except_closed = false;
       this
.channel.close();
   
}

   /**
    * 發送報文
    *
@param body 報文主體
    */
   
public void send(String body){
       if(null != this.channel && this.channel.isActive()){
           Message message = Utility.buildMessage(counter.incrementAndGet(), body, MessageType.REQUEST.getValue());
           this
.channel.writeAndFlush(message);
           return;
       
}
       log.info("通訊還沒有創建,請稍後再試...");
   
}
   /**
    * 執行重鏈接
    *
@throws Exception
    */
   
private void reconnect() throws Exception{
       //主動關閉被檢測到
       
if(this.except_closed){
           log.info("鏈接非正常關閉,準備嘗試重連...");
           this
.executor.execute(new ReconnectTask(this.host, this.port));
       
}else{
           //主動關閉鏈接:釋放資源
           
this.relese();
       
}
   }

   /**
    * 關閉鏈接釋放資源,通知其它等待的線程
    *
@throws Exception
    */
   
private void relese() throws Exception{
       this.channel = null;
       this
.group.shutdownGracefully().sync();
       synchronized
(this){
           this.connected = false;
           this
.notifyAll();
       
}
   }

   /**
    * 嘗試重連服務器任務
    */
   
private class ReconnectTask implements Runnable{
       private final String h;
       private final int
p;
       public
ReconnectTask(String h, int p) {
           this.h = h;
           this
.p = p;
       
}
       /**
        * 嘗試重連
        */
       
@Override
       
public void run() {
           try {
               //間隔1秒重試一次
               
Thread.sleep(1000);
               
connect();
           
} catch (Exception e) {
               e.printStackTrace();
           
}
       }
   }
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:客戶端通道初始化器擴展
*/
public class ChannelInitializerExt extends ChannelInitializer<SocketChannel> {
   /**
    * 初始化通道
    *
@param channel 通道
    *
@throws Exception 異常
    */
   
@Override
   
protected void initChannel(SocketChannel channel) throws Exception {
       ChannelPipeline pipeline = channel.pipeline();
       
//(1)報文粘包處理
       
pipeline.addLast(new LengthFieldBasedFrameDecoder(Constant.MAX_MSG_LEN,0,2,0,2));
       
//(2)給報文增長分割長度
       
pipeline.addLast(new LengthFieldPrepender(2));
       
//(3)報文解碼器
       
pipeline.addLast(new KryoDecoder());
       
//(4)報文編碼器
       
pipeline.addLast(new KryoEncoder());
       
//(5)鏈接超時檢測
       
pipeline.addLast(new ReadTimeoutHandler(Constant.TIMEOUT_SECONDS));
       
//(6)認證請求
       
pipeline.addLast(new AuthenticationHandler());
       
//(7)心跳處理:發送心跳
       
pipeline.addLast(new HeartbeatHandler());
   
}
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:客戶端認證請求
*/
public class AuthenticationHandler extends ChannelInboundHandlerAdapter {
   /**
    * 日誌處理
    */
   
private static final Log log = LogFactory.getLog(AuthenticationHandler.class);
   
/**
    * 全局計數器
    */
   
private final static AtomicInteger counter = new AtomicInteger(0);
   
/**
    * 通道開啓事件
    *
@param ctx 處理器上下文
    *
@throws Exception
    */
   
@Override
   
public void channelActive(ChannelHandlerContext ctx) throws Exception {
       /**
        * 發起認證請求
        */
       
Message message = Utility.buildMessage(counter.incrementAndGet(), "Auth Request",
               
MessageType.AUTH_REQUEST.getValue());
       
ctx.writeAndFlush(message);
   
}

   /**
    * 處理網絡讀取事件
    *
@param ctx 處理器上下文
    *
@param msg 報文
    *
@throws Exception
    */
   
@Override
   
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       Message message = (Message)msg;
       if
(null != message){
           MessageHeader header = message.getHeader();
           
//處理認證應答
           
if(null != header && header.getType() == MessageType.AUTH_REPLY.getValue()){
               String body = message.getBody().toString();
               if
(body.equals(AuthenticationResult.FAILED)){
                   log.info("Authentication failed, channel close..");
                   
ctx.close();
                   return;
               
}
               log.info("Authentication is ok: "+message);
           
}
       }
       ctx.fireChannelRead(msg);
   
}

   /**
    * 客戶端認證異常處理
    *
@param ctx
   
* @param cause
   
* @throws Exception
    */
   
@Override
   
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
       cause.printStackTrace();
       
ctx.fireExceptionCaught(cause);
   
}
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:客戶端心跳處理器
*/
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
   /**
    * 日誌處理
    */
   
private static final Log log = LogFactory.getLog(AuthenticationHandler.class);
   
/**
    * 心跳定時任務
    */
   
private volatile ScheduledFuture<?> scheduleHeartbeat;
   
/**
    * 處理客戶端心跳請求報文
    *
@param ctx 處理器上下文
    *
@param msg 消息對象
    *
@throws Exception
    */
   
@Override
   
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       Message message = (Message)msg;
       if
(null != message){
           MessageHeader header = message.getHeader();
           
//處理認證應答
           
if(header.getType() == MessageType.AUTH_REPLY.getValue()){
               //登陸完成後,開啓客戶端對服務端心跳
               
this.scheduleHeartbeat = ctx.executor().scheduleAtFixedRate(new HeartbeatTask(ctx),
                       
0, Constant.HEARTBEAT_TIMEOUT,
                       
TimeUnit.MILLISECONDS);
               return;
           
}
           //處理心跳應答
           
if(header.getType() == MessageType.HEARTBEAT_REPLY.getValue()){
               log.info("Client recevied server heartbeat: "+message);
               
ReferenceCountUtil.release(msg);
               return;
           
}
       }
       ctx.fireChannelRead(msg);
   
}

   /**
    * 客戶端捕獲心跳異常
    *
@param ctx 處理器上下文
    *
@param cause 異常
    *
@throws Exception
    */
   
@Override
   
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
       cause.printStackTrace();
       if
(null != this.scheduleHeartbeat){
           this.scheduleHeartbeat.cancel(true);
           this
.scheduleHeartbeat = null;
       
}
       //傳遞給TailHandler處理
       
ctx.fireExceptionCaught(cause);
   
}

   /**
    * 定義心跳任務
    */
   
private class HeartbeatTask implements Runnable{
       /**
        * 心跳計數器
        */
       
private final AtomicInteger counter = new AtomicInteger(0);
       private final
ChannelHandlerContext ctx;
       public
HeartbeatTask(ChannelHandlerContext ctx) {
           this.ctx = ctx;
       
}
       /**
        * 心跳任務執行
        */
       
@Override
       
public void run() {
           //客戶端心跳報文
           
Message heartbeat = Utility.buildMessage(this.counter.incrementAndGet(), Constant.HEARTBEAT_ACK,
                   
MessageType.HEARTBEAT_REQUEST.getValue());
           this
.ctx.writeAndFlush(heartbeat);
       
}
   }
}

3、服務器部分

/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:框架服務端啓動器類
*/
public class ServerStarter {
   /**
    * 日誌處理
    */
   
private static final Log log = LogFactory.getLog(ServerStarter.class);
   
/**
    * 服務端啓動
    *
@param args
   
*/
   
public static void main(String[] args) throws Exception {
       //主線程池組:負責處理鏈接
       
EventLoopGroup main = new NioEventLoopGroup();
       
//工做線程池組:負責請求對應的業務Handler處理
       
EventLoopGroup work = new NioEventLoopGroup();

       
ServerBootstrap bootstrap = new ServerBootstrap();
       
bootstrap.group(main, work)
                .channel(NioServerSocketChannel.class)
                //設置底層協議接收緩存隊列最大長度
               
.option(ChannelOption.SO_BACKLOG, Constant.TCP_MAX_QUEUE_SIZE)
                .childHandler(new ChannelInitializerExt());
       
//綁定端口,等待同步報文
       
bootstrap.bind(Constant.SERV_PORT).sync();
       
log.info("Server started and listen port: "+Constant.SERV_PORT+"...");
   
}
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:服務器通道初始化器
*/
public class ChannelInitializerExt extends ChannelInitializer<SocketChannel> {
   /**
    * 初始化處理器
    *
@param channel 鏈接通道
    *
@throws Exception 異常
    */
   
@Override
   
protected void initChannel(SocketChannel channel) throws Exception {
       ChannelPipeline pipeline = channel.pipeline();
       
//(1)日誌打印處理:能夠打印報文字節碼
       
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
       
//(2)處理粘包問題:帶長度
       
pipeline.addLast(new LengthFieldBasedFrameDecoder(Constant.MAX_MSG_LEN,0,2,0,2));
       
//(3)報文編碼器:消息發送增長分隔符
       
pipeline.addLast(new LengthFieldPrepender(2));
       
//(4)私有報文解碼
       
pipeline.addLast(new KryoDecoder());
       
//(5)私有報文編碼
       
pipeline.addLast(new KryoEncoder());
       
//(6)通道鏈接超時檢測,發送心跳
       
pipeline.addLast(new ReadTimeoutHandler(Constant.TIMEOUT_SECONDS));
       
//(7)身份認證應答
       
pipeline.addLast(new AuthenticationHandler());
       
//(8)心跳應答
       
pipeline.addLast(new HeartbeatHandler());
       
//(9)其餘業務處理...
       
pipeline.addLast(new OtherServiceHandler());
   
}
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:服務端身份認證處理器
*/
public class AuthenticationHandler extends ChannelInboundHandlerAdapter {
   /**
    * 日誌處理
    */
   
private static final Log log = LogFactory.getLog(AuthenticationHandler.class);
   
/**
    * 定義認證業務計數器
    */
   
private static final AtomicInteger counter = new AtomicInteger(0);
   
/**
    * 緩存已認證ip列表
    */
   
private static final List<String>  authedIPList = new LinkedList<>();
   
/**
    * 認證業務處理
    *
@param ctx
   
* @param msg
   
* @throws Exception
    */
   
@Override
   
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       Message message = (Message)msg;
       
Message authMessage;
       if
(null != message){
           MessageHeader header = message.getHeader();
           
//處理認證請求
           
if(null != header && header.getType() == MessageType.AUTH_REQUEST.getValue()){
               String ip = ctx.channel().remoteAddress().toString();
               
String result;
               
//重複登陸
               
if(authedIPList.contains(ip)){
                   result = AuthenticationResult.REPEAT_AUTH.toString();
               
}else{
                   //是否ip認證經過
                   
if(Utility.isIPPassed(ip)){
                       authedIPList.add(ip);
                       
result = AuthenticationResult.SUCCESS.toString();
                   
}else{
                       result = AuthenticationResult.FAILED.toString();
                   
}
               }
               authMessage = Utility.buildMessage(counter.incrementAndGet(), result, MessageType.AUTH_REPLY.getValue());
               
ctx.writeAndFlush(authMessage);
               
//釋放對象,再也不向後傳遞
               
ReferenceCountUtil.release(msg);
               
log.info("Server reply client auth request:"+authMessage);
               return;
           
}
       }
       ctx.fireChannelRead(msg);
   
}

   /**
    * 認證處理器捕獲異常處理
    *
@param ctx 處理器上下文
    *
@param cause 異常
    *
@throws Exception
    */
   
@Override
   
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
       cause.printStackTrace();
       
authedIPList.remove(ctx.channel().remoteAddress().toString());
       
ctx.close();
       
ctx.fireExceptionCaught(cause);
   
}
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:服務器端心跳包處理器
*/
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
   /**
    * 日誌處理
    */
   
private static final Log log = LogFactory.getLog(HeartbeatHandler.class);
   
/**
    * 會話計數器
    */
   
private final AtomicInteger counter = new AtomicInteger(0);
   
/**
    * 處理心跳報文
    *
@param ctx 處理器上下文
    *
@param msg 消息報文
    *
@throws Exception
    */
   
@Override
   
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       Message message = (Message)msg;
       if
(null != message){
           MessageHeader header = message.getHeader();
           
/**
            * 處理心跳請求
            */
           
if(null != header && header.getType() == MessageType.HEARTBEAT_REQUEST.getValue()){
               log.info("Server recevied client heartbeat: "+message);
               
//應答報文
               
Message heartbeat = Utility.buildMessage(counter.incrementAndGet(), Constant.HEARTBEAT_ACK,
                       
MessageType.HEARTBEAT_REPLY.getValue());
               
ctx.writeAndFlush(heartbeat);
               
//引用計數器釋放對象
               
ReferenceCountUtil.release(msg);
               return;
           
}
       }
       ctx.fireChannelRead(msg);
   
}
}
/**
*
@author andychen https://blog.51cto.com/14815984
*
@description:其它業務處理
*/
public class OtherServiceHandler extends SimpleChannelInboundHandler<Message> {
   /**
    * 日誌處理
    */
   
private static final Log log = LogFactory.getLog(OtherServiceHandler.class);
   
/**
    * 讀取對端發送的報文
    *
@param ctx 處理器上下文
    *
@param message 報文
    *
@throws Exception
    */
   
@Override
   
protected void channelRead0(ChannelHandlerContext ctx, Message message) throws Exception {
       log.info(message);
   
}
   /**
    * 鏈接斷開事件
    *
@param ctx 上下文
    *
@throws Exception
    */
   
@Override
   
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
       log.info("["+ctx.channel().remoteAddress()+"]斷開了鏈接...");
   
}
}


運行檢驗

image.png

image.png

image.png

image.png

image.png

開發小結

以上就是咱們基於Netty實現的一整套通訊應用框架的和核心代碼。全部的業務開發均可定製和構建相似此的基礎應用框架,開發Handler處理器的業務人員可任意嵌入被解耦化的業務領域Handler。可採用Handler自動注入Netty管道的方式零侵入框架,支持更多更復雜的業務!但願本文能給你們提供靠譜和一站式的借鑑。你們有任何關於Netty的問題能夠在下方留言,謝謝關注!

相關文章
相關標籤/搜索