基於Mina的配置中心(三)

基於Mina的配置中心(三)

在第二章裏咱們已經自定義了包MessagePack。接下來咱們要定義編碼器和解碼器。java

  • 編碼器: 把 java對象轉爲二進制編碼,由於在網絡中傳輸的是二進制數據。
  • 解碼器:把二進制數據轉爲 java對象,也就是編碼的逆向過程。

編碼解碼器工廠 MessageProtocolCodecFactory

首先咱們要自定義一個編碼器工廠,就像TextLineCodecFactory同樣。因爲這個寫法是固定的,因此就不放代碼了,具體能夠到GitHub查看源代碼。git

編碼器 MessageProtocolEncoder

這個東西的寫法也是固定的。惟一不一樣的地方就是放置數據。github

@Override
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {  MessagePack pack = (MessagePack) message;  //設置緩衝區大小,並自動增加  IoBuffer ioBuffer = IoBuffer.allocate(pack.getLen()).setAutoExpand(true);  log.info("MessageProtocolEncoder_encode_length:{}", pack.getLen());  //放置長度  ioBuffer.putInt(pack.getLen());  //放置模塊代碼  ioBuffer.putInt(pack.getModule());  if (StringUtils.isNotBlank(pack.getBody())) {  log.info("MessageProtocolEncoder_encode_length:{}", pack.getBody().getBytes().length);  //放置字節數組  ioBuffer.putString(pack.getBody(), charset.newEncoder());  }  ioBuffer.flip();  out.write(ioBuffer); } 複製代碼

咱們把從MessagePack中取出的數據,按照順序放到IoBuffer中,而後使用out.write(ioBuffer);把消息寫出去,實際上是把二進制數據存儲到了一個隊列中Queue<Object> messageQueueweb

實際上是在這裏把消息發出去的org.apache.mina.filter.codec.ProtocolCodecFilter#filterWriteapache

解碼器 MessageProtocolDecoder

其實解碼器會麻煩一點,咱們腦殼裏要有這個包的模型,包的開頭是length(總長度),而後是module(模塊代碼),最後是Json字符串(Message),在解碼時,還要判斷一下是不是完整的包。json

像下面這樣:數組

@Override
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {  // 包頭的長度  // 拆包時,若是可讀數據的長度小於包頭的長度,就不進行讀取  if (in.remaining() < MessagePack.PACK_HEAD_LEN) {  return false;  } else {  //標記當前position,以便後繼的reset操做能恢復position位置  in.mark();  // 獲取總長度  int length = in.getInt();  //獲取模塊代碼  int module = in.getInt();  log.info("CustomProtocolDecoder_doDecode_length:{}, module:{}", length, module);  // 若是可讀取數據的長度 小於 總長度 - 包頭的長度 ,則結束拆包,等待下一次  if (in.remaining() < (length - MessagePack.PACK_HEAD_LEN)) {  in.reset();  return false;  } else {  //重置回覆position位置到操做前 並讀取一條完整記錄  in.reset();  byte[] bytes = new byte[length];  // 獲取長度4個字節、模塊4個字節、內容,即獲取完整消息  in.get(bytes, 0, length);  String content = new String(bytes, MessagePack.PACK_HEAD_LEN, length - MessagePack.PACK_HEAD_LEN, charset);  // 封裝爲自定義的java對象  MessagePack pack = new MessagePack(module, content);  out.write(pack);  // 若是讀取一條記錄後,還存在數據(粘包),則再次進行調用  return in.remaining() > 0;  }  } } 複製代碼

MessageProtocolCodecFactory中,引入這兩個類。編碼解碼器完成,如今須要把它配置到Mina的配置類中。服務器

MinaServerConfig 中添加下面代碼。網絡

/**  * 編解碼器filter  */ @Bean public ProtocolCodecFilter protocolCodecFilter() {  return new ProtocolCodecFilter(new MessageProtocolCodecFactory()); } 複製代碼

心跳檢測

心跳檢測簡單點說就是客戶端每隔一段時間,向服務器發送一個消息,也就是心跳包,讓服務器知道鏈接狀態沒有問題,客戶端正常在線。若是沒有發送,在必定時間內超過指定次數,服務器會認爲客戶端掉線了,爲了節省資源,會關閉鏈接。session

心跳檢測通常有下面幾種類型:

  1. 活躍型: 小心跳請求包被接受到後,當即發出心跳反饋。
  2. 半活躍型:發送心跳請求,不在意有沒有心跳反饋。可是接收到心跳請求後,也會當即發出心跳反饋。
  3. 聾子型:主動發送心跳請求,不想發送任何心跳反饋,可是接收到心跳請求後,也會當即發出心跳反饋。
  4. 持續監聽型:既不想發送心跳請求也不想發送心跳反饋。

這裏咱們使用被動型,服務器接受客戶端心跳請求,當在規定時間內沒有收到時 將客戶端鏈接關閉。

package com.lww.mina.filter;
 import com.alibaba.fastjson.JSONObject; import com.lww.mina.protocol.MessagePack; import com.lww.mina.util.Const; import lombok.extern.slf4j.Slf4j; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;  /**  * 被動型心跳機制  *  * @author lww  * @date 2020-07-06 22:40  */ @Slf4j public class ServerKeepAliveFactoryImpl implements KeepAliveMessageFactory {   /**  * 用來判斷接收到的消息是否是一個心跳請求包,是就返回true[接收端使用]  */  @Override  public boolean isRequest(IoSession session, Object message) {  if (message instanceof MessagePack) {  MessagePack pack = (MessagePack) message;  if (pack.getModule() == Const.HEART_BEAT) {  log.info("收到 心跳請求 ServerKeepAliveFactoryImpl_isRequest_pack:{}", JSONObject.toJSONString(message));  return true;  }  }  return false;  }   /**  * 用來判斷接收到的消息是否是一個心跳回復包,是就返回true[發送端使用]  */  @Override  public boolean isResponse(IoSession session, Object message) {  return false;  }   /**  * 在須要發送心跳時,用來獲取一個心跳請求包[發送端使用]  */  @Override  public Object getRequest(IoSession session) {  return null;  }   /**  * 在須要回覆心跳時,用來獲取一個心跳回復包[接收端使用]  */  @Override  public Object getResponse(IoSession session, Object message) {  MessagePack pack = (MessagePack) message;  // 將超時次數置爲0  session.setAttribute(Const.TIME_OUT_KEY, 0);  log.info("響應 心跳請求 ServerKeepAliveFactoryImpl_getResponse_request:{}", JSONObject.toJSONString(message));  return new MessagePack(Const.HEART_BEAT, "heart");  } } 複製代碼

而後一樣把心跳檢測也配置到 MinaServerConfig 中。

/**  * 心跳檢測  */ @Bean public ServerKeepAliveFactoryImpl keepAliveFactoryImpl() {  return new ServerKeepAliveFactoryImpl(); }  /**  * 心跳filter  */ @Bean public KeepAliveFilter keepAliveFilter(ServerKeepAliveFactoryImpl keepAliveFactory) {  // 注入心跳工廠,讀寫空閒  KeepAliveFilter filter = new KeepAliveFilter(keepAliveFactory, IdleStatus.BOTH_IDLE);  // 設置是否forward到下一個filter  filter.setForwardEvent(true);  // 設置心跳頻率 5秒一次  filter.setRequestInterval(Const.HEART_BEAT_RATE);  return filter; } 複製代碼

最後將過濾器注入到mina的鏈式管理器中,還有開啓minaserver服務,並設置對應的參數。

/**  * 將過濾器注入到mina的鏈式管理器中  */ @Bean public DefaultIoFilterChainBuilder defaultIoFilterChainBuilder(ExecutorFilter executorFilter,  LoggingFilter loggingFilter, ProtocolCodecFilter protocolCodecFilter, KeepAliveFilter keepAliveFilter) {  DefaultIoFilterChainBuilder chainBuilder = new DefaultIoFilterChainBuilder();  Map<String, IoFilter> filters = new LinkedHashMap<>();  //多線程過濾器  filters.put("executor", executorFilter);  //日誌  filters.put("logger", loggingFilter);  //編碼 解碼  filters.put("codec", protocolCodecFilter);  //心跳  filters.put("keepAliveFilter", keepAliveFilter);  chainBuilder.setFilters(filters);  return chainBuilder; }  /**  * 開啓mina的server服務,並設置對應的參數  */ @Bean public IoAcceptor ioAcceptor(DefaultIoFilterChainBuilder filterChainBuilder) throws IOException {  IoAcceptor acceptor = new NioSocketAcceptor();  //設置緩衝區大小  acceptor.getSessionConfig().setReadBufferSize(config.getReadBufferSize());  //設置空閒狀態時間,10秒沒操做就進入空閒狀態  acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, config.getIdelTimeOut());  //過濾器鏈  acceptor.setFilterChainBuilder(filterChainBuilder);  //處理器 這個 handler 處理全部的鏈接事件  acceptor.setHandler(new MinaServerHandler());  //綁定地址  acceptor.bind(new InetSocketAddress(config.getAddress(), config.getPort()));  return acceptor; } 複製代碼

這裏有一個問題:

//設置空閒狀態時間,10秒沒操做就進入空閒狀態
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, config.getIdelTimeOut()); 複製代碼

這個空閒時間是沒有生效的,由於使用了心跳檢測,空閒狀態時間就是心跳檢測的時間,因此也就是5秒。

配置類已經基本完成了,還有一個MinaServerHandler,這個handler就是處理客戶端消息的處理器。
能夠先建立出來,這樣配置類不會報錯。

package com.lww.mina.handler;
 import org.apache.mina.core.service.IoHandlerAdapter;  /**  * 處理客戶端發送的消息  *  * @author lww  * @date 2020-07-06 22:53  */ public class MinaServerHandler extends IoHandlerAdapter {  } 複製代碼

最後結構圖:

總結

配置類基本完成了,咱們這樣配置了編碼解碼、心跳檢測後,Mina會自動調用,是否是簡單了不少?固然還剩下一個handler,已經粘了太多代碼了,第四章再繼續吧。

第四章會完成handler,還有Session管理,還有當配置更新時,推到客戶端。完成了這些,基本上Server端就差很少完成了,而後會寫Client端,Client纔是含金量更高的東西。敬請期待!

本次的代碼沒有所有粘出來,有興趣的能夠去Github查看。

項目源碼

歡迎你們關注個人公衆號,共同窗習,一塊兒進步。加油🤣

本文使用 mdnice 排版

相關文章
相關標籤/搜索