現象: 先連續發幾十個很小很小的包(<10 byte) 再忽然發一個大小64byte的包 這時你會發現mina就會出現如下錯誤java
java.nio.BufferUnderflowException at java.nio.HeapByteBuffer.get(Unknown Source) at org.apache.mina.core.buffer.AbstractIoBuffer.get(AbstractIoBuffer.java:419) at org.apache.mina.core.buffer.AbstractIoBuffer.get(AbstractIoBuffer.java:827) at com.labox.common.net.ProtocolHandler.messageReceived(ProtocolHandler.java:81) at org.apache.mina.core.filterchain.DefaultIoFilterChain$TailFilter.messageReceived(DefaultIoFilterChain.java:752) at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:414) at org.apache.mina.core.filterchain.DefaultIoFilterChain.access$5(DefaultIoFilterChain.java:411) at org.apache.mina.core.filterchain.DefaultIoFilterChain$EntryImpl$1.messageReceived(DefaultIoFilterChain.java:832) at org.apache.mina.core.filterchain.DefaultIoFilterChain$HeadFilter.messageReceived(DefaultIoFilterChain.java:616) at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:414) at org.apache.mina.core.filterchain.DefaultIoFilterChain.fireMessageReceived(DefaultIoFilterChain.java:408) at org.apache.mina.core.polling.AbstractPollingIoProcessor.read(AbstractPollingIoProcessor.java:582) at org.apache.mina.core.polling.AbstractPollingIoProcessor.process(AbstractPollingIoProcessor.java:542) at org.apache.mina.core.polling.AbstractPollingIoProcessor.process(AbstractPollingIoProcessor.java:534) at org.apache.mina.core.polling.AbstractPollingIoProcessor.access$7(AbstractPollingIoProcessor.java:532) at org.apache.mina.core.polling.AbstractPollingIoProcessor$Worker.run(AbstractPollingIoProcessor.java:861)
通過對mina的分析,這是由對包長度不對作成的(即,咱們發的包長是大於64byte的,但他的byteBuffer大小隻有64byte,當咱們嘗試讀取第65個byte就會出現這個錯誤)spring
mina怎會出現這種錯誤的呢??是否是有什麼配置能夠調整apache
找到mina讀取byte的方法AbstractPollingIoProcessor類的read(T session)api
此方法源代碼以下網絡
private void read(T session) { IoSessionConfig config = session.getConfig(); System.out.println("cap buffer size"+config.getReadBufferSize());//這句我本身加的 IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize()); final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation(); try { int readBytes = 0; int ret; try { if (hasFragmentation) { while ((ret = read(session, buf)) > 0) { readBytes += ret; if (!buf.hasRemaining()) { break; } } } else { ret = read(session, buf); if (ret > 0) { readBytes = ret; } } } finally { buf.flip(); } if (readBytes > 0) { session.getFilterChain().fireMessageReceived(buf); buf = null; if (hasFragmentation) { if (readBytes << 1 < config.getReadBufferSize()) { session.decreaseReadBufferSize(); } else if (readBytes == config.getReadBufferSize()) { session.increaseReadBufferSize(); } } } if (ret < 0) { scheduleRemove(session); } } catch (Throwable e) { if (e instanceof IOException) { scheduleRemove(session); } session.getFilterChain().fireExceptionCaught(e); } }
通過對這段代碼的分析終於發現問題所在了session
你們注意if (readBytes > 0) 這個塊下的代碼socket
你不難發現ide
if (hasFragmentation) { if (readBytes << 1 < config.getReadBufferSize()) { session.decreaseReadBufferSize(); } else if (readBytes == config.getReadBufferSize()) { session.increaseReadBufferSize(); } }
意思是if hasFragmentation==trueui
if 當前配置初始化ByteBuffer大小 > 當前讀取包的平方 爲 true 就把配置中初始化byteBuffer大小減半spa
else if 當前已讀取字節==配置包初始化大小 爲true時 把配置中初始化byteBuffer大小加倍
接下來結合我出錯的現象看看
當我接連發幾十個小於10byte的包時,這時配置中的初始化ByteBuffer大小就爲取小,默認最小爲64byte
當我再發一個大於64byte的包,但整個ByteBuffer只有64byte,那就出錯了。
接下來咱們來修正這個問題
方法一:不要改變默認初始化byteBuffer大小,要修改mina的源碼
找到org.apache.mina.transport.socket.nio.NioSocketSession 這個類的METADATA變量
把 new DefaultTransportMetadata()的第四個參數改爲false就ok了
方法二:本身寫read()方法中獲得byteBuffer實例的方法
從read()方法看出,他獲得byteBuffer實例是每次去請求的,若是咱們在這裏作一個cache,每次從cache中獲得,天然byteBuffer的大小也是固定的,只要按本身業務最大包大小去開就能夠了。
每一個線程用一個本身的ByteBuffer實例,這樣就不會有同步問題.
找到org.apache.mina.core.polling.AbstractPollingIoProcessor類中的read(T session)方法改爲
static ThreadLocal readCache=new ThreadLocal();//這個是放ByteBuffer實例的cache private void read(T session) { IoBuffer buf=readCache.get(); if(buf==null){ buf=IoBuffer.allocate(512);//512爲包默認大小 readCache.set(buf); }else{ buf.clear(); } try { int readBytes = 0; int ret; try { ret = read(session, buf); if (ret > 0) { readBytes = ret; } } finally { buf.flip(); } if (readBytes > 0) { session.getFilterChain().fireMessageReceived(buf); } if (ret < 0) { scheduleRemove(session); } } catch (Throwable e) { if (e instanceof IOException) { scheduleRemove(session); } session.getFilterChain().fireExceptionCaught(e); } }
另外發現一篇說mina處理斷包和粘包處理的
一. 解碼方法mina中有個內置類CumulativeProtocolDecoder是專門用來處理斷包和粘包的。該類的api文檔中有個實現的例子。
類org.apache.mina.filter.codec.CumulativeProtocolDecoder
public abstract class CumulativeProtocolDecoder extends ProtocolDecoderAdapter {
private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");
public void decode(IoSession session, IoBuffer in,
ProtocolDecoderOutput out) throws Exception {
if (!session.getTransportMetadata().hasFragmentation()) { //用來判斷是否還有分幀(斷包)
while (in.hasRemaining()) {
if (!doDecode(session, in, out)) {
break;
}
}
return;
}
////處理斷包,省略
............................................
}
//須要實現的方法
protected abstract boolean doDecode(IoSession session, IoBuffer in,
ProtocolDecoderOutput out) throws Exception;
}
CumulativeProtocolDecoder是一個抽象類,必須繼承並實現其doDecode方法,用戶自定義協議的拆分就應該寫在doDecode方法中,下面的類MessageDecoder是一個實現的例子。MessageDecoder解碼網絡數據到一種有兩字節長度頭的自定義消息協議格式。
/**
* 斷包和粘包處理,處理後的消息爲一個或多個完整的數據消息
* @author blc
*/
public class MessageDecoder extends CumulativeProtocolDecoder {
/*
* (non-Javadoc)
*
* @see
* org.apache.mina.filter.codec.CumulativeProtocolDecoder#doDecode(org.apache
* .mina.core.session.IoSession, org.apache.mina.core.buffer.IoBuffer,
* org.apache.mina.filter.codec.ProtocolDecoderOutput)
*/
@Override
protected boolean doDecode(IoSession session, IoBuffer in,
ProtocolDecoderOutput out) throws Exception {
in.order(ServerConfig.ByteEndian); //字節序, ServerConfig.ByteEndian = ByteOrder.LITTLE_ENDIAN
//消息buf
IoBuffer buf = IoBuffer.allocate(ServerConfig.MessageMaxByte); //ServerConfig.MessageMaxByte 最大消息字節數
buf.order(ServerConfig.ByteEndian);
//考慮如下幾種狀況:
// 1. 一個ip包中只包含一個完整消息
// 2. 一個ip包中包含一個完整消息和另外一個消息的一部分
// 3. 一個ip包中包含一個消息的一部分
// 4. 一個ip包中包含兩個完整的數據消息或更多(循環處理在父類的decode中)
if (in.remaining() > 1) {
int length = in.getShort(in.position());
if (length < 4) {
throw new ServerException("Error net message. (Message Length="+length+")");
}
if (length > ServerConfig.MessageMaxByte) {
throw new ServerException("Error net message. Message Length("+length+") > MessageMaxByte("+ServerConfig.MessageMaxByte+")");
}
if (length > in.remaining()) return false;
//複製一個完整消息
byte[] bytes = new byte[length];
in.get(bytes);
buf.put(bytes);
buf.flip();
out.write(buf);
return true;
} else {
return false;
}
}
}
二. 使用
將上面的解碼器做爲一個過濾器配置到mina中便可,在spring中的配置方法以下:
<!-- 協議過濾器,包括解碼和譯碼 -->
<bean id="protocolCodecFilter" class="org.apache.mina.filter.codec.ProtocolCodecFilter">
<constructor-arg>
<bean id="factory" class="server.ClientConnServer.MessageCodecFactory"></bean>
</constructor-arg>
</bean>
<!-- 將協議過濾器配置到mina的過濾鏈中 -->
<bean id="filterChainBuilder" class="org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder">
<property name="filters">
<map>
<entry key="protocolCodecFilter" value-ref="protocolCodecFilter" />
</map>
</property>
</bean>
<!-- 處理器 -->
<bean id="clientConnHandler" class="server.ClientConnServer.ClientConnHandler" />
<!-- socket接收器,接收客戶端鏈接 -->
<bean id="ioAcceptor" class="org.apache.mina.transport.socket.nio.NioSocketAcceptor" destroy-method="unbind">
<!-- <property name="defaultLocalAddress" value=":161" />-->
<property name="handler" ref="clientConnHandler" />
<property name="reuseAddress" value="true" />
<property name="filterChainBuilder" ref="filterChainBuilder" />
</bean>
要配置協議過濾器,必須使用一個ProtocolCodecFactory ,下面是簡單實現
public class MessageCodecFactory implements ProtocolCodecFactory {
private final MessageEncoder encoder;
private final MessageDecoder decoder;
public MessageCodecFactory() {
encoder = new MessageEncoder();
decoder = new MessageDecoder();
}
/* (non-Javadoc)
* @see org.apache.mina.filter.codec.ProtocolCodecFactory#getDecoder(org.apache.mina.core.session.IoSession)
*/
@Override
public ProtocolDecoder getDecoder(IoSession session) throws Exception {
return decoder;
}
/* (non-Javadoc)
* @see org.apache.mina.filter.codec.ProtocolCodecFactory#getEncoder(org.apache.mina.core.session.IoSession)
*/
@Override
public ProtocolEncoder getEncoder(IoSession session) throws Exception {
return encoder;
}
}
/**
* 譯碼器,不作任何事情
*/
public class MessageEncoder extends ProtocolEncoderAdapter {
/* (non-Javadoc)
* @see org.apache.mina.filter.codec.ProtocolEncoder#encode(org.apache.mina.core.session.IoSession, java.lang.Object, org.apache.mina.filter.codec.ProtocolEncoderOutput)
*/
@Override
public void encode(IoSession session, Object message,
ProtocolEncoderOutput out) throws Exception {
//Do nothing
}
}
轉自http://blianchen.blog.163.com/blog/static/1310562992010101891522100/
搞定了
ps:不知這個是否是mina的bug,是否是還有別的方法配置的呢???
請教那位兄弟有更好的解決方法.
qq:85529766