java NIO 處理粘包 斷包問題

NIO socket是非阻塞的通信模式,與IO阻塞式的通信不一樣點在於NIO的數據要經過channel放到一個緩存池ByteBuffer中,而後再從這個緩存池中讀出數據,而IO的模式是直接從inputstream中read。因此對於NIO,因爲存在緩存池的大小限制和網速的不均勻會形成一次讀的操做放入緩存池中的數據不完整,便造成了斷包問題。同理,若是一次性讀入兩個及兩個以上的數據,則沒法分辨兩個數據包的界限問題,也就形成了粘包。對於NIO的SocketChannel每次觸發OP_READ事件時,發送端不必定僅僅寫入了一次,同理,發送端若是一次發送數據包過大,那麼發送端的一次寫入也可能會被拆分紅兩次OP_READ事件,因此OP_READ事件和發送端的OP_WRITE事件並非一一對應的。緩存

第一個問題:對於粘包問題的解決socket

粘包問題主要是因爲數據包界限不清,因此這個問題比較好解決,最好的解決辦法就是在發送數據包前事先發送一個int型數據,該數據表明將要發送的數據包的大小,這樣接收端能夠每次觸發OP_READ的時候先接受一個int大小的數據段到緩存池中,而後,緊接着讀出後續完整的大小的包,這樣就會處理掉粘包問題。由於channel.read()方法不能給讀取數據的大小的參數,因此沒法手動指定讀取數據段的大小。但每次調用channel.read()返回的是他實際讀取的大小,這樣,思路就有了:首先調整緩存池的大小固定爲要讀出數據段的大小,這樣保證不會過量讀出。因爲OP_READ和OP_WRITE不是一一對應的,因此一次OP_READ能夠While循環調用channel.read()不停讀取channel中的數據到緩存池,並捕獲其返回值,當返回值累計達到要讀取數據段大小時break掉循環,這樣保證數據讀取充足。因此這樣就完美解決粘包問題。ide

第二個問題:對於斷包問題的解決函數

斷包問題主要是因爲數據包過量讀入時,緩存池結尾處只有半個數據包,channel裏還有半個數據包,這樣形成了這個包沒法處理的問題。這個問題的解決思路是保證每次不過量讀入,這樣也就不存在斷包了。仍是由於channel.read()的讀取不可控的緣由,因此沒法從read函數中控制讀取大小,仍是從緩存池入手。方法是調整緩存池的大小爲要讀數據的大小,這樣就不會斷包。spa

下附某次開發過程的源代碼參考:線程

發送端:指針

private void sendIntoChannel() {

        Runnable run = new Runnable() {
            @Override
            public void run() {

                try {
                    ByteArrayOutputStream bOut;
                    ObjectOutputStream out;
                    CBaseDataBean cbdb;
                    ByteBuffer bb = ByteBuffer.allocate(MemCache);
                    while (true) {
                        cbdb = CloudServer.cdsq.read();//Blocking Method

                        //處理自我命令:斷開鏈接 退出線程
                        if (cbdb.getDataType() == CMsgTypeBean.MSG_TYPE_CUTDOWN) {
                            break;
                        }

                        bOut = new ByteArrayOutputStream();
                        out = new ObjectOutputStream(bOut);
                        out.writeObject(cbdb);
                        out.flush();
                        
                        //構造發送數據:整型數據頭+有效數據段
                        byte[] arr = bOut.toByteArray();
                        final int ObjLength = arr.length;   //獲取有效數據段長度                      
                        bb.clear();
                        bb.limit(IntLength + ObjLength);    //調整緩存池大小
                        bb.putInt(ObjLength);
                        bb.put(arr);
                        bb.position(0);                     //調整重置讀寫指針

                        SocketChannel channel = (SocketChannel) key.channel();
                        channel.write(bb);

                        out.close();
                        bOut.close();
                    }
                } catch (IOException ex) {
                }
            }
        };
        CloudServer.cstp.putNewThread(run);
    }
/**
     * 開闢線程分發消息
     */
    private void Dispatcher() {
        Runnable run = new Runnable() {

            @Override
            public void run() {
                try {
                    while (true) {
                        selector.selectNow();
                        Thread.sleep(100);
                        Iterator<SelectionKey> itor = selector.selectedKeys().iterator();
                        while (itor.hasNext()) {
                            SelectionKey selKey = itor.next();
                            itor.remove();

                            if (selKey.isValid() && selKey.isAcceptable()) {
                                finshAccept(selKey);
                            }

                            if (selKey.isValid() && selKey.isReadable()) {
                                //消息分發
                                Processer();
                            }
                        }
                    }
                } catch (IOException | InterruptedException ex) {
                    System.out.println(ex.toString());
                }
            }
        };
        CloudServer.cstp.putNewThread(run);
    }

    /**
     * 消息處理器
     */
    private void Processer() {
        ByteBuffer bbInt = ByteBuffer.allocate(IntLength);    //讀取INT頭信息的緩存池
        ByteBuffer bbObj = ByteBuffer.allocate(MemCache);     //讀取OBJ有效數據的緩存池
        SocketChannel channel = (SocketChannel)key.channel();
        ByteArrayInputStream bIn;
        ObjectInputStream in;
        CBaseDataBean cbdb;
        //有效數據長度
        int ObjLength;
        //從NIO信道中讀出的數據長度
        int readObj;
        try {
            //讀出INT數據頭
            while (channel.read(bbInt) == IntLength) {
                //獲取INT頭中標示的有效數據長度信息並清空INT緩存池
                ObjLength = bbInt.getInt(0);
                bbInt.clear();
                
                //清空有效數據緩存池設置有效緩存池的大小
                bbObj.clear();
                bbObj.limit(ObjLength);
                
                //循環讀滿緩存池以保證數據完整性
                readObj = channel.read(bbObj);
                while (readObj != ObjLength) {
                    readObj += channel.read(bbObj);
                }

                bIn = new ByteArrayInputStream(bbObj.array());
                in = new ObjectInputStream(bIn);
                cbdb = (CBaseDataBean) in.readObject();
                switch (cbdb.getDataType()) {
                    case CMsgTypeBean.MSG_TYPE_COMMAND:
                        rcv_msg_command(cbdb);
                        break;
                    case CMsgTypeBean.MSG_TYPE_CUTDOWN:
                        rcv_msg_cutdown();
                        break;
                    case CMsgTypeBean.MSG_TYPE_VERIFYFILE:
                        rcv_msg_verifyfile(cbdb);
                        break;
                    case CMsgTypeBean.MSG_TYPE_SENDFILE:
                        rcv_msg_sendfile(cbdb);
                        break;
                    case CMsgTypeBean.MSG_TYPE_DISPATCHTASK:
                        rcv_msg_dispchtask(cbdb);
                        break;
                }
                in.close();
            
            }
        } catch (ClassNotFoundException | IOException ex) {
        }
    }
相關文章
相關標籤/搜索