title: Mina服務端客戶端通訊
date: 2018-09-30 09:00:30
tags:編程
- [mina] - [tcp]
categories:segmentfault
- [編程]
[TOC]緩存
前兩章節已經完整的介紹了理論部分,今天咱們就利用這些理論來實現tcp協議的c/s 通訊。首先咱們簡單回顧下以前的介紹,
在mina中咱們的客戶端和服務端簡直就是如出一轍,只是咱們用不一樣適配器。可是他的數據處理流程是同樣的。今天咱們就重點看看如何創建服務端、客戶端
而且處理二者之間的消息通訊處理微信
服務端和客戶端不一樣的就是咱們建立的監聽對象不一樣而已,客戶端發送消息到服務端,服務端須要經歷過濾器的處理才能到達消息中心,可是在過濾器中咱們就須要將消息進行解碼,而後纔會到消息接收的地方處理咱們的業務。正常狀況下咱們處理完消息須要對客戶端進行迴應。迴應的時候也會經歷過濾器中的編碼邏輯,進行數據編碼而後發送。信息發送到客戶端咱們能夠當作服務端的方向。也是須要進行編解碼的。下面看看服務端的建立代碼session
//建立監聽對象 IoAcceptor acceptor = new NioSocketAcceptor(); TextLineCodecFactory textLineCodecFactory = new TextLineCodecFactory(Charset.forName("utf-8"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue()); //添加過濾器 acceptor.getFilterChain().addLast("logger",new LoggingFilter()); acceptor.getFilterChain().addLast("protocal",new ProtocolCodecFilter( textLineCodecFactory )); //設置時間處理的handler acceptor.setHandler(new ServerMessageHandler()); //設置讀取數據緩存區的大小 acceptor.getSessionConfig().setReadBufferSize(Constaint.READSIZE); //設置多久沒有消息就進入空閒狀態 acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE,Constaint.IDLETIME); //綁定端口 try { acceptor.bind(new InetSocketAddress(Constaint.REMOTE_PORT)); } catch (IOException e) { logger.error(String.format("bind %s error",Constaint.REMOTE_PORT)); e.printStackTrace(); } logger.info(String.format("bind %s success",Constaint.REMOTE_PORT));
//建立監聽對象 IoConnector connector = new NioSocketConnector(); TextLineCodecFactory textLineCodecFactory = new TextLineCodecFactory(Charset.forName("utf-8"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue()); //添加過濾器 //日誌過濾器 。 sltf日誌設置 connector.getFilterChain().addLast("logger",new LoggingFilter()); //在這個過濾器中提供了編解碼,這裏的編碼是以信息中已\r\n結尾算是一條信息 connector.getFilterChain().addLast("protocal",new ProtocolCodecFilter( new SocketFactory() )); //設置時間處理的handler , 提供session生命週期的監聽函數,消息接受,發送的函數 connector.setHandler(new ClientMessageHandler()); //設置讀取數據緩存區的大小 connector.getSessionConfig().setReadBufferSize(Constaint.READSIZE); //設置多久沒有消息就進入空閒狀態 connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE,Constaint.IDLETIME); ConnectFuture future = connector.connect(new InetSocketAddress(Constaint.REMOTE_IP,Constaint.REMOTE_PORT)); //是異步處理,這裏不會形成阻塞 future.addListener(new IoFutureListener<IoFuture>() { @Override public void operationComplete(IoFuture ioFuture) { logger.info("鏈接準備完成"); IoSession session = ioFuture.getSession(); } });
private static Logger logger = LogManager.getLogger(ServerMessageHandler.class); public void sessionCreated(IoSession session) throws Exception { super.sessionCreated(session); logger.info("sessionCreated"); } public void sessionOpened(IoSession session) throws Exception { super.sessionOpened(session); try { IoBuffer buffer = IoBuffer.allocate(30); buffer.clear(); buffer.putString("quit\r\n", Charset.forName("utf-8").newEncoder()); buffer.flip(); session.write(buffer); } catch (Exception e) { logger.error(e.toString()); } logger.info("sessionOpened"); } public void sessionClosed(IoSession session) throws Exception { super.sessionClosed(session); logger.info("sessionClosed"); } public void sessionIdle(IoSession session, IdleStatus idleStatus) throws Exception { super.sessionIdle(session,idleStatus); try { IoBuffer buffer = IoBuffer.allocate(30); buffer.clear(); buffer.putString("quit\r\n", Charset.forName("utf-8").newEncoder()); buffer.flip(); session.write(buffer); } catch (Exception e) { logger.error(e.toString()); } // logger.info("sessionIdle"); } public void exceptionCaught(IoSession ioSession, Throwable throwable) throws Exception { logger.info("exceptionCaught"); throwable.printStackTrace(); } public void messageReceived(IoSession session, Object message) throws Exception { super.messageReceived(session, message); String info = message.toString(); Date date = new Date(System.currentTimeMillis()); SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd HH:mm:ss"); String time = sdf.format(date); session.write(time); System.out.println("接收到的消息:"+info); } public void messageSent(IoSession session, Object message) throws Exception { super.messageSent(session, message); logger.info("messageSent"); }
public class SocketFactory implements ProtocolCodecFactory { private MessageDecoder decoder; private MessageEncoder encoder; public SocketFactory() { decoder = new MessageDecoder(); encoder = new MessageEncoder(); } public ProtocolDecoder getDecoder(IoSession session) throws Exception { return this.decoder; } public ProtocolEncoder getEncoder(IoSession session) throws Exception { return this.encoder; } }
解碼器寫好以後只須要在上面自定義工廠中建立就行了。至於自定義編碼器只須要繼承CumulativeProtocolDecoder這個類就行了。並且複寫doDecode方法就行了。這個方法的返回值是boolean類型。返回值不一樣表明意義不一。這裏須要重點理清楚異步
記住三種狀況 半包 、 正常 、 粘包tcp
public class MessageDecoder extends CumulativeProtocolDecoder { /** * 此方法return true : 表示父類中CumulativeProtocolDecoder會不斷的調用此方法進行消息的消費 * return false: 表示消息已經消費徹底了,緩存中就算有數據也不會再消費了。等待再次客戶端 * 發送消息時會觸發消息發送接口,此時會將新舊消息拼接再一塊兒進行處理 * @param ioSession * @param ioBuffer * @param protocolDecoderOutput * @return * @throws Exception */ @Override protected boolean doDecode(IoSession ioSession, IoBuffer ioBuffer, ProtocolDecoderOutput protocolDecoderOutput) throws Exception { IoBuffer buffer = IoBuffer.allocate(10); while (ioBuffer.hasRemaining()) { if (ioBuffer.remaining()<3) { //繼續接受 return false; } //獲取三個字節 int oldLimit = ioBuffer.limit(); ioBuffer.limit(ioBuffer.position()+3); String text = ioBuffer.getString(Charset.forName("UTF-8").newDecoder()); protocolDecoderOutput.write(text); ioBuffer.limit(oldLimit); if (ioBuffer.hasRemaining()) { return true; } } return false; } }
public class MessageEncoder extends ProtocolEncoderAdapter { @Override public void encode(IoSession ioSession, Object o, ProtocolEncoderOutput protocolEncoderOutput) throws Exception { //TODO 根據協議編碼 //組裝好以後 ioSession.write(IoBuffer)寫出 System.out.println(o); } }
加入戰隊ide