基於NIO的消息路由的實現(四) 服務端通信主線程(2)斷包和粘包的處理

原本我打算單獨開一章,專門說明粘包和斷包,可是以爲這個事兒我在作的時候挺頭疼的,可是對於別人或許不那麼重要,因而就在這裏寫吧。java

那麼何謂粘包、何謂斷包呢?程序員

  • 粘包:咱們知道客戶端在寫入報文給服務端的時候,首先要將須要寫入的內容寫入Buffer,以ByteBuffer爲例,若是你Buffer定義的足夠大,而且你發送的報文足夠快,此時就會產生粘包現象,舉例來講 你發送一個 報文「 M|A」,而後你有發送了一個「M|B」,若是產生粘包,服務端從緩衝區裏面讀出的就是「M|AM|B」,這樣的字符串;也就是說,客戶端的第一條報文和第二條報文粘在了一塊兒。服務器

  • 斷包:斷包每每是在粘包以後產生的,按照剛纔的例子,假設你的緩衝區大小設置爲4(固然沒人會設置這麼小的緩衝區,舉例子,湊合看吧),若是你發送的報文足夠快,就會產生髮送給服務器的報文變爲這樣:第一個包「M|AM」,第二個包「|B」app

在大多數的NIO例子中,均不包括此過程的處理,並且不少的例子中也不會浮現這個狀況,甚至程序上線,若是系統壓力不大,這樣的狀況都出現的不多(尤爲是斷包)。值得慶幸的是,這兩種狀況,我均重現了,我在客戶端不作任何停頓的狀況下,for循環發送10萬條報文給服務端,當個人緩衝區服務端緩衝區設置爲4096,客戶端緩衝區設置爲1024的時候,出現的頻率仍是蠻高的,能夠加大緩衝區來減小斷包的狀況發生,可是不能避免,粘包則是必然發生的。socket

好、我回答在第7小點中提到的問題,爲何要在通信協議的外層在加上四位?這四位就是用來標記我報文指令的長度的,一旦我知道了這個長度,我就能夠根據長度對斷包和粘包進行相關的處理。具體代碼以下:debug

/**
     * 處理斷包和粘包現象
     *
     * @param socketChannel
     * @param byteBuffer
     */
    private void handlePacket(SocketChannel socketChannel, ByteBuffer byteBuffer) {
        //標記讀取緩衝區起始位置
        int location = 0;
        //若是緩衝區從0到limit的數量大於包體大小標記數字
        while (byteBuffer.remaining() > PACKET_HEAD_LENGTH) {
            //包體大小標記
            String strBsize;
            //若是endPacket的字節length大於0,則證實:斷包的前一截爲包含包頭和包體的;
            if (endPacketStr.getBytes().length > 0) {

                String strPacket = endPacketStr.substring(PACKET_HEAD_LENGTH) + new String(byteBuffer.array(), 0, remainBodySize);
                byteBuffer.position(remainBodySize);
                location = remainBodySize;
//                                    if(logger.isDebugEnabled()) {
                logger.info("【斷包處理】(包含包體)合併後的報文:" + strPacket + ",緩衝區的position:" + location);
//                                    }
                offerPacket(socketChannel, strPacket);
                //處理完畢,清理斷包的前一截,以便於下次使用;
                endPacketStr = "";
                //清理後一截報文的字節數標記;
                remainBodySize = 0;
                continue;
                //若是endBufferStr的字節length大於0,則證實:斷包的前一截僅包含包頭或包頭的一部分,不包含包體;
            } else if (endBufferStr.getBytes().length > 0) {

                strBsize = (new StringBuffer(endBufferStr).append(new
                        String(byteBuffer.array(), location, PACKET_HEAD_LENGTH - endBufferStr.getBytes().length))).toString();

                //移動緩衝區position
                byteBuffer.position(PACKET_HEAD_LENGTH - endBufferStr.getBytes().length);
                location = byteBuffer.position();
                //獲得包體大小
                int byteBufferSize = Integer.parseInt(strBsize.trim());
                //進行報文合併,把保存的僅包含包頭或包頭一部分的前一截與後一截合併
                String strPacket = endBufferStr + (new String(byteBuffer.array(), PACKET_HEAD_LENGTH - endBufferStr.getBytes().length, byteBufferSize));
                byteBuffer.position(location + byteBufferSize);//將緩衝區的位置移動到下一個包體大小標記位置
                location = byteBuffer.position();
                logger.info("【斷包處理】(不包含包體)合併後的報文:" + strPacket + ",緩衝區的position:" + location);
                offerPacket(socketChannel, strPacket);
                endBufferStr = "";
                continue;
                //進入正常處理(規範的報文處理,不考慮斷包)
            } else {
                strBsize = new String(byteBuffer.array(), location, PACKET_HEAD_LENGTH);
                //移動緩衝區position
                byteBuffer.position(location + PACKET_HEAD_LENGTH);
            }
            if (logger.isDebugEnabled()) {
                logger.debug("收到客戶端包體大小:" + strBsize + ",查看position變化:" + byteBuffer.position());
            }
            //獲得包體大小
            int byteBufferSize = Integer.parseInt(strBsize.trim());
            //若是從緩衝區當前位置到limit大於包體大小,證實粘包了,進行包體處理。等於則爲正常包體,不存在粘包現象。
            if (byteBuffer.remaining() >= byteBufferSize) {

                String strPacket = endBufferStr + (new String(byteBuffer.array(), PACKET_HEAD_LENGTH + location, byteBufferSize));
                byteBuffer.position(location + PACKET_HEAD_LENGTH + byteBufferSize);//將緩衝區的位置移動到下一個包體大小標記位置
                if (logger.isDebugEnabled()) {
                    logger.debug("收到客戶端包體內容:" + strPacket + ",2查看position變化:" + byteBuffer.position());
                }
                //將字符串報文封裝爲類
                offerPacket(socketChannel, strPacket);
                location = byteBuffer.position();//設定讀取緩衝區起始位置
                //若是緩衝區當前位置到limit小於包體,證實斷包了,進行斷包處理
            } else {

                endPacketStr = new String(byteBuffer.array(), location, byteBuffer.limit() - location);
                remainBodySize = Integer.parseInt(endPacketStr.substring(0, PACKET_HEAD_LENGTH).trim()) - endPacketStr.getBytes().length + PACKET_HEAD_LENGTH;
                //已經找到斷包前半截,因此把整個buffer的position調整至最後,再也不處理。等待新的key進入
                byteBuffer.position(byteBuffer.limit());
                logger.info("處理斷包僅包含完整包頭的尾部報文,緩衝區位置:" + location + ",緩衝區limit:" + byteBuffer.limit() + ",包含徹底包頭的剩餘字符:" + endPacketStr + ",bodySize:" + remainBodySize);

            }
        }
        //處理僅包含包頭前一截的報文;
        if (byteBuffer.remaining() > 0) {

            //緩衝區中剩餘的僅包含包頭前一截的報文
            endBufferStr = new String(byteBuffer.array(), location, byteBuffer.limit() - location);

            logger.info("處理斷包僅包含包頭前一截的尾部報文,緩衝區位置:" + location + ",緩衝區limit:" + byteBuffer.limit() + ",不包含徹底包頭的剩餘字符:" + endBufferStr);
            //移動緩衝區指針到最後,表明已經保存了前一截報文,無需再進行處理;
            byteBuffer.position(byteBuffer.limit());
        }
        //我也不知道這是否有用,能不能釋放內存資源
        byteBuffer.clear();
    }

這塊兒極可能有不合理的地方,由於對於一個接近40歲的程序員來講,邏輯在頭腦中已經比較混亂了。我知道要對以下幾種狀況進行處理:指針

一、粘包,粘包比較好處理,主要是根據包頭的前四位,肯定包體的大小,而後移動buffer的位置(position),把整個包讀出來放入隊列就好了;code

二、斷包:斷包分爲兩種狀況,第一種從包頭開始就斷了,這是你沒法得到包體大小,須要把前面的一截保存起來,就必須等下一個報文來了以後,把他們連在一塊兒,而後再作處理;第二種,已經讀到完整的包頭,仍然須要把前面一截保存起來,肯定後面還有多少,而後再處理;我利用了三個類成員:隊列

//斷包處理,前一截包含完整包頭;
private String endPacketStr = "";
//斷包處理,前一截不包含完整包頭;
private String endBufferStr = "";
//斷包處理,前一截包含完整包頭時,包體的大小標記;
private int remainBodySize = 0;

注意這些類的成員須要在使用後,清空,以便於下次使用,不然就亂套了。這塊兒代碼,我寫完就沒再看過,挺費神。若是有人能提供更好地辦法,不勝感激。內存

相關文章
相關標籤/搜索