Mina實現Socket通訊完整過程


title: Mina服務端客戶端通訊
date: 2018-09-30 09:00:30
tags:編程

- [mina]
- [tcp]

categories:segmentfault

- [編程]

permalink: zxh

[TOC]緩存

前兩章節已經完整的介紹了理論部分,今天咱們就利用這些理論來實現tcp協議的c/s 通訊。首先咱們簡單回顧下以前的介紹,
在mina中咱們的客戶端和服務端簡直就是如出一轍,只是咱們用不一樣適配器。可是他的數據處理流程是同樣的。今天咱們就重點看看如何創建服務端、客戶端
而且處理二者之間的消息通訊處理微信

服務端

服務端和客戶端不一樣的就是咱們建立的監聽對象不一樣而已,客戶端發送消息到服務端,服務端須要經歷過濾器的處理才能到達消息中心,可是在過濾器中咱們就須要將消息進行解碼,而後纔會到消息接收的地方處理咱們的業務。正常狀況下咱們處理完消息須要對客戶端進行迴應。迴應的時候也會經歷過濾器中的編碼邏輯,進行數據編碼而後發送。信息發送到客戶端咱們能夠當作服務端的方向。也是須要進行編解碼的。下面看看服務端的建立代碼session

//建立監聽對象
IoAcceptor acceptor = new NioSocketAcceptor();
TextLineCodecFactory textLineCodecFactory =
        new TextLineCodecFactory(Charset.forName("utf-8"), LineDelimiter.WINDOWS.getValue(),
                LineDelimiter.WINDOWS.getValue());
//添加過濾器
acceptor.getFilterChain().addLast("logger",new LoggingFilter());
acceptor.getFilterChain().addLast("protocal",new ProtocolCodecFilter(
        textLineCodecFactory
));
//設置時間處理的handler
acceptor.setHandler(new ServerMessageHandler());
//設置讀取數據緩存區的大小
acceptor.getSessionConfig().setReadBufferSize(Constaint.READSIZE);
//設置多久沒有消息就進入空閒狀態
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE,Constaint.IDLETIME);
//綁定端口
try {
    acceptor.bind(new InetSocketAddress(Constaint.REMOTE_PORT));
} catch (IOException e) {
    logger.error(String.format("bind %s error",Constaint.REMOTE_PORT));
    e.printStackTrace();
}
logger.info(String.format("bind %s success",Constaint.REMOTE_PORT));

客戶端

//建立監聽對象
IoConnector connector = new NioSocketConnector();
TextLineCodecFactory textLineCodecFactory =
        new TextLineCodecFactory(Charset.forName("utf-8"), LineDelimiter.WINDOWS.getValue(),
                LineDelimiter.WINDOWS.getValue());
//添加過濾器
//日誌過濾器 。  sltf日誌設置
connector.getFilterChain().addLast("logger",new LoggingFilter());
//在這個過濾器中提供了編解碼,這裏的編碼是以信息中已\r\n結尾算是一條信息 
connector.getFilterChain().addLast("protocal",new ProtocolCodecFilter(
        new SocketFactory()
));
//設置時間處理的handler , 提供session生命週期的監聽函數,消息接受,發送的函數
connector.setHandler(new ClientMessageHandler());
//設置讀取數據緩存區的大小
connector.getSessionConfig().setReadBufferSize(Constaint.READSIZE);
//設置多久沒有消息就進入空閒狀態
connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE,Constaint.IDLETIME);
ConnectFuture future = connector.connect(new InetSocketAddress(Constaint.REMOTE_IP,Constaint.REMOTE_PORT));
//是異步處理,這裏不會形成阻塞
future.addListener(new IoFutureListener<IoFuture>() {
    @Override
    public void operationComplete(IoFuture ioFuture) {
        logger.info("鏈接準備完成");
        IoSession session = ioFuture.getSession();

    }
});

通訊

  • 其實上面服務端,客戶端兩邊建立好就應經在通訊了,在上面建立的時候咱們發現。建立的時候須要指定消息處理器(IoHandlerAdapter) , 這個在IoService中會排在IoFilter以後執行。在過濾器執行以後咱們就會調用咱們的消息處理器。
private static Logger logger = LogManager.getLogger(ServerMessageHandler.class);
    public void sessionCreated(IoSession session) throws Exception {
        super.sessionCreated(session);
        logger.info("sessionCreated");
    }

    public void sessionOpened(IoSession session) throws Exception {
        super.sessionOpened(session);
        try {
            IoBuffer buffer = IoBuffer.allocate(30);
            buffer.clear();
            buffer.putString("quit\r\n", Charset.forName("utf-8").newEncoder());
            buffer.flip();
            session.write(buffer);
        } catch (Exception e) {
            logger.error(e.toString());
        }
        logger.info("sessionOpened");
    }

    public void sessionClosed(IoSession session) throws Exception {
        super.sessionClosed(session);
        logger.info("sessionClosed");
    }

    public void sessionIdle(IoSession session, IdleStatus idleStatus) throws Exception {
        super.sessionIdle(session,idleStatus);
        try {
            IoBuffer buffer = IoBuffer.allocate(30);
            buffer.clear();
            buffer.putString("quit\r\n", Charset.forName("utf-8").newEncoder());
            buffer.flip();
            session.write(buffer);
        } catch (Exception e) {
            logger.error(e.toString());
        }
        //        logger.info("sessionIdle");
    }

    public void exceptionCaught(IoSession ioSession, Throwable throwable) throws Exception {
        logger.info("exceptionCaught");
        throwable.printStackTrace();
    }

    public void messageReceived(IoSession session, Object message) throws Exception {
        super.messageReceived(session, message);
        String info = message.toString();
        Date date = new Date(System.currentTimeMillis());
        SimpleDateFormat sdf = new  SimpleDateFormat("yy-MM-dd HH:mm:ss");
        String time = sdf.format(date);
        session.write(time);
        System.out.println("接收到的消息:"+info);
    }

    public void messageSent(IoSession session, Object message) throws Exception {
        super.messageSent(session, message);
        logger.info("messageSent");
    }
  • 這裏消息處理器,提供了幾個時刻能夠控制,好比session建立、銷燬的時候執行的地方。消息接收的地方,消息發送成功的地方。這些控制力度能夠根據咱們的須要進行適度的複寫。

自定義工廠編解碼

  • 工廠是提供編解碼的方法。這個工廠是加載在ProtocolCodecFilter這個過濾器中的。咱們也能夠自定義過濾器,在自定義的過濾器中咱們也能夠加載咱們自定義的工廠,實現編解碼。咱們在編解碼的地方,就能夠加入咱們的業務代碼。好比解碼經過約定的協議方式讀取到內容後經過ProtocolDecoderOutput 將消息寫出去就能夠在咱們的IoHandlerAdapter的messageReceived方法中獲取到消息。而後業務書寫。這樣作到代碼的解耦。
public class SocketFactory  implements ProtocolCodecFactory {
    private MessageDecoder decoder;
    private MessageEncoder encoder;

    public SocketFactory() {
        decoder = new MessageDecoder();
        encoder = new MessageEncoder();
    }

    public ProtocolDecoder getDecoder(IoSession session) throws Exception {
        return this.decoder;
    }

    public ProtocolEncoder getEncoder(IoSession session) throws Exception {
        return this.encoder;
    }
}

解碼器

  • 上面的工廠就是提供編解碼的。和咱們生活中同樣工廠提供功能,可是實際並非工廠作的,工廠可能只代理功能,僅僅是個加工廠而已。mina通訊也是如此。真正編解碼的並非工廠執行的,本節將揭露解碼者CumulativeProtocolDecoder
  • 解碼器寫好以後只須要在上面自定義工廠中建立就行了。至於自定義編碼器只須要繼承CumulativeProtocolDecoder這個類就行了。並且複寫doDecode方法就行了。這個方法的返回值是boolean類型。返回值不一樣表明意義不一。這裏須要重點理清楚異步

    • true: 返回true表示你已經從CumulativeProtocolDecoder的消息中消費了信息,在編碼器中返回true以前也應該調用ProtocolDecoderOutput 的wirte將消息發佈到IoHandAdaptor中進行業務處理。可是這裏會出現其餘狀況,應爲咱們服務端客戶端是長鏈接因此在咱們消息中消息是不斷髮過來的,咱們緩存中的消息多是完整一條消息,也可能不夠一整條消息,也多是一整條多了一點,
      一、若是不是一條完整(半包)的那麼咱們返回falsed等待客戶端繼續發送
      二、若是正好是一整條,那麼咱們接受到以後返回true的時候咱們緩存中就沒有數據了,在CumulativeProtocolDecoder會中止對解碼中doDecode的調用了,這種狀況不會出現意外
      三、數據比一條完整信息(粘包)多,那麼咱們處理到一條信息後也須要返回true,可是CumulativeProtocolDecoder會將剩餘的緩存繼續拼裝,剩餘消息就至關於內部進行了第二次解碼。若是不過那麼至關於上面第一種狀況

      記住三種狀況 半包 、 正常 、 粘包tcp

    • false: 返回false就是緩存中的數據不夠咱們一整條消息,須要繼續等待客戶端的消息。CumulativeProtocolDecoder中的緩存機制會不斷的將客戶端發過來的數據拼接到緩存中
public class MessageDecoder extends CumulativeProtocolDecoder {
    /**
     * 此方法return true : 表示父類中CumulativeProtocolDecoder會不斷的調用此方法進行消息的消費
     *       return false: 表示消息已經消費徹底了,緩存中就算有數據也不會再消費了。等待再次客戶端
     *       發送消息時會觸發消息發送接口,此時會將新舊消息拼接再一塊兒進行處理
     * @param ioSession
     * @param ioBuffer
     * @param protocolDecoderOutput
     * @return
     * @throws Exception
     */
    @Override
    protected boolean doDecode(IoSession ioSession, IoBuffer ioBuffer, ProtocolDecoderOutput protocolDecoderOutput) throws Exception {
        IoBuffer buffer = IoBuffer.allocate(10);
        while (ioBuffer.hasRemaining()) {
            if (ioBuffer.remaining()<3) {
                //繼續接受
                return false;
            }
            //獲取三個字節
            int oldLimit = ioBuffer.limit();
            ioBuffer.limit(ioBuffer.position()+3);
            String text = ioBuffer.getString(Charset.forName("UTF-8").newDecoder());
            protocolDecoderOutput.write(text);
            ioBuffer.limit(oldLimit);
            if (ioBuffer.hasRemaining()) {
                return true;
            }
        }
        return false;
    }
}

編碼器

  • 編碼器相對解碼器簡單不少,編碼器就是加入咱們的協議,正常狀況就是咱們業務代碼中消息是一個Java實體,咱們須要作的是將Java實體按照協議轉換成IoBuffer進行發送。可是咱們mina中發送消息是經過IoSession中write方法發送的。咱們查看源碼發如今IoSession.write(Object o),發送的若是是IoBuffer那麼就不通過咱們的編碼器,不然會通過咱們編碼器進行編碼最終將轉換後的IoBuffer發送出去。

public class MessageEncoder extends ProtocolEncoderAdapter {
    @Override
    public void encode(IoSession ioSession, Object o, ProtocolEncoderOutput protocolEncoderOutput) throws Exception {
        //TODO  根據協議編碼
        //組裝好以後  ioSession.write(IoBuffer)寫出
        System.out.println(o);
    }
}

總結

加入戰隊ide

<span id="addMe">加入戰隊</span>

微信公衆號

微信公衆號

相關文章
相關標籤/搜索