mina read方法出現BufferUnderflowException異常的解決辦法

現象: 先連續發幾十個很小很小的包(<10 byte) 再忽然發一個大小64byte的包 這時你會發現mina就會出現如下錯誤java

java.nio.BufferUnderflowException
 at java.nio.HeapByteBuffer.get(Unknown Source)
 at org.apache.mina.core.buffer.AbstractIoBuffer.get(AbstractIoBuffer.java:419)
 at org.apache.mina.core.buffer.AbstractIoBuffer.get(AbstractIoBuffer.java:827)
 at com.labox.common.net.ProtocolHandler.messageReceived(ProtocolHandler.java:81)
 at org.apache.mina.core.filterchain.DefaultIoFilterChain$TailFilter.messageReceived(DefaultIoFilterChain.java:752)
 at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:414)
 at org.apache.mina.core.filterchain.DefaultIoFilterChain.access$5(DefaultIoFilterChain.java:411)
 at org.apache.mina.core.filterchain.DefaultIoFilterChain$EntryImpl$1.messageReceived(DefaultIoFilterChain.java:832)
 at org.apache.mina.core.filterchain.DefaultIoFilterChain$HeadFilter.messageReceived(DefaultIoFilterChain.java:616)
 at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:414)
 at org.apache.mina.core.filterchain.DefaultIoFilterChain.fireMessageReceived(DefaultIoFilterChain.java:408)
 at org.apache.mina.core.polling.AbstractPollingIoProcessor.read(AbstractPollingIoProcessor.java:582)
 at org.apache.mina.core.polling.AbstractPollingIoProcessor.process(AbstractPollingIoProcessor.java:542)
 at org.apache.mina.core.polling.AbstractPollingIoProcessor.process(AbstractPollingIoProcessor.java:534)
 at org.apache.mina.core.polling.AbstractPollingIoProcessor.access$7(AbstractPollingIoProcessor.java:532)
 at org.apache.mina.core.polling.AbstractPollingIoProcessor$Worker.run(AbstractPollingIoProcessor.java:861)

通過對mina的分析,這是由對包長度不對作成的(即,咱們發的包長是大於64byte的,但他的byteBuffer大小隻有64byte,當咱們嘗試讀取第65個byte就會出現這個錯誤)spring

mina怎會出現這種錯誤的呢??是否是有什麼配置能夠調整apache

找到mina讀取byte的方法AbstractPollingIoProcessor類的read(T session)api

此方法源代碼以下網絡

  

private void read(T session) {
        IoSessionConfig config = session.getConfig();

        System.out.println("cap buffer size"+config.getReadBufferSize());//這句我本身加的
        IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());

        final boolean hasFragmentation =
            session.getTransportMetadata().hasFragmentation();

        try {
            int readBytes = 0;
            int ret;

            try {
                if (hasFragmentation) {
                    while ((ret = read(session, buf)) > 0) {
                        readBytes += ret;
                        if (!buf.hasRemaining()) {
                            break;
                        }
                    }
                } else {
                    ret = read(session, buf);
                    if (ret > 0) {
                        readBytes = ret;
                    }
                }
            } finally {
                buf.flip();
            }

            if (readBytes > 0) {
                session.getFilterChain().fireMessageReceived(buf);
                buf = null;

                if (hasFragmentation) {
                    if (readBytes << 1 < config.getReadBufferSize()) {
                        session.decreaseReadBufferSize();
                    } else if (readBytes == config.getReadBufferSize()) {
                        session.increaseReadBufferSize();
                    }
                }
            }
            if (ret < 0) {
                scheduleRemove(session);
            }
        } catch (Throwable e) {
            if (e instanceof IOException) {
                scheduleRemove(session);
            }
            session.getFilterChain().fireExceptionCaught(e);
        }
    }

 


 

通過對這段代碼的分析終於發現問題所在了session

你們注意if (readBytes > 0) 這個塊下的代碼socket

你不難發現ide

  

if (hasFragmentation) {
                    if (readBytes << 1 < config.getReadBufferSize()) {
                        session.decreaseReadBufferSize();
                    } else if (readBytes == config.getReadBufferSize()) {
                        session.increaseReadBufferSize();
                    }
                }

 

意思是if hasFragmentation==trueui

if 當前配置初始化ByteBuffer大小 > 當前讀取包的平方 爲 true 就把配置中初始化byteBuffer大小減半spa

else if 當前已讀取字節==配置包初始化大小  爲true時 把配置中初始化byteBuffer大小加倍 

接下來結合我出錯的現象看看

當我接連發幾十個小於10byte的包時,這時配置中的初始化ByteBuffer大小就爲取小,默認最小爲64byte

當我再發一個大於64byte的包,但整個ByteBuffer只有64byte,那就出錯了。

接下來咱們來修正這個問題

方法一:不要改變默認初始化byteBuffer大小,要修改mina的源碼

找到org.apache.mina.transport.socket.nio.NioSocketSession 這個類的METADATA變量

把 new DefaultTransportMetadata()的第四個參數改爲false就ok了

方法二:本身寫read()方法中獲得byteBuffer實例的方法

從read()方法看出,他獲得byteBuffer實例是每次去請求的,若是咱們在這裏作一個cache,每次從cache中獲得,天然byteBuffer的大小也是固定的,只要按本身業務最大包大小去開就能夠了。

每一個線程用一個本身的ByteBuffer實例,這樣就不會有同步問題.

找到org.apache.mina.core.polling.AbstractPollingIoProcessor類中的read(T session)方法改爲

  

static ThreadLocal readCache=new ThreadLocal();//這個是放ByteBuffer實例的cache

private void read(T session) {

IoBuffer buf=readCache.get();
if(buf==null){
 buf=IoBuffer.allocate(512);//512爲包默認大小
 readCache.set(buf);
}else{
 buf.clear();
}

try {
    int readBytes = 0;
    int ret;

    try {
            ret = read(session, buf);
            if (ret > 0) {
                readBytes = ret;
            }
    } finally {
        buf.flip();
    }

    if (readBytes > 0) {
        session.getFilterChain().fireMessageReceived(buf);
    }
    if (ret < 0) {
        scheduleRemove(session);
    }
} catch (Throwable e) {
    if (e instanceof IOException) {
        scheduleRemove(session);
    }
    session.getFilterChain().fireExceptionCaught(e);
}

}

 

 

另外發現一篇說mina處理斷包和粘包處理的

一.  解碼方法
mina中有個內置類CumulativeProtocolDecoder是專門用來處理斷包和粘包的。該類的api文檔中有個實現的例子。
類org.apache.mina.filter.codec.CumulativeProtocolDecoder
public abstract class CumulativeProtocolDecoder extends ProtocolDecoderAdapter {
    private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");

    public void decode(IoSession session, IoBuffer in,
            ProtocolDecoderOutput out) throws Exception {
        if (!session.getTransportMetadata().hasFragmentation()) {       //用來判斷是否還有分幀(斷包)
            while (in.hasRemaining()) {
                if (!doDecode(session, in, out)) {
                    break;
                }
            }
            return;
        }
       
      ////處理斷包,省略
      ............................................

    }

    //須要實現的方法
    protected abstract boolean doDecode(IoSession session, IoBuffer in,
            ProtocolDecoderOutput out) throws Exception;
}

CumulativeProtocolDecoder是一個抽象類,必須繼承並實現其doDecode方法,用戶自定義協議的拆分就應該寫在doDecode方法中,下面的類MessageDecoder是一個實現的例子。MessageDecoder解碼網絡數據到一種有兩字節長度頭的自定義消息協議格式。
/**
 * 斷包和粘包處理,處理後的消息爲一個或多個完整的數據消息
 * @author blc
 */
public class MessageDecoder extends CumulativeProtocolDecoder {
    /*
     * (non-Javadoc)
     * 
     * @see
     * org.apache.mina.filter.codec.CumulativeProtocolDecoder#doDecode(org.apache
     * .mina.core.session.IoSession, org.apache.mina.core.buffer.IoBuffer,
     * org.apache.mina.filter.codec.ProtocolDecoderOutput)
     */
    @Override
    protected boolean doDecode(IoSession session, IoBuffer in,
            ProtocolDecoderOutput out) throws Exception {
        
        in.order(ServerConfig.ByteEndian);    //字節序, ServerConfig.ByteEndian = ByteOrder.LITTLE_ENDIAN
        
        //消息buf
        IoBuffer buf = IoBuffer.allocate(ServerConfig.MessageMaxByte);   //ServerConfig.MessageMaxByte 最大消息字節數
        buf.order(ServerConfig.ByteEndian);
        
        //考慮如下幾種狀況:
        //    1. 一個ip包中只包含一個完整消息
        //    2. 一個ip包中包含一個完整消息和另外一個消息的一部分
        //    3. 一個ip包中包含一個消息的一部分
        //    4. 一個ip包中包含兩個完整的數據消息或更多(循環處理在父類的decode中)

        if (in.remaining() > 1) {
            int length = in.getShort(in.position());
if (length < 4) {
                throw new ServerException("Error net message. (Message Length="+length+")");
            }
            if (length > ServerConfig.MessageMaxByte) {
                throw new ServerException("Error net message. Message Length("+length+") > MessageMaxByte("+ServerConfig.MessageMaxByte+")");
            }
            if (length > in.remaining()) return false;
            //複製一個完整消息
            byte[] bytes = new byte[length];
            in.get(bytes);
            buf.put(bytes);
            
            buf.flip();
            out.write(buf);
            return true;
        } else {
            return false;
        }
    }
}

二.  使用
將上面的解碼器做爲一個過濾器配置到mina中便可,在spring中的配置方法以下:
    <!-- 協議過濾器,包括解碼和譯碼 -->
    <bean id="protocolCodecFilter" class="org.apache.mina.filter.codec.ProtocolCodecFilter">
        <constructor-arg>
            <bean id="factory" class="server.ClientConnServer.MessageCodecFactory"></bean>
        </constructor-arg>
    </bean>
    <!-- 將協議過濾器配置到mina的過濾鏈中 -->
    <bean id="filterChainBuilder" class="org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder">
        <property name="filters">
            <map>
                <entry key="protocolCodecFilter" value-ref="protocolCodecFilter" />
            </map>
        </property>
    </bean>
    <!-- 處理器 -->
    <bean id="clientConnHandler" class="server.ClientConnServer.ClientConnHandler" />
    <!-- socket接收器,接收客戶端鏈接 -->
    <bean id="ioAcceptor" class="org.apache.mina.transport.socket.nio.NioSocketAcceptor" destroy-method="unbind">
        <!--        <property name="defaultLocalAddress" value=":161" />-->
        <property name="handler" ref="clientConnHandler" />
        <property name="reuseAddress" value="true" />
        <property name="filterChainBuilder" ref="filterChainBuilder" />
    </bean>


要配置協議過濾器,必須使用一個ProtocolCodecFactory ,下面是簡單實現
public class MessageCodecFactory implements ProtocolCodecFactory {
    private final MessageEncoder encoder;
    private final MessageDecoder decoder;
    
    public MessageCodecFactory() {
        encoder = new MessageEncoder();
        decoder = new MessageDecoder();
    }

    /* (non-Javadoc)
     * @see org.apache.mina.filter.codec.ProtocolCodecFactory#getDecoder(org.apache.mina.core.session.IoSession)
     */
    @Override
    public ProtocolDecoder getDecoder(IoSession session) throws Exception {
        return decoder;
    }

    /* (non-Javadoc)
     * @see org.apache.mina.filter.codec.ProtocolCodecFactory#getEncoder(org.apache.mina.core.session.IoSession)
     */
    @Override
    public ProtocolEncoder getEncoder(IoSession session) throws Exception {
        return encoder;
    }
}

/**
 * 譯碼器,不作任何事情
 */
public class MessageEncoder extends ProtocolEncoderAdapter {

    /* (non-Javadoc)
     * @see org.apache.mina.filter.codec.ProtocolEncoder#encode(org.apache.mina.core.session.IoSession, java.lang.Object, org.apache.mina.filter.codec.ProtocolEncoderOutput)
     */
    @Override
    public void encode(IoSession session, Object message,
            ProtocolEncoderOutput out) throws Exception {
        //Do nothing
    }

}

轉自http://blianchen.blog.163.com/blog/static/1310562992010101891522100/