Apache Mina Server 是一個網絡通訊應用框架,也就是說,它主要是對基於TCP/IP、UDP/IP協議棧的通訊框架(固然,也能夠提供JAVA 對象的序列化服務、虛擬機管道通訊服務等),Mina 能夠幫助咱們快速開發高性能、高擴展性的網絡通訊應用,Mina 提供了事件驅動、異步(Mina 的異步IO 默認使用的是JAVA NIO 做爲底層支持)操做的編程模型。Mina 主要有1.x 和2.x 兩個分支,這裏咱們講解最新版本2.0,若是你使用的是Mina 1.x,那麼可能會有一些功能並不適用。學習本文檔,須要你已掌握JAVA IO、JAVA NIO、JAVASocket、JAVA 線程及併發庫(java.util.concurrent.*)的知識。Mina 同時提供了網絡通訊的Server 端、Client 端的封裝,不管是哪端,Mina 在整個網統統信結構中都處於以下的位置:可見Mina 的API 將真正的網絡通訊與咱們的應用程序隔離開來,你只須要關心你要發送、接收的數據以及你的業務邏輯便可。一樣的,不管是哪端,Mina 的執行流程以下所示:html
Mina的底層依賴的主要是Java NIO庫,上層提供的是基於事件的異步接口。其總體的結構以下:前端
(1.) IoService:最底層的是IOService,負責具體的IO相關工做。這一層的典型表明有IOSocketAcceptor和IOSocketChannel,分別對應TCP協議下的服務端和客戶端的IOService。IOService的意義在於隱藏底層IO的細節,對上提供統一的基於事件的異步IO接口。每當有數據到達時,IOService會先調用底層IO接口讀取數據,封裝成IoBuffer,以後以事件的形式通知上層代碼,從而將Java NIO的同步IO接口轉化成了異步IO。因此從圖上看,進來的low-level IO通過IOService層後變成IO Event。具體的代碼能夠參考org.apache.mina.core.polling.AbstractPollingIoProcessor的私有內部類Processor。java
(2.) IoProcessor:這個接口在另外一個線程上,負責檢查是否有數據在通道上讀寫,也就是說它也擁有本身的Selector,這是與咱們使用JAVA NIO 編碼時的一個不一樣之處,一般在JAVA NIO 編碼中,咱們都是使用一個Selector,也就是不區分IoService與IoProcessor 兩個功能接口。另外,IoProcessor 負責調用註冊在IoService 上的過濾器,並在過濾器鏈以後調用IoHandler。
(3.) IoFilter:這個接口定義一組攔截器,這些攔截器能夠包括日誌輸出、黑名單過濾、數據的編碼(write 方向)與解碼(read 方向)等功能,其中數據的encode 與decode是最爲重要的、也是你在使用Mina 時最主要關注的地方。
(4.) IoHandler:這個接口負責編寫業務邏輯,也就是接收、發送數據的地方。須要有開發者本身來實現這個接口。IoHandler能夠當作是Mina處理流程的終點,每一個IoService都須要指定一個IoHandler。spring
(5.)IoSession:是對底層鏈接(服務器與客戶端的特定鏈接,該鏈接由服務器地址、端口以及客戶端地址、端口來決定)的封裝,一個IoSession對應於一個底層的IO鏈接(在Mina中UDP也被抽象成了鏈接)。經過IoSession,能夠獲取當前鏈接相關的上下文信息,以及向遠程peer發送數據。發送數據其實也是個異步的過程。發送的操做首先會逆向穿過IoFilterChain,到達IoService。但IoService上並不會直接調用底層IO接口來將數據發送出去,而是會將該次調用封裝成一個WriteRequest,放入session的writeRequestQueue中,最後由IoProcessor線程統一調度flush出去。因此發送操做並不會引發上層調用線程的阻塞。具體代碼能夠參考org.apache.mina.core.filterchain.DefaultIoFilterChain的內部類HeadFilter的filterWrite方法。數據庫
工做流程:apache
一圖勝千言,MINA的核心類圖:編程
1. 簡單的TCPServer:
(1.) 第一步:編寫IoService
按照上面的執行流程,咱們首先須要編寫IoService,IoService 自己既是服務端,又是客戶端,咱們這裏編寫服務端,因此使用IoAcceptor 實現,因爲IoAcceptor 是與協議無關的,由於咱們要編寫TCPServer,因此咱們使用IoAcceptor 的實現NioSocketAcceptor,實際上底層就是調用java.nio.channels.ServerSocketChannel 類。固然,若是你使用了Apache 的APR 庫,那麼你能夠選擇使AprSocketAcceptor 做爲TCPServer 的實現,據傳說Apache APR庫的性能比JVM 自帶的本地庫高出不少。那麼IoProcessor 是由指定的IoService 內部建立並調用的,咱們並不須要關心。數組
IoAcceptor ac
ceptor = new NioSocketAcceptor(); acceptor.getSessionConfig().setReadBufferSize(2048); acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); //設置過濾器 //...
//設置handler
//綁定端口 acceptor.bind(new InetSocketAddress(9124));
這段代碼咱們初始化了服務端的TCP/IP 的基於NIO 的套接字,而後調用IoSessionConfig設置讀取數據的緩衝區大小、讀寫通道均在10 秒內無任何操做就進入空閒狀態。瀏覽器
(2.) 第二步:編寫過濾器
這裏咱們處理最簡單的字符串傳輸,Mina 已經爲咱們提供了TextLineCodecFactory 編解碼器工廠來對字符串進行編解碼處理。緩存
這段代碼要在acceptor.bind()方法以前執行,由於綁定套接字以後就不能再作這些準備工做了。這裏先不用清楚編解碼器是如何工做的,這個是後面重點說明的內容,這裏你只須要清楚,咱們傳輸的以換行符爲標識的數據,因此使用了Mina 自帶的換行符編解碼器工廠。
(3.) 第三步:編寫IoHandler
這裏咱們只是簡單的打印Client 傳說過來的數據。
package com.dxz.minademo2; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TCPServerHandler extends IoHandlerAdapter { // 這裏咱們使用的SLF4J做爲日誌門面,至於爲何在後面說明。 private final static Logger log = LoggerFactory.getLogger(TCPServerHandler.class); @Override public void messageReceived(IoSession session, Object message) throws Exception { String str = message.toString(); System.out.println("The message received is [" + str + "]"); if (str.endsWith("quit")) { session.close(true); return; } } @Override public void sessionCreated(IoSession session) throws Exception { System.out.println("server session created"); super.sessionCreated(session); } @Override public void sessionOpened(IoSession session) throws Exception { System.out.println("server session Opened"); super.sessionOpened(session); } @Override public void sessionClosed(IoSession session) throws Exception { System.out.println("server session Closed"); super.sessionClosed(session); } }
而後咱們把這個IoHandler 註冊到IoService:
//設置handler acceptor.setHandler(new TCPServerHandler());
固然這段代碼也要在acceptor.bind()方法以前執行。而後咱們運行MyServer 中的main 方法,你能夠看到控制檯一直處於阻塞狀態,等待客戶端鏈接。
完成的代碼:
package com.dxz.minademo2; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.mina.core.service.IoAcceptor; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.LineDelimiter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; public class TCPServer { public static void main(String[] args) throws IOException { IoAcceptor acceptor = new NioSocketAcceptor(); acceptor.getSessionConfig().setReadBufferSize(2048); acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); // 編寫過濾器 acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue())) ); //設置handler acceptor.setHandler(new TCPServerHandler()); //綁定端口 acceptor.bind(new InetSocketAddress(9124)); } }
測試:
此時,咱們用telnet 127.0.0.1 9123 訪問,而後輸入一些內容,當按下回車鍵,你會發現數據在Server 端被輸出,但要注意不要輸入中文,由於Windows 的命令行窗口不會對傳輸的數據進行UTF-8 編碼。當輸入quit 結尾的字符串時,鏈接被斷開。這裏注意你若是使用的操做系統,或者使用的Telnet 軟件的換行符是什麼,若是不清楚,能夠刪掉第二步中的兩個紅色的參數,使用TextLineCodec 內部的自動識別機制。
pom.xml
<!-- MINA集成 --> <dependency> <groupId>org.apache.mina</groupId> <artifactId>mina-core</artifactId> <version>2.0.7</version> </dependency> <dependency> <groupId>org.apache.mina</groupId> <artifactId>mina-integration-spring</artifactId> <version>1.1.7</version> </dependency>
2. 簡單的TCPClient:
這裏咱們實現Mina 中的TCPClient,由於前面說過不管是Server 端仍是Client 端,在Mina中的執行流程都是同樣的。惟一不一樣的就是IoService 的Client 端實現是IoConnector。
(1.) 第一步:編寫IoService並註冊過濾器
package com.dxz.minademo2; import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.mina.core.service.IoConnector; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.LineDelimiter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.transport.socket.nio.NioSocketConnector; public class TCPClient { public static void main(String[] args) { IoConnector connector = new NioSocketConnector(); connector.setConnectTimeoutMillis(30000); connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue()))); connector.setHandler(new TCPClientHandler("你好!\r\n 你們好!")); connector.connect(new InetSocketAddress("localhost", 9124)); } }
(2.) 第三步:編寫IoHandler
package com.dxz.minademo2; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TCPClientHandler extends IoHandlerAdapter { private final static Logger LOGGER = LoggerFactory.getLogger(TCPClientHandler.class); private final String values; public TCPClientHandler(String values) { this.values = values; } @Override public void sessionOpened(IoSession session) { session.write(values); } }
註冊IoHandler:
connector.setHandler(new ClientHandler("你好!\r\n 你們好!"));
而後咱們運行MyClient,你會發現MyServer 輸出以下語句:
The message received is [你好!]
The message received is [你們好!]
咱們看到服務端是按照收到兩條消息輸出的,由於咱們用的編解碼器是以換行符判斷數據是否讀取完畢的。
3. 介紹Mina的TCP的主要接口:
經過上面的兩個示例,你應該對Mina 如何編寫TCP/IP 協議棧的網絡通訊有了一些感性的認識。
(1.)IoService:
這個接口是服務端IoAcceptor、客戶端IoConnector 的抽象,提供IO 服務和管理IoSession的功能,它有以下幾個經常使用的方法:
A. TransportMetadata getTransportMetadata():
這個方法獲取傳輸方式的元數據描述信息,也就是底層到底基於什麼的實現,譬如:nio、apr 等。
B. void addListener(IoServiceListener listener):
這個方法能夠爲IoService 增長一個監聽器,用於監聽IoService 的建立、活動、失效、空閒、銷燬,具體能夠參考IoServiceListener 接口中的方法,這爲你參與IoService 的生命週期提供了機會。
C. void removeListener(IoServiceListener listener):
這個方法用於移除上面的方法添加的監聽器。
D. void setHandler(IoHandler handler):
這個方法用於向IoService 註冊IoHandler,同時有getHandler()方法獲取Handler。
E. Map<Long,IoSession> getManagedSessions():
這個方法獲取IoService 上管理的全部IoSession,Map 的key 是IoSession 的id。
F. IoSessionConfig getSessionConfig():
這個方法用於獲取IoSession 的配置對象,經過IoSessionConfig 對象能夠設置Socket 鏈接的一些選項。
(2.)IoAcceptor:
這個接口是TCPServer 的接口,主要增長了void bind()監聽端口、void unbind()解除對套接字的監聽等方法。這裏與傳統的JAVA 中的ServerSocket 不一樣的是IoAcceptor 能夠屢次調用bind()方法(或者在一個方法中傳入多個SocketAddress 參數)同時監聽多個端口。
3.)IoConnector:
這個接口是TCPClient 的接口, 主要增長了ConnectFuture connect(SocketAddressremoteAddress,SocketAddress localAddress)方法,用於與Server 端創建鏈接,第二個參數若是不傳遞則使用本地的一個隨機端口訪問Server 端。這個方法是異步執行的,一樣的,也能夠同時鏈接多個服務端。
(4.)IoSession:
這個接口用於表示Server 端與Client 端的鏈接,IoAcceptor.accept()的時候返回實例。
這個接口有以下經常使用的方法:
A. WriteFuture write(Object message):
這個方法用於寫數據,該操做是異步的。
B. CloseFuture close(boolean immediately):
這個方法用於關閉IoSession,該操做也是異步的,參數指定true 表示當即關閉,不然就在全部的寫操做都flush 以後再關閉。
C. Object setAttribute(Object key,Object value):
這個方法用於給咱們向會話中添加一些屬性,這樣能夠在會話過程當中均可以使用,相似於HttpSession 的setAttrbute()方法。IoSession 內部使用同步的HashMap 存儲你添加的自定義屬性。
D. SocketAddress getRemoteAddress():
這個方法獲取遠端鏈接的套接字地址。
E. void suspendWrite():
這個方法用於掛起寫操做,那麼有void resumeWrite()方法與之配對。對於read()方法一樣適用。
F. ReadFuture read():
這個方法用於讀取數據, 但默認是不能使用的, 你須要調用IoSessionConfig 的setUseReadOperation(true)纔可使用這個異步讀取的方法。通常咱們不會用到這個方法,由於這個方法的內部實現是將數據保存到一個BlockingQueue,假如是Server 端,由於大量的Client 端發送的數據在Server 端都這麼讀取,那麼可能會致使內存泄漏,但對於Client,可能有的時候會比較便利。
G. IoService getService():
這個方法返回與當前會話對象關聯的IoService 實例。
關於TCP鏈接的關閉:
不管在客戶端仍是服務端,IoSession 都用於表示底層的一個TCP 鏈接,那麼你會發現不管是Server 端仍是Client 端的IoSession 調用close()方法以後,TCP 鏈接雖然顯示關閉, 但主線程仍然在運行,也就是JVM 並未退出,這是由於IoSession 的close()僅僅是關閉了TCP的鏈接通道,並無關閉Server 端、Client 端的程序。你須要調用IoService 的dispose()方法中止Server 端、Client 端。
(5.)IoSessionConfig:
這個方法用於指定這次會話的配置,它有以下經常使用的方法:
A. void setReadBufferSize(int size):
這個方法設置讀取緩衝的字節數,但通常不須要調用這個方法,由於IoProcessor 會自動調整緩衝的大小。你能夠調用setMinReadBufferSize()、setMaxReadBufferSize()方法,這樣不管IoProcessor 不管如何自動調整,都會在你指定的區間。
B. void setIdleTime(IdleStatus status,int idleTime):
這個方法設置關聯在通道上的讀、寫或者是讀寫事件在指定時間內未發生,該通道就進入空閒狀態。一旦調用這個方法,則每隔idleTime 都會回調過濾器、IoHandler 中的sessionIdle()方法。
C. void setWriteTimeout(int time):
這個方法設置寫操做的超時時間。
D. void setUseReadOperation(boolean useReadOperation):
這個方法設置IoSession 的read()方法是否可用,默認是false。
(6.)IoHandler:
這個接口是你編寫業務邏輯的地方,從上面的示例代碼能夠看出,讀取數據、發送數據基本都在這個接口總完成,這個實例是綁定到IoService 上的,有且只有一個實例(沒有給一個IoService 注入一個IoHandler 實例會拋出異常)。它有以下幾個方法:
A. void sessionCreated(IoSession session):
這個方法當一個Session 對象被建立的時候被調用。對於TCP 鏈接來講,鏈接被接受的時候調用,但要注意此時TCP 鏈接並未創建,此方法僅表明字面含義,也就是鏈接的對象IoSession 被建立完畢的時候,回調這個方法。對於UDP 來講,當有數據包收到的時候回調這個方法,由於UDP 是無鏈接的。
B. void sessionOpened(IoSession session):
這個方法在鏈接被打開時調用,它老是在sessionCreated()方法以後被調用。對於TCP 來講,它是在鏈接被創建以後調用,你能夠在這裏執行一些認證操做、發送數據等。對於UDP 來講,這個方法與sessionCreated()沒什麼區別,可是緊跟其後執行。若是你每隔一段時間,發送一些數據,那麼sessionCreated()方法只會在第一次調用,可是sessionOpened()方法每次都會調用。
C. void sessionClosed(IoSession session) :
對於TCP 來講,鏈接被關閉時,調用這個方法。對於UDP 來講,IoSession 的close()方法被調用時纔會毀掉這個方法。
D. void sessionIdle(IoSession session, IdleStatus status) :
這個方法在IoSession 的通道進入空閒狀態時調用,對於UDP 協議來講,這個方法始終不會被調用。
E. void exceptionCaught(IoSession session, Throwable cause) :
這個方法在你的程序、Mina 自身出現異常時回調,通常這裏是關閉IoSession。
F. void messageReceived(IoSession session, Object message) :
接收到消息時調用的方法,也就是用於接收消息的方法,通常狀況下,message 是一個IoBuffer 類,若是你使用了協議編解碼器,那麼能夠強制轉換爲你須要的類型。一般咱們都是會使用協議編解碼器的, 就像上面的例子, 由於協議編解碼器是
TextLineCodecFactory,因此咱們能夠強制轉message 爲String 類型。
G. void messageSent(IoSession session, Object message) :
當發送消息成功時調用這個方法,注意這裏的措辭,發送成功以後,也就是說發送消息是不能用這個方法的。
發送消息的時機:
發送消息應該在sessionOpened()、messageReceived()方法中調用IoSession.write()方法完成。由於在sessionOpened()方法中,TCP 鏈接已經真正打開,一樣的在messageReceived()方法TCP 鏈接也是打開狀態,只不過二者的時機不一樣。sessionOpened()方法是在TCP 鏈接創建以後,接收到數據以前發送;messageReceived()方法是在接收到數據以後發送,你能夠完成依據收到的內容是什麼樣子,決定發送什麼樣的數據。由於這個接口中的方法太多,所以一般使用適配器模式IoHandlerAdapter,覆蓋你所感興趣的方法便可。
(7.)IoBuffer:
這個接口是對JAVA NIO 的ByteBuffer 的封裝,這主要是由於ByteBuffer 只提供了對基本數據類型的讀寫操做,沒有提供對字符串等對象類型的讀寫方法,使用起來更爲方便,另外,ByteBuffer 是定長的,若是想要可變,將很麻煩。IoBuffer 的可變長度的實現相似於StringBuffer。IoBuffer 與ByteBuffer 同樣,都是非線程安全的。本節的一些內容若是不清楚,能夠參考java.nio.ByteBuffer 接口。這個接口有以下經常使用的方法:
A. static IoBuffer allocate(int capacity,boolean useDirectBuffer):
這個方法內部經過SimpleBufferAllocator 建立一個實例,第一個參數指定初始化容量,第二個參數指定使用直接緩衝區仍是JAVA 內存堆的緩存區,默認爲false。
B. void free():
釋放緩衝區,以便被一些IoBufferAllocator 的實現重用,通常沒有必要調用這個方法,除非你想提高性能(但可能未必效果明顯)。
C. IoBuffer setAutoExpand(boolean autoExpand):
這個方法設置IoBuffer 爲自動擴展容量,也就是前面所說的長度可變,那麼能夠看出長度可變這個特性默認是不開啓的。
D. IoBuffer setAutoShrink(boolean autoShrink):
這個方法設置IoBuffer 爲自動收縮,這樣在compact()方法調用以後,能夠裁減掉一些沒有使用的空間。若是這個方法沒有被調用或者設置爲false,你也能夠經過調用shrink()方法手動收縮空間。
E. IoBuffer order(ByteOrder bo):
這個方法設置是Big Endian 仍是Little Endian,JAVA 中默認是Big Endian,C++和其餘語言通常是Little Endian。
F. IoBuffer asReadOnlyBuffer():
這個方法設置IoBuffer 爲只讀的。
G. Boolean prefixedDataAvailable(int prefixLength,int maxDataLength):
這個方法用於數據的最開始的一、二、4 個字節表示的是數據的長度的狀況,
prefixLentgh表示這段數據的前幾個字節(只能是一、二、4 的其中一個),表明的是這段數據的長度,
maxDataLength 表示最多要讀取的字節數。返回結果依賴於等式
remaining()-prefixLength>=maxDataLength,也就是總的數據-表示長度的字節,剩下的字節數要比打算讀取的字節數大或者相等。
H. String getPrefixedString(int prefixLength,CharsetDecoder decoder):
若是上面的方法返回true,那麼這個方法將開始讀取表示長度的字節以後的數據,注意要保持這兩個方法的prefixLength 的值是同樣的。
G、H 兩個方法在後面講到的PrefixedStringDecoder 中的內部實現使用。
IoBuffer 剩餘的方法與ByteBuffer 都是差很少的,額外增長了一些便利的操做方法,例如:
IoBuffer putString(String value,CharsetEncoder encoder)能夠方便的以指定的編碼方式存儲字符串、InputStream asInputStream()方法從IoBuffer 剩餘的未讀的數據中轉爲輸入流等。
(8.)IoFuture:
在Mina 的不少操做中,你會看到返回值是XXXFuture,實際上他們都是IoFuture 的子類,看到這樣的返回值,這個方法就說明是異步執行的,主要的子類有ConnectFuture、CloseFuture 、ReadFuture 、WriteFuture 。這個接口的大部分操做都和
java.util.concurrent.Future 接口是相似的,譬如:await()、awaitUninterruptibly()等,通常咱們經常使用awaitUninterruptibly()方法能夠等待異步執行的結果返回。這個接口有以下經常使用的方法:
A. IoFuture addListener(IoFutureListener<?> listener):
這個方法用於添加一個監聽器, 在異步執行的結果返回時監聽器中的回調方法operationComplete(IoFuture future),也就是說,這是替代awaitUninterruptibly()方法另外一種等待異步執行結果的方法,它的好處是不會產生阻塞。
B. IoFuture removeListener(IoFutureListener<?> listener):
這個方法用於移除指定的監聽器。
C. IoSession getSession():
這個方法返回當前的IoSession。舉個例子,咱們在客戶端調用connect()方法訪問Server 端的時候,實際上這就是一個異步執行的方法,也就是調用connect()方法以後當即返回,執行下面的代碼,而無論是否鏈接成功。那麼若是我想在鏈接成功以後執行一些事情(譬如:獲取鏈接成功後的IoSession對象),該怎麼辦呢?按照上面的說明,你有以下兩種辦法:
第一種:
package com.dxz.minademo3; import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.service.IoConnector; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.LineDelimiter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.transport.socket.nio.NioSocketConnector; import com.dxz.minademo2.TCPClientHandler; public class TCPClient { public static void main(String[] args) { IoConnector connector = new NioSocketConnector(); connector.setConnectTimeoutMillis(30000); connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue()))); connector.setHandler(new TCPClientHandler("你好!\r\n 你們好!")); ConnectFuture future = connector.connect(new InetSocketAddress("localhost", 9124)); // 等待是否鏈接成功,至關因而轉異步執行爲同步執行。 future.awaitUninterruptibly(); // 鏈接成功後獲取會話對象。若是沒有上面的等待,因爲connect()方法是異步的,session可能會沒法獲取。 IoSession session = future.getSession(); System.out.println(session); } }
第二種:
package com.dxz.minademo3; import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.future.IoFutureListener; import org.apache.mina.core.service.IoConnector; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.LineDelimiter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.transport.socket.nio.NioSocketConnector; import com.dxz.minademo2.TCPClientHandler; public class TCPClient { public static void main(String[] args) { IoConnector connector = new NioSocketConnector(); connector.setConnectTimeoutMillis(30000); connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue()))); connector.setHandler(new TCPClientHandler("你好!\r\n 你們好!")); ConnectFuture future = connector.connect(new InetSocketAddress("localhost", 9124)); future.addListener(new IoFutureListener<ConnectFuture>() { public void operationComplete(ConnectFuture future) { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } IoSession session = future.getSession(); System.out.println("++++++++++++++++++++++++++++"); } }); System.out.println("*************"); } }
結果:
************* ++++++++++++++++++++++++++++
爲了更好的看清楚使用監聽器是異步的,而不是像awaitUninterruptibly()那樣會阻塞主線程的執行,咱們在回調方法中暫停5 秒鐘,而後輸出+++,在最後輸出***。咱們執行代碼以後,你會發現首先輸出***(這證實了監聽器是異步執行的),而後IoSession 對象Created,系統暫停5 秒,而後輸出+++,最後IoSession 對象Opened,也就是TCP 鏈接創建。
4.日誌配置:
前面的示例代碼中提到了使用SLF4J 做爲日誌門面,這是由於Mina 內部使用的就是SLF4J,你也使用SLF4J 能夠與之保持一致性。Mina 若是想啓用日誌跟蹤Mina 的運行細節,你能夠配置LoggingFilter 過濾器,這樣你能夠看到Session 創建、打開、空閒等一系列細節在日誌中輸出,默認SJF4J 是按照DEBUG級別輸出跟蹤信息的,若是你想給某一類別的Mina 運行信息輸出指定日誌輸出級別,能夠調用LoggingFilter 的setXXXLogLevel(LogLevel.XXX)。
例:
這裏IoSession 被打開的跟蹤信息將以ERROR 級別輸出到日誌。
5.過濾器:
前面咱們看到了LoggingFilter、ProtocolCodecFilter 兩個過濾器,一個負責日誌輸出,一個負責數據的編解碼,經過最前面的Mina 執行流程圖,在IoProcessor 與IoHandler 之間能夠有不少的過濾器,這種設計方式爲你提供可插拔似的擴展功能提供了很是便利的方式,目前的Apache CXF、Apache Struts2 中的攔截器也都是同樣的設計思路。Mina 中的IoFilter 是單例的,這與CXF、Apache Struts2 沒什麼區別。IoService 實例上會綁定一個DefaultIoFilterChainBuilder 實例,DefaultIoFilterChainBuilder 會把使用內部的EntryImpl 類把全部的過濾器按照順序連在一塊兒,組成一個過濾器鏈。
DefaultIoFilterChainBuilder 類以下經常使用的方法:
A. void addFirst(String name,IoFilter filter):
這個方法把過濾器添加到過濾器鏈的頭部,頭部就是IoProcessor 以後的第一個過濾器。一樣的addLast()方法把過濾器添加到過濾器鏈的尾部。
B. void addBefore(String baseName,String name,IoFilter filter):
這個方法將過濾器添加到baseName 指定的過濾器的前面,一樣的addAfter()方法把過濾器添加到baseName 指定的過濾器的後面。這裏要注意不管是那種添加方法,每一個過濾器的名字(參數name)必須是惟一的。
C. IoFilter remove(Stirng name):
這個方法移除指定名稱的過濾器,你也能夠調用另外一個重載的remove()方法,指定要移除的IoFilter 的類型。
D. List<Entry> getAll():
這個方法返回當前IoService 上註冊的全部過濾器。默認狀況下,過濾器鏈中是空的,也就是getAll()方法返回長度爲0 的List,但實際Mina內部有兩個隱藏的過濾器:HeadFilter、TailFilter,分別在List 的最開始和最末端,很明顯,TailFilter 在最末端是爲了調用過濾器鏈以後,調用IoHandler。但這兩個過濾器對你來講是透明的,能夠忽略它們的存在。編寫一個過濾器很簡單,你須要實現IoFilter 接口,若是你只關注某幾個方法,能夠繼承IoFilterAdapter 適配器類。IoFilter 接口中主要包含兩類方法,一類是與IoHandler 中的方法名一致的方法,至關於攔截IoHandler 中的方法,另外一類是IoFilter 的生命週期回調方法,這些回調方法的執行順序和解釋以下所示:
(1.)init()在首次添加到鏈中的時候被調用,但你必須將這個IoFilter 用
ReferenceCountingFilter 包裝起來,不然init()方法永遠不會被調用。
(2.)onPreAdd()在調用添加到鏈中的方法時被調用,但此時還未真正的加入到鏈。
(3.)onPostAdd()在調用添加到鏈中的方法後被調,若是在這個方法中有異常拋出,則過濾器會當即被移除,同時destroy()方法也會被調用(前提是使用ReferenceCountingFilter包裝)。
(4.)onPreRemove()在從鏈中移除以前調用。
(5.)onPostRemove()在從鏈中移除以後調用。
(6.)destory()在從鏈中移除時被調用,使用方法與init()要求相同。
不管是哪一個方法,要注意必須在實現時調用參數nextFilter 的同名方法,不然,過濾器鏈的執行將被中斷,IoHandler 中的同名方法同樣也不會被執行,這就至關於Servlet 中的Filter 必須調用filterChain.doFilter(request,response)才能繼續前進是同樣的道理。
示例:
package com.dxz.minademo3; import org.apache.mina.core.filterchain.IoFilter; import org.apache.mina.core.filterchain.IoFilterChain; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession; import org.apache.mina.core.write.WriteRequest; public class MyIoFilter implements IoFilter { @Override public void destroy() throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%�stroy"); } @Override public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%exceptionCaught"); nextFilter.exceptionCaught(session, cause); } @Override public void filterClose(NextFilter nextFilter, IoSession session) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%filterClose"); nextFilter.filterClose(session); } @Override public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%filterWrite"); nextFilter.filterWrite(session, writeRequest); } @Override public void init() throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%init"); } @Override public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%messageReceived"); nextFilter.messageReceived(session, message); } @Override public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%messageSent"); nextFilter.messageSent(session, writeRequest); } @Override public void onPostAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPostAdd"); } @Override public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPostRemove"); } @Override public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPreAdd"); } @Override public void onPreRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPreRemove"); } @Override public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionClosed"); nextFilter.sessionClosed(session); } @Override public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionCreated"); nextFilter.sessionCreated(session); } @Override public void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionIdle"); nextFilter.sessionIdle(session, status); } @Override public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionOpened"); nextFilter.sessionOpened(session); } }
咱們將這個攔截器註冊到上面的TCPServer 的IoAcceptor 的過濾器鏈中的最後一個:
acceptor.getFilterChain().addLast("myIoFilter", new ReferenceCountingFilter(new MyIoFilter()));
這裏咱們將MyIoFilter 用ReferenceCountingFilter 包裝起來,這樣你能夠看到init()、destroy()方法調用。咱們啓動客戶端訪問,而後關閉客戶端,你會看到執行順序以下所示:
%%%%%%%%%%%%%%%%%%%%%%%%%%%init
%%%%%%%%%%%%%%%%%%%%%%%%%%%onPreAdd %%%%%%%%%%%%%%%%%%%%%%%%%%%onPostAdd %%%%%%%%%%%%%%%%%%%%%%%%%%%sessionCreated server session created %%%%%%%%%%%%%%%%%%%%%%%%%%%sessionOpened server session Opened %%%%%%%%%%%%%%%%%%%%%%%%%%%messageReceived The message received is [你好!] %%%%%%%%%%%%%%%%%%%%%%%%%%%messageReceived The message received is [ 你們好!] %%%%%%%%%%%%%%%%%%%%%%%%%%%sessionIdle
IoHandler 的對應方法會跟在上面的對應方法以後執行,這也就是說從橫向(單獨的看一個過濾器中的全部方法的執行順序)上看,每一個過濾器的執行順序是上面所示的順序;從縱向(方法鏈的調用)上看,若是有filter一、filter2 兩個過濾器,sessionCreated()方法的執行順序以下所示:
filter1-sessionCreated filter2-sessionCreated IoHandler-sessionCreated。
這裏你要注意init、onPreAdd、onPostAdd 三個方法並非在Server 啓動時調用的,而是IoSession 對象建立以前調用的,也就是說IoFilterChain.addXXX()方法僅僅負責初始化過濾器並註冊過濾器,但並不調用任何方法,包括init()初始化方法也是在IoProcessor 開始工做的時候被調用。IoFilter 是單例的,那麼init()方法是否只被執行一次呢?這個是不必定的,由於IoFilter是被IoProcessor 調用的,而每一個IoService 一般是關聯多個IoProcessor,因此IoFilter的init()方法是在每一個IoProcessor 線程上只執行一次。關於Mina 的線程問題,咱們後面會詳細討論,這裏你只須要清楚,init()與destroy()的調用次數與IoProceesor 的個數有關,假如一個IoService 關聯了3 個IoProcessor,有五個併發的客戶端請求,那麼你會看到三次init()方法被調用,之後將再也不會調用。Mina中自帶的過濾器:
過濾器 說明
BlacklistFilter 設置一些IP 地址爲黑名單,不容許訪問。
BufferedWriteFilter 設置輸出時像BufferedOutputStream 同樣進行緩衝。
CompressionFilter 設置在輸入、輸出流時啓用JZlib 壓縮。
ConnectionThrottleFilter 這個過濾器指定同一個IP 地址(不含端口號)上的請求在多長的毫秒值內能夠有一個請求,若是小於指定的時間間隔就有連續兩個請求,那麼第二個請求將被忽略(IoSession.close())。正如Throttle 的名字同樣,調節訪問的頻率這個過濾器最好放在過濾器鏈的前面。
FileRegionWriteFilter 若是你想使用File 對象進行輸出,請使用這個過濾器。要注意,你須要使用WriteFuture 或者在
messageSent() 方法中關閉File 所關聯的FileChannel 通道。
StreamWriteFilter 若是你想使用InputStream 對象進行輸出,請使用這個過濾器。要注意,你須要使用WriteFuture或者在messageSent()方法中關閉File 所關聯的
FileChannel 通道。NoopFilter 這個過濾器什麼也不作,若是你想測試過濾器鏈是否起做用,能夠用它來測試。
ProfilerTimerFilter 這個過濾器用於檢測每一個事件方法執行的時間,因此最好放在過濾器鏈的前面。
ProxyFilter 這個過濾器在客戶端使用ProxyConnector 做爲實現時,會自動加入到過濾器鏈中,用於完成代理功能。
RequestResponseFilter 暫不知曉。
SessionAttributeInitializingFilter 這個過濾器在IoSession 中放入一些屬性(Map),一般放在過濾器的前面,用於放置一些初始化的信息。
MdcInjectionFilter 針對日誌輸出作MDC 操做,能夠參考LOG4J 的MDC、NDC 的文檔。
WriteRequestFilter CompressionFilter、RequestResponseFilter 的基類,用於包裝寫請求的過濾器。
還有一些過濾器,會在各節中詳細討論,這裏沒有列出,譬如:前面的LoggingFilger 日誌過濾器。
6.協議編解碼器:
multiplex:英 [ˈmʌltɪpleks] 美 [ˈmʌltəˌplɛks] adj.多元的,多倍的,複式的;多部的,複合的,多樣的,多重的;;[電訊]多路傳輸的n.多路;多廳影院,多劇場影劇院v.多路傳輸,多路複用;多重發訊
前面說過,協議編解碼器是在使用Mina 的時候你最須要關注的對象,由於在網絡傳輸的數據都是二進制數據(byte),而你在程序中面向的是JAVA 對象,這就須要你實如今發送數據時將JAVA 對象編碼二進制數據,而接收數據時將二進制數據解碼爲JAVA 對象(這個可不是JAVA 對象的序列化、反序列化那麼簡單的事情)。Mina 中的協議編解碼器經過過濾器ProtocolCodecFilter 構造,這個過濾器的構造方法須要一個ProtocolCodecFactory,這從前面註冊TextLineCodecFactory 的代碼就能夠看出來。
ProtocolCodecFactory 中有以下兩個方法:
public interface ProtocolCodecFactory { ProtocolEncoder getEncoder(IoSession session) throws Exception; ProtocolDecoder getDecoder(IoSession session) throws Exception; }
所以,構建一個ProtocolCodecFactory 須要ProtocolEncoder、ProtocolDecoder 兩個實例。你可能要問JAVA 對象和二進制數據之間如何轉換呢?這個要依據具體的通訊協議,也就是Server 端要和Client 端約定網絡傳輸的數據是什麼樣的格式,譬如:第一個字節表示數據長度,第二個字節是數據類型,後面的就是真正的數據(有多是文字、有多是圖片等等),而後你能夠依據長度從第三個字節向後讀,直到讀取到指定第一個字節指定長度的數據。
簡單的說,HTTP 協議就是一種瀏覽器與Web 服務器之間約定好的通訊協議,雙方按照指定的協議編解碼數據。咱們再直觀一點兒說,前面一直使用的TextLine 編解碼器就是在讀取網絡上傳遞過來的數據時,只要發現哪一個字節裏存放的是ASCII 的十、13 字符(/r、/n),就認爲以前的字節就是一個字符串(默認使用UTF-8 編碼)。以上所說的就是各類協議實際上就是網絡七層結構中的應用層協議,它位於網絡層(IP)、傳輸層(TCP)之上,Mina 的協議編解碼器就是讓你實現一套本身的應用層協議棧。
(6-1.)簡單的編解碼器示例:
下面咱們舉一個模擬電信運營商短信協議的編解碼器實現,假設通訊協議以下所示:
M sip:wap.fetion.com.cn SIP-C/2.0
S: 1580101xxxx
R: 1889020xxxx
L: 21
Hello World!
這裏的第一行表示狀態行,通常表示協議的名字、版本號等,第二行表示短信的發送號碼,第三行表示短信接收的號碼,第四行表示短信的字節數,最後的內容就是短信的內容。上面的每一行的末尾使用ASC II 的10(/n)做爲換行符,由於這是純文本數據,協議要
求雙方使用UTF-8 對字符串編解碼。實際上若是你熟悉HTTP 協議,上面的這個精簡的短信協議和HTTP 協議的組成是很是像的,第一行是狀態行,中間的是消息報頭,最後面的是消息正文。在解析這個短信協議以前,你須要知曉TCP 的一個事項,那就是數據的發送沒有規模性,所謂的規模性就是做爲數據的接收端,不知道到底何時數據算是讀取完畢,因此應用層協議在制定的時候,必須指定數據讀取的截至點。通常來講,有以下三種方式設置數據讀取的長度:
(1.)使用分隔符,譬如:TextLine 編解碼器。你可使用/r、/n、NUL 這些ASC II 中的特殊的字符來告訴數據接收端,你只要碰見分隔符,就表示數據讀完了,不用在那裏傻等着不知道還有沒有數據沒讀完啊?我可不能夠開始把已經讀取到的字節解碼爲指定的數據類型了啊?
(2.)定長的字節數,這種方式是使用長度固定的數據發送,通常適用於指令發送,譬如:數據發送端規定發送的數據都是雙字節,AA 表示啓動、BB 表示關閉等等。
(3.)在數據中的某個位置使用一個長度域,表示數據的長度,這種處理方式最爲靈活,上面的短信協議中的那個L 就是短信文字的字節數,其實HTTP 協議的消息報頭中的Content-Length 也是表示消息正文的長度,這樣數據的接收端就知道我到底讀到多長的
字節數就表示不用再讀取數據了。相比較解碼(字節轉爲JAVA 對象,也叫作拆包)來講,編碼(JAVA 對象轉爲字節,也叫作打包)就很簡單了,你只須要把JAVA 對象轉爲指定格式的字節流,write()就能夠了。下面咱們開始對上面的短信協議進行編解碼處理。
第一步,協議對象:
package com.dxz.minademo3; public class SmsObject { private String sender;// 短信發送者 private String receiver;// 短信接受者 private String message;// 短信內容 public String getSender() { return sender; } public void setSender(String sender) { this.sender = sender; } public String getReceiver() { return receiver; } public void setReceiver(String receiver) { this.receiver = receiver; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } }
第二步,編碼器:
在Mina 中編寫編碼器能夠實現ProtocolEncoder,其中有encode()、dispose()兩個方法須要實現。這裏的dispose()方法用於在銷燬編碼器時釋放關聯的資源,因爲這個方法通常咱們並不關心,因此一般咱們直接繼承適配器ProtocolEncoderAdapter。
package com.dxz.minademo3; import java.nio.charset.Charset; import java.nio.charset.CharsetEncoder; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolEncoderAdapter; import org.apache.mina.filter.codec.ProtocolEncoderOutput; public class CmccSipcEncoder extends ProtocolEncoderAdapter { private final Charset charset; public CmccSipcEncoder(Charset charset) { this.charset = charset; } @Override public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { SmsObject sms = (SmsObject) message; CharsetEncoder ce = charset.newEncoder(); IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true); String statusLine = "M sip:wap.fetion.com.cn SIP-C/2.0"; String sender = sms.getSender(); String receiver = sms.getReceiver(); String smsContent = sms.getMessage(); buffer.putString(statusLine + '\n', ce); buffer.putString("S: " + sender + '\n', ce); buffer.putString("R: " + receiver + '\n', ce); buffer.putString("L: " + (smsContent.getBytes(charset).length) + "\n", ce); buffer.putString(smsContent, ce); buffer.flip(); out.write(buffer); } }
這裏咱們依據傳入的字符集類型對message 對象進行編碼,編碼的方式就是按照短信協議拼裝字符串到IoBuffer 緩衝區,而後調用ProtocolEncoderOutput 的write()方法輸出字節流。這裏要注意生成短信內容長度時的紅色代碼,咱們使用String 類與Byte[]類型之間的轉換方法得到轉爲字節流後的字節數。
編碼器的編寫有如下幾個步驟:
A. 將 encode()方法中的message 對象強制轉換爲指定的對象類型;
B. 建立IoBuffer 緩衝區對象,並設置爲自動擴展;
C. 將轉換後的message 對象中的各個部分按照指定的應用層協議進行組裝,並put()到IoBuffer 緩衝區;
D. 當你組裝數據完畢以後,調用flip()方法,爲輸出作好準備,切記在write()方法以前,要調用IoBuffer 的flip()方法,不然緩衝區的position 的後面是沒有數據能夠用來輸出的,你必須調用flip()方法將position 移至0,limit 移至剛纔的position。這個flip()方法的含義請參看java.nio.ByteBuffer。
E. 最後調用ProtocolEncoderOutput 的write()方法輸出IoBuffer 緩衝區實例。
第三步,解碼器:
在Mina 中編寫解碼器,能夠實現ProtocolDecoder 接口,其中有decode()、finishDecode()、dispose()三個方法。這裏的finishDecode()方法能夠用於處理在IoSession 關閉時剩餘的未讀取數據,通常這個方法並不會被使用到,除非協議中未定義任何標識數據何時截止的約定,譬如:Http 響應的Content-Length 未設定,那麼在你認爲讀取完數據後,關閉TCP鏈接(IoSession 的關閉)後,就能夠調用這個方法處理剩餘的數據,固然你也能夠忽略調剩餘的數據。一樣的,通常狀況下,咱們只須要繼承適配器ProtocolDecoderAdapter,關注decode()方法便可。但前面說過解碼器相對編碼器來講,最麻煩的是數據發送過來的規模,以聊天室爲例,一個TCP 鏈接創建以後,那麼隔一段時間就會有聊天內容發送過來,也就是decode()方法會被往復調用,這樣處理起來就會很是麻煩。那麼Mina 中幸虧提供了CumulativeProtocolDecoder類,從名字上能夠看出累積性的協議解碼器,也就是說只要有數據發送過來,這個類就會去讀取數據,而後累積到內部的IoBuffer 緩衝區,可是具體的拆包(把累積到緩衝區的數據解碼爲JAVA 對象)交由子類的doDecode()方法完成,實際上CumulativeProtocolDecoder就是在decode()反覆的調用暴漏給子類實現的doDecode()方法。
具體執行過程以下所示:
A. 你的doDecode()方法返回true 時,CumulativeProtocolDecoder 的decode()方法會首先判斷你是否在doDecode()方法中從內部的IoBuffer 緩衝區讀取了數據,若是沒有,則會拋出非法的狀態異常,也就是你的doDecode()方法返回true 就表示你已經消費了本次數據(至關於聊天室中一個完整的消息已經讀取完畢),進一步說,也就是此時你必須已經消費過內部的IoBuffer 緩衝區的數據(哪怕是消費了一個字節的數據)。若是驗證過經過,那麼CumulativeProtocolDecoder 會檢查緩衝區內是否還有數據未讀取,若是有就繼續調用doDecode()方法,沒有就中止對doDecode()方法的調用,直到有新的數據被緩衝。
B. 當你的doDecode()方法返回false 時,CumulativeProtocolDecoder 會中止對doDecode()方法的調用,但此時若是本次數據還有未讀取完的,就將含有剩餘數據的IoBuffer 緩衝區保存到IoSession 中,以便下一次數據到來時能夠從IoSession 中提取合併。若是發現本次數據全都讀取完畢,則清空IoBuffer 緩衝區。簡而言之,當你認爲讀取到的數據已經夠解碼了,那麼就返回true,不然就返回false。這個CumulativeProtocolDecoder 其實最重要的工做就是幫你完成了數據的累積,由於這個工做是很煩瑣的。
咱們的這個短信協議解碼器使用/n(ASCII 的10 字符)做爲分解點,一個字節一個字節的讀取,那麼第一次發現/n 的字節位置以前的部分,必然就是短信協議的狀態行,依次類推,你就能夠解析出來發送者、接受者、短信內容長度。而後咱們在解析短信內容時,使用獲取到的長度進行讀取。所有讀取完畢以後, 而後構造SmsObject 短信對象, 使用ProtocolDecoderOutput 的write()方法輸出,最後返回false,也就是本次數據所有讀取完畢,告知CumulativeProtocolDecoder 在本次數據讀取中不須要再調用doDecode()方法了。這裏須要注意的是兩個狀態變量i、matchCount,i 用於記錄解析到了短信協議中的哪一行(/n),matchCount 記錄在當前行中讀取到了哪個字節。狀態變量在解碼器中常常被使用,咱們這裏的狀況比較簡單,由於咱們假定短信發送是在一次數據發送中完成的,因此狀態變量的使用也比較簡單。假如數據的發送被拆成了屢次(譬如:短信協議的短信內容、消息報頭被拆成了兩次數據發送),那麼上面的代碼勢必就會存在問題,由於當第二次調用doDecode()方法時,狀態變量i、matchCount 勢必會被重置,也就是原來的狀態值並無被保存。那麼咱們如何解決狀態保存的問題呢?答案就是將狀態變量保存在IoSession 中或者是Decoder 實例自身,但推薦使用前者,由於雖然Decoder 是單例的,其中的實例變量保存的狀態在Decoder 實例銷燬前始終保持,但Mina 並不保證每次調用doDecode()方法時都是同一個線程(這也就是說第一次調用doDecode()是IoProcessor-1 線程,第二次有可能就是IoProcessor-2 線程),這就會產生多線程中的實例變量的可視性(Visibility,具體請參考JAVA 的多線程知識)問題。IoSession中使用一個同步的HashMap 保存對象,因此你不須要擔憂多線程帶來的問題。使用IoSession 保存解碼器的狀態變量一般的寫法以下所示:
A. 在解碼器中定義私有的內部類Context,而後將須要保存的狀態變量定義在Context 中存儲。
B. 在解碼器中定義方法獲取這個Context 的實例,這個方法的實現要優先從IoSession 中獲取Context。
具體代碼示例以下所示:
// 上下文做爲保存狀態的內部類的名字,意思很明顯,就是讓狀態跟隨上下文,在整個調用過程當中均可以被保持。
package com.dxz.minademo3; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.AttributeKey; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.CumulativeProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput; public class XXXDecoder extends CumulativeProtocolDecoder { private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context"); public Context getContext(IoSession session) { Context ctx = (Context) session.getAttribute(CONTEXT); if (ctx == null) { ctx = new Context(); session.setAttribute(CONTEXT, ctx); } return ctx; } private class Context { // 狀態變量 } @Override protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { // TODO Auto-generated method stub return false; } }
注意這裏咱們使用了Mina 自帶的AttributeKey 類來定義保存在IoSession 中的對象的鍵值,這樣能夠有效的防止鍵值重複。另外,要注意在所有處理完畢以後,狀態要復位,譬如:聊天室中的一條消息讀取完畢以後,狀態變量要變爲初始值,以便下次處理時從新使用。
第四步,編解碼工廠:
package com.dxz.minademo3; import java.nio.charset.Charset; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFactory; import org.apache.mina.filter.codec.ProtocolDecoder; import org.apache.mina.filter.codec.ProtocolEncoder; public class CmccSipcCodecFactory implements ProtocolCodecFactory { private final CmccSipcEncoder encoder; private final CmccSipcDecoder decoder; public CmccSipcCodecFactory() { this(Charset.defaultCharset()); } public CmccSipcCodecFactory(Charset charSet) { this.encoder = new CmccSipcEncoder(charSet); this.decoder = new CmccSipcDecoder(charSet); } @Override public ProtocolDecoder getDecoder(IoSession session) throws Exception { return decoder; } @Override public ProtocolEncoder getEncoder(IoSession session) throws Exception { return encoder; } }
實際上這個工廠類就是包裝了編碼器、解碼器,經過接口中的getEncoder()、getDecoder()方法向ProtocolCodecFilter 過濾器返回編解碼器實例,以便在過濾器中對數據進行編解碼處理。
第五步,運行示例:
下面咱們修改最一開始的示例中的MyServer、MyClient 的代碼,以下所示:
package com.dxz.minademo3; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.mina.core.service.IoAcceptor; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.LineDelimiter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.filter.util.ReferenceCountingFilter; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; import com.dxz.minademo2.TCPServerHandler; public class TCPServer3 { public static void main(String[] args) throws IOException { IoAcceptor acceptor = new NioSocketAcceptor(); acceptor.getSessionConfig().setReadBufferSize(2048); acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); // 編寫過濾器 acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new CmccSipcCodecFactory(Charset.forName("UTF-8")))); // 設置handler acceptor.getFilterChain().addLast("myIoFilter", new ReferenceCountingFilter(new MyIoFilter())); // 設置handler acceptor.setHandler(new TCPServerHandler()); // 綁定端口 acceptor.bind(new InetSocketAddress(9124)); System.out.println(acceptor.getSessionConfig()); } }
client
package com.dxz.minademo3; import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.future.IoFutureListener; import org.apache.mina.core.service.IoConnector; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.LineDelimiter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.transport.socket.nio.NioSocketConnector; import com.dxz.minademo2.TCPClientHandler; public class TCPClient3 { public static void main(String[] args) { IoConnector connector = new NioSocketConnector(); connector.setConnectTimeoutMillis(30000); connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new CmccSipcCodecFactory(Charset.forName("UTF-8")))); connector.setHandler(new TCPClientHandler("你好!\r\n 你們好!")); ConnectFuture future = connector.connect(new InetSocketAddress("localhost", 9124)); future.addListener(new IoFutureListener<ConnectFuture>() { public void operationComplete(ConnectFuture future) { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } IoSession session = future.getSession(); System.out.println("++++++++++++++++++++++++++++"); } }); System.out.println("*************"); } }
最後咱們在MyIoHandler 中接收這條短信息:
package com.dxz.minademo3; import org.apache.mina.core.filterchain.IoFilter; import org.apache.mina.core.filterchain.IoFilterChain; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession; import org.apache.mina.core.write.WriteRequest; public class MyIoFilter implements IoFilter { @Override public void destroy() throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%�stroy"); } @Override public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%exceptionCaught"); nextFilter.exceptionCaught(session, cause); } @Override public void filterClose(NextFilter nextFilter, IoSession session) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%filterClose"); nextFilter.filterClose(session); } @Override public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%filterWrite"); nextFilter.filterWrite(session, writeRequest); } @Override public void init() throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%init"); } @Override public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception { System.out.println("-%%%%%%%%%%%%%%%%%%%%%%%%%%%messageReceived"); SmsObject sms = (SmsObject) message; System.out.println("The message received is [" + sms.getMessage() + "]"); System.out.println("-----------------messageReceived"); nextFilter.messageReceived(session, message); } @Override public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%messageSent"); nextFilter.messageSent(session, writeRequest); } @Override public void onPostAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPostAdd"); } @Override public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPostRemove"); } @Override public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPreAdd"); } @Override public void onPreRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPreRemove"); } @Override public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionClosed"); nextFilter.sessionClosed(session); } @Override public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionCreated"); nextFilter.sessionCreated(session); } @Override public void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionIdle"); nextFilter.sessionIdle(session, status); } @Override public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionOpened"); nextFilter.sessionOpened(session); } }
你會看到Server 端的控制檯輸出以下信息:
%%%%%%%%%%%%%%%%%%%%%%%%%%%init
%%%%%%%%%%%%%%%%%%%%%%%%%%%onPreAdd %%%%%%%%%%%%%%%%%%%%%%%%%%%onPostAdd %%%%%%%%%%%%%%%%%%%%%%%%%%%sessionCreated server session created %%%%%%%%%%%%%%%%%%%%%%%%%%%sessionOpened server session Opened -%%%%%%%%%%%%%%%%%%%%%%%%%%%messageReceived The message received is [你好!Hello World!] -----------------messageReceived The message received is [com.dxz.minademo3.SmsObject@2b884014]
(6-2.)複雜的解碼器:
下面咱們講解一下如何在解碼器中保存狀態變量,也就是真正的實現上面所說的Context。
咱們假設這樣一種狀況,有兩條短信:
M sip:wap.fetion.com.cn SIP-C/2.0
S: 1580101xxxx
R: 1889020xxxx
L: 21
Hello World!
M sip:wap.fetion.com.cn SIP-C/2.0
S: 1580101xxxx
R: 1889020xxxx
L: 21
Hello World!
他們按照上面的顏色標識發送,也就是說紅色部分、藍色部分、綠色部分分別發送(調用三次IoSession.write()方法),那麼若是你還用上面的CmccSipcDecoder,將沒法工做,由於第一次數據流(紅色部分)發送過取時,數據是不完整的,沒法解析出一條短信息,當二次數據流(藍色部分)發送過去時,已經能夠解析出第一條短信息了,可是第二條短信仍是不完整的,須要等待第三次數據流(綠色部分)的發送。注意:因爲模擬數據發送的規模性問題很麻煩,因此這裏採用了這種極端的例子說明問題,雖不具備典型性,但很能說明問題,這就足夠了,因此不要追究這種發送消息是否在真實環境中存在,更不要追究其合理性。
CmccSispcDecoder 類改成以下的寫法:
- public class CmccSipcDecoder extends CumulativeProtocolDecoder {
- private final Charset charset;
- private final AttributeKey CONTEXT = new AttributeKey(getClass(),
- "context");
- public CmccSipcDecoder(Charset charset) {
- this.charset = charset;
- }
- @Override
- protected boolean doDecode(IoSession session, IoBuffer in,
- ProtocolDecoderOutput out) throws Exception {
- Context ctx = getContext(session);
- CharsetDecoder cd = charset.newDecoder();
- int matchCount = ctx.getMatchCount();
- int line = ctx.getLine();
- IoBuffer buffer = ctx.innerBuffer;
- String statusLine = ctx.getStatusLine(),
- sender = ctx.getSender(),
- receiver = ctx.getReceiver(),
- length = ctx.getLength(),
- sms = ctx.getSms();
- while (in.hasRemaining()) {
- byte b = in.get();
- matchCount++;
- buffer.put(b);
- if (line < 4 && b == 10) {
- if (line == 0) {
- buffer.flip();
- statusLine = buffer.getString(matchCount, cd);
- statusLine = statusLine.substring(0,
- statusLine.length() - 1);
- matchCount = 0;
- buffer.clear();
- ctx.setStatusLine(statusLine);
- }
- if (line == 1) {
- buffer.flip();
- sender = buffer.getString(matchCount, cd);
- sender = sender.substring(0, sender.length() - 1);
- matchCount = 0;
- buffer.clear();
- ctx.setSender(sender);
- }
- if (line == 2) {
- buffer.flip();
- receiver = buffer.getString(matchCount, cd);
- receiver = receiver.substring(0, receiver.length() -
- 1);
- matchCount = 0;
- buffer.clear();
- ctx.setReceiver(receiver);
- }
- if (line == 3) {
- buffer.flip();
- length = buffer.getString(matchCount, cd);
- length = length.substring(0, length.length() - 1);
- matchCount = 0;
- buffer.clear();
- ctx.setLength(length);
- }
- line++;
- } else if (line == 4) {
- if (matchCount == Long.parseLong(length.split(": ")[1]))
- {
- buffer.flip();
- sms = buffer.getString(matchCount, cd);
- ctx.setSms(sms);
- // 因爲下面的break,這裏須要調用else外面的兩行代碼
- ctx.setMatchCount(matchCount);
- ctx.setLine(line);
- break;
- }
- }
- ctx.setMatchCount(matchCount);
- ctx.setLine(line);
- }
- if (ctx.getLine() == 4
- && Long.parseLong(ctx.getLength().split(": ")[1]) == ctx
- .getMatchCount()) {
- SmsObject smsObject = new SmsObject();
- smsObject.setSender(sender.split(": ")[1]);
- smsObject.setReceiver(receiver.split(": ")[1]);
- smsObject.setMessage(sms);
- out.write(smsObject);
- ctx.reset();
- return true;
- } else {
- return false;
- }
- }
- private Context getContext(IoSession session) {
- Context context = (Context) session.getAttribute(CONTEXT);
- if (context == null){
- context = new Context();
- session.setAttribute(CONTEXT, context);
- }
- return context;
- }
- private class Context {
- private final IoBuffer innerBuffer;
- private String statusLine = "";
- private String sender = "";
- private String receiver = "";
- private String length = "";
- private String sms = "";
- public Context() {
- innerBuffer = IoBuffer.allocate(100).setAutoExpand(true);
- }
- private int matchCount = 0;
- private int line = 0;
- public int getMatchCount() {
- return matchCount;
- }
- public void setMatchCount(int matchCount) {
- this.matchCount = matchCount;
- }
- public int getLine() {
- return line;
- }
- public void setLine(int line) {
- this.line = line;
- }
- public String getStatusLine() {
- return statusLine;
- }
- public void setStatusLine(String statusLine) {
- this.statusLine = statusLine;
- }
- public String getSender() {
- return sender;
- }
- public void setSender(String sender) {
- this.sender = sender;
- }
- public String getReceiver() {
- return receiver;
- }
- public void setReceiver(String receiver) {
- this.receiver = receiver;
- }
- public String getLength() {
- return length;
- }
- public void setLength(String length) {
- this.length = length;
- }
- public String getSms() {
- return sms;
- }
- public void setSms(String sms) {
- this.sms = sms;
- }
- public void reset() {
- this.innerBuffer.clear();
- this.matchCount = 0;
- this.line = 0;
- this.statusLine = "";
- this.sender = "";
- this.receiver = "";
- this.length = "";
- this.sms = "";
- }
- }
- }
這裏咱們作了以下的幾步操做:
(1.) 全部記錄狀態的變量移到了Context 內部類中,包括記錄讀到短信協議的哪一行的line。每一行讀取了多少個字節的matchCount,還有記錄解析好的狀態行、發送者、接受者、短信內容、累積數據的innerBuffer 等。這樣就能夠在數據不能徹底解碼,等待下一次doDecode()方法的調用時,還能承接上一次調用的數據。
(2.) 在 doDecode()方法中主要的變化是各類狀態變量首先是從Context 中獲取,而後操做以後,將最新的值setXXX()到Context 中保存。
(3.) 這裏注意doDecode()方法最後的判斷,當認爲不夠解碼爲一條短信息時,返回false,也就是在本次數據流解碼中不要再調用doDecode()方法;當認爲已經解碼出一條短信息時,輸出短消息,而後重置全部的狀態變量,返回true,也就是若是本次數據流解碼中還有沒解碼完的數據,繼續調用doDecode()方法。下面咱們對客戶端稍加改造,來模擬上面的紅、藍、綠三次發送聊天短信息的狀況:
MyClient:
- ConnectFuture future = connector.connect(new InetSocketAddress(
- HOSTNAME, PORT));
- future.awaitUninterruptibly();
- session = future.getSession();
- for (int i = 0; i < 3; i++) {
- SmsObject sms = new SmsObject();
- session.write(sms);
- System.out.println("****************" + i);
- }
這裏咱們爲了方便演示,不在IoHandler 中發送消息,而是直接在MyClient 中發送,你要注意的是三次發送都要使用同一個IoSession,不然就不是從同一個通道發送過去的了。
CmccSipcEncoder:
- public void encode(IoSession session, Object message,
- ProtocolEncoderOutput out) throws Exception {
- SmsObject sms = (SmsObject) message;
- CharsetEncoder ce = charset.newEncoder();
- String statusLine = "M sip:wap.fetion.com.cn SIP-C/2.0";
- String sender = "15801012253";
- String receiver = "15866332698";
- String smsContent = "你好!Hello World!";
- IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);
- buffer.putString(statusLine + '/n', ce);
- buffer.putString("S: " + sender + '/n', ce);
- buffer.putString("R: " + receiver + '/n', ce);
- buffer.flip();
- out.write(buffer);
- IoBuffer buffer2 = IoBuffer.allocate(100).setAutoExpand(true);
- buffer2.putString("L: " + (smsContent.getBytes(charset).length)
- + "/n",ce);
- buffer2.putString(smsContent, ce);
- buffer2.putString(statusLine + '/n', ce);
- buffer2.flip();
- out.write(buffer2);
- IoBuffer buffer3 = IoBuffer.allocate(100).setAutoExpand(true);
- buffer3.putString("S: " + sender + '/n', ce);
- buffer3.putString("R: " + receiver + '/n', ce);
- buffer3.putString("L: " + (smsContent.getBytes(charset).length)
- + "/n",ce);
- buffer3.putString(smsContent, ce);
- buffer3.putString(statusLine + '/n', ce);
- buffer3.flip();
- out.write(buffer3);
- }
上面的這段代碼要配合MyClient來操做,你須要作的是在MyClient中的紅色輸出語句處設置斷點,而後第一調用時CmccSipcEncoder中註釋掉藍、綠色的代碼,也就是發送兩條短信息的第一部分(紅色的代碼),依次類推,也就是MyClient的中的三次斷點中,分別執行CmccSipcEncoder中的紅、藍、綠三段代碼,也就是模擬兩條短信的三段發送。你會看到Server端的運行結果是:當MyClient第一次到達斷點時,沒有短信息被讀取到,當MyClient第二次到達斷點時,第一條短信息輸出,當MyClient第三次到達斷點時,第二條短信息輸出。
Mina中自帶的解碼器:
解碼器 說明
CumulativeProtocolDecoder 累積性解碼器,上面咱們重點說明了這個解碼器的用法。
SynchronizedProtocolDecoder 這個解碼器用於將任何一個解碼器包裝爲一個線程安全的解碼器,用於解決上面說的每次執行decode()方法時可能線程不是上一次的線程的問題,但這樣會在高併發時,大大下降系統的性能。
TextLineDecoder 按照文本的換行符( Windows:/r/n 、Linux:/n、Mac:/r)解碼數據。
PrefixedStringDecoder 這個類繼承自CumulativeProtocolDecoder類,用於讀取數據最前端的一、二、4 個字節表示後面的數據長度的數據。譬如:一個段數據的前兩個字節表示後面的真實數據的長度,那麼你就能夠用這個方法進行解碼。
(6-3.)多路分離的解碼器:
假設一段數據發送過來以後,須要根據某種條件決定使用哪一個解碼器,而不是像上面的例子,固定使用一個解碼器,那麼該如何作呢?幸虧Mina 提供了org.apache.mina.filter.codec.demux 包來完成這種多路分離(Demultiplexes)的解碼工做,也就是同時註冊多個解碼器,而後運行時依據傳入的數據決定到底使用哪一個解碼器來工做。所謂多路分離就是依據條件分發到指定的解碼器,譬如:上面的短信協議進行擴展,能夠依據狀態行來判斷使用1.0 版本的短信協議解碼器仍是2.0版本的短信協議解碼器。
下面咱們使用一個簡單的例子,說明這個多路分離的解碼器是如何使用的,需求以下所示:
(1.) 客戶端傳入兩個int 類型的數字,還有一個char 類型的符號。
(2.) 若是符號是+,服務端就是用1 號解碼器,對兩個數字相加,而後把結果返回給客戶端。
(3.) 若是符號是-,服務端就使用2 號解碼器,將兩個數字變爲相反數,而後相加,把結果返回給客戶端。
Demux 開發編解碼器主要有以下幾個步驟:
A. 定義Client 端、Server 端發送、接收的數據對象。
B. 使用Demux 編寫編碼器是實現MessageEncoder<T>接口,T 是你要編碼的數據對象,這個MessageEncoder 會在DemuxingProtocolEncoder 中調用。
C. 使用Demux 編寫編碼器是實現MessageDecoder 接口,這個MessageDecoder 會在DemuxingProtocolDecoder 中調用。
D. 在 DemuxingProtocolCodecFactory 中調用addMessageEncoder()、addMessageDecoder()方法組裝編解碼器。
MessageEncoder的接口以下所示:
public interface MessageEncoder<T> { void encode(IoSession session, T message, ProtocolEncoderOutput out) throws Exception; }
你注意到消息編碼器接口與在ProtocolEncoder 中沒什麼不一樣,區別就是Object message被泛型具體化了類型,你不須要手動的類型轉換了。
MessageDecoder的接口以下所示:
public interface MessageDecoder { static MessageDecoderResult OK = MessageDecoderResult.OK; static MessageDecoderResult NEED_DATA = MessageDecoderResult.NEED_DATA; static MessageDecoderResult NOT_OK = MessageDecoderResult.NOT_OK; MessageDecoderResult decodable(IoSession session, IoBuffer in); MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception; void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception; }
(1.)decodable()方法有三個返回值,分別表示以下的含義:
A. MessageDecoderResult.NOT_OK:表示這個解碼器不適合解碼數據,而後檢查其它解碼器,若是都不知足會拋異常;
B. MessageDecoderResult.NEED_DATA:表示當前的讀入的數據不夠判斷是否可以使用這個解碼器解碼,而後再次調用decodable()方法檢查其它解碼器,若是都是NEED_DATA,則等待下次輸入;
C. MessageDecoderResult.OK: 表示這個解碼器能夠解碼讀入的數據, 而後則調用MessageDecoder 的decode()方法。這裏注意decodable()方法對參數IoBuffer in 的任何操做在方法結束以後,都會復原,也就是你沒必要擔憂在調用decode()方法時,position 已經不在緩衝區的起始位置。這個方法至關因而預讀取,用於判斷是不是可用的解碼器。
(2.)decode()方法有三個返回值,分別表示以下的含義:
A. MessageDecoderResult.NOT_OK:表示解碼失敗,會拋異常;
B. MessageDecoderResult.NEED_DATA:表示數據不夠,須要讀到新的數據後,再次調用decode()方法。
C. MessageDecoderResult.OK:表示解碼成功。
代碼演示:
(1.)客戶端發送的數據對象:
(2.)服務端發送的返回結果對象:
package com.dxz.minademo4; public class ResultMessage { private int result = 0; public int getResult() { return result; } public void setResult(int result) { this.result = result; } }
(3.)客戶端使用的SendMessage的編碼器:
package com.dxz.minademo4; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolEncoderOutput; import org.apache.mina.filter.codec.demux.MessageEncoder; public class SendMessageEncoder implements MessageEncoder<SendMessage> { @Override public void encode(IoSession session, SendMessage message, ProtocolEncoderOutput out) throws Exception { IoBuffer buffer = IoBuffer.allocate(10); buffer.putChar(message.getSymbol()); buffer.putInt(message.getI()); buffer.putInt(message.getJ()); buffer.flip(); out.write(buffer); } }
這裏咱們的SendMessage、ResultMessage 中的字段都是用長度固定的基本數據類型,這樣IoBuffer 就不須要自動擴展了,提升性能。按照一個char、兩個int 計算,這裏的IoBuffer只須要10 個字節的長度就能夠了。
(4.)服務端使用的SendMessage的1號解碼器:
package com.dxz.minademo4; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolDecoderOutput; import org.apache.mina.filter.codec.demux.MessageDecoder; import org.apache.mina.filter.codec.demux.MessageDecoderResult; public class SendMessageDecoderPositive implements MessageDecoder { @Override public MessageDecoderResult decodable(IoSession session, IoBuffer in) { if (in.remaining() < 2) return MessageDecoderResult.NEED_DATA; else { char symbol = in.getChar(); if (symbol == '+') { return MessageDecoderResult.OK; } else { return MessageDecoderResult.NOT_OK; } } } @Override public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { SendMessage sm = new SendMessage(); sm.setSymbol(in.getChar()); sm.setI(in.getInt()); sm.setJ(in.getInt()); out.write(sm); return MessageDecoderResult.OK; } @Override public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception { // undo } }
由於客戶端發送的SendMessage 的前兩個字節(char)就是符號位,因此咱們在decodable()方法中對此條件進行了判斷,以後讀到兩個字節,而且這兩個字節表示的字符是+時,才認爲這個解碼器可用。
(5.)服務端使用的SendMessage的2號解碼器:
package com.dxz.minademo4; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolDecoderOutput; import org.apache.mina.filter.codec.demux.MessageDecoder; import org.apache.mina.filter.codec.demux.MessageDecoderResult; public class SendMessageDecoderNegative implements MessageDecoder { @Override public MessageDecoderResult decodable(IoSession session, IoBuffer in) { if (in.remaining() < 2) return MessageDecoderResult.NEED_DATA; else { char symbol = in.getChar(); if (symbol == '-') { return MessageDecoderResult.OK; } else { return MessageDecoderResult.NOT_OK; } } } @Override public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { SendMessage sm = new SendMessage(); sm.setSymbol(in.getChar()); sm.setI(-in.getInt()); sm.setJ(-in.getInt()); out.write(sm); return MessageDecoderResult.OK; } @Override public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception { // undo } }
(6.)服務端使用的ResultMessage的編碼器:
package com.dxz.minademo4; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolEncoderOutput; import org.apache.mina.filter.codec.demux.MessageEncoder; public class ResultMessageEncoder implements MessageEncoder<ResultMessage> { @Override public void encode(IoSession session, ResultMessage message, ProtocolEncoderOutput out) throws Exception { IoBuffer buffer = IoBuffer.allocate(4); buffer.putInt(message.getResult()); buffer.flip(); out.write(buffer); } }
(7.)客戶端使用的ResultMessage的解碼器:
package com.dxz.minademo4; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolDecoderOutput; import org.apache.mina.filter.codec.demux.MessageDecoder; import org.apache.mina.filter.codec.demux.MessageDecoderResult; public class ResultMessageDecoder implements MessageDecoder { @Override public MessageDecoderResult decodable(IoSession session, IoBuffer in) { if (in.remaining() < 4) return MessageDecoderResult.NEED_DATA; else if (in.remaining() == 4) return MessageDecoderResult.OK; else return MessageDecoderResult.NOT_OK; } @Override public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { ResultMessage rm = new ResultMessage(); rm.setResult(in.getInt()); out.write(rm); return MessageDecoderResult.OK; } @Override public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception { // undo } }
(8.)組裝這些編解碼器的工廠:
package com.dxz.minademo4; import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory; public class MathProtocolCodecFactory extends DemuxingProtocolCodecFactory { public MathProtocolCodecFactory(boolean server) { if (server) { super.addMessageEncoder(ResultMessage.class, ResultMessageEncoder.class); super.addMessageDecoder(SendMessageDecoderPositive.class); super.addMessageDecoder(SendMessageDecoderNegative.class); } else { super.addMessageEncoder(SendMessage.class, SendMessageEncoder.class); super.addMessageDecoder(ResultMessageDecoder.class); } } }
這個工廠類咱們使用了構造方法的一個布爾類型的參數,以便其能夠在Server 端、Client端同時使用。咱們以Server 端爲例,你能夠看到調用兩次addMessageDecoder()方法添加了1 號、2 號解碼器,其實DemuxingProtocolDecoder 內部在維護了一個MessageDecoder數組,用於保存添加的全部的消息解碼器,每次decode()的時候就調用每一個MessageDecoder的decodable()方法逐個檢查,只要發現一個MessageDecoder 不是對應的解碼器,就從數組中移除,直到找到合適的MessageDecoder,若是最後發現數組爲空,就表示沒找到對應的MessageDecoder,最後拋出異常。
(9.)Server端:
package com.dxz.minademo4; import java.net.InetSocketAddress; import org.apache.mina.core.service.IoAcceptor; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; public class Server { public static void main(String[] args) throws Exception { IoAcceptor acceptor = new NioSocketAcceptor(); LoggingFilter lf = new LoggingFilter(); acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 5); acceptor.getFilterChain().addLast("logger", lf); acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MathProtocolCodecFactory(true))); acceptor.setHandler(new ServerHandler()); acceptor.bind(new InetSocketAddress(9123)); } }
(10.)Server端使用的IoHandler:
package com.dxz.minademo4; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ServerHandler extends IoHandlerAdapter { private final static Logger log = LoggerFactory.getLogger(ServerHandler.class); @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { session.close(true); } @Override public void messageReceived(IoSession session, Object message) throws Exception { SendMessage sm = (SendMessage) message; log.info("The message received is [ " + sm.getI() + " " + sm.getSymbol() + " " + sm.getJ() + " ]"); ResultMessage rm = new ResultMessage(); rm.setResult(sm.getI() + sm.getJ()); session.write(rm); } }
(11.)Client端:
package com.dxz.minademo4; import java.net.InetSocketAddress; import org.apache.mina.core.service.IoConnector; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.transport.socket.nio.NioSocketConnector; public class Client { public static void main(String[] args) throws Throwable { IoConnector connector = new NioSocketConnector(); connector.setConnectTimeoutMillis(30000); connector.getFilterChain().addLast("logger", new LoggingFilter()); connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MathProtocolCodecFactory(false))); connector.setHandler(new ClientHandler()); connector.connect(new InetSocketAddress("localhost", 9123)); } }
(12.)Client端的IoHandler:
package com.dxz.minademo4; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ClientHandler extends IoHandlerAdapter { private final static Logger LOGGER = LoggerFactory.getLogger(ClientHandler.class); @Override public void sessionOpened(IoSession session) throws Exception { SendMessage sm = new SendMessage(); sm.setI(100); sm.setJ(99); sm.setSymbol('+'); session.write(sm); } @Override public void messageReceived(IoSession session, Object message) { ResultMessage rs = (ResultMessage) message; LOGGER.info(String.valueOf(rs.getResult())); } }
你嘗試改變ClientHandler中的Synbol中的紅色代碼中的正負號,會看到服務端使用了兩個不一樣的解碼器對其進行處理。
The message received is [ 100 + 99 ]
The message received is [ -100 - -99 ]
7.線程模型配置:
Mina 中的不少執行環節都使用了多線程機制,用於提升性能。Mina 中默認在三個地方使用了線程:
(1.) IoAcceptor:
這個地方用於接受客戶端的鏈接創建,每監聽一個端口(每調用一次bind()方法),都啓用一個線程,這個數字咱們不能改變。這個線程監聽某個端口是否有請求到來,一旦發現,則建立一個IoSession 對象。由於這個動做很快,因此有一個線程就夠了。
(2.) IoConnector:
這個地方用於與服務端創建鏈接,每鏈接一個服務端(每調用一次connect()方法),就啓用一個線程,咱們不能改變。一樣的,這個線程監聽是否有鏈接被創建,一旦發現,則建立一個IoSession 對象。由於這個動做很快,因此有一個線程就夠了。
(3.) IoProcessor:
這個地方用於執行真正的IO 操做,默認啓用的線程個數是CPU 的核數+1,譬如:單CPU 雙核的電腦,默認的IoProcessor 線程會建立3 個。這也就是說一個IoAcceptor 或者IoConnector 默認會關聯一個IoProcessor 池,這個池中有3 個IoProcessor。由於IO 操做耗費資源,因此這裏使用IoProcessor 池來完成數據的讀寫操做,有助於提升性能。這也就是前面說的IoAccetor、IoConnector 使用一個Selector,而IoProcessor 使用本身單獨的Selector 的緣由。那麼爲何IoProcessor 池中的IoProcessor 數量只比CPU 的核數大1 呢?由於IO 讀寫操做是耗費CPU 的操做,而每一核CPU 同時只能運行一個線程,所以IoProcessor 池中的IoProcessor 的數量並非越多越好。
這個IoProcessor 的數量能夠調整,以下所示:
IoAcceptor acceptor=new NioSocketAcceptor(5);
IoConnector connector=new NioSocketConnector(5);
這樣就會將IoProcessor 池中的數量變爲5 個,也就是說能夠同時處理5 個讀寫操做。還記得前面說過Mina 的解碼器要使用IoSession 保存狀態變量,而不是Decoder 自己,這是由於Mina 不保證每次執行doDecode()方法的都是同一個IoProcessor 這句話嗎?其實這個問題的根本緣由是IoProcessor 是一個池,每次IoSession 進入空閒狀態時(無讀些數據發生),IoProcessor 都會被回收到池中,以便其餘的IoSession 使用,因此當IoSession從空閒狀態再次進入繁忙狀態時,IoProcessor 會再次分配給其一個IoProcessor 實例,而此時已經不能保證仍是上一次繁忙狀態時的那個IoProcessor 了。你還會發現IoAcceptor 、IoConnector 還有一個構造方法, 你能夠指定一個java.util.concurrent.Executor 類做爲線程池對象,那麼這個線程池對象是作什麼用的呢?其實就是用於建立(1.)、(2.)中的用於監聽是否有TCP 鏈接創建的那個線程,默認狀況下,使用Executors.newCachedThreadPool()方法建立Executor 實例,也就是一個無界的線程池(具體內容請參看JAVA 的併發庫)。你們不要試圖改變這個Executor 的實例,也就是使用內置的便可,不然可能會形成一些莫名其妙的問題,譬如:性能在某個訪問量級別時,忽然降低。由於無界線程池是有多少個Socket 創建,就分配多少個線程,若是你改成Executors 的其餘建立線程池的方法,建立了一個有界線程池,那麼一些請求將沒法獲得及時響應,從而出現一些問題。
下面咱們完整的綜述一下Mina 的工做流程:
(1.) 當 IoService 實例建立的時候,同時一個關聯在IoService上的IoProcessor 池、線程池也被建立;
(2.) 當 IoService 創建套接字(IoAcceptor 的bind()或者是IoConnector 的connect()方法被調用)時,IoService 從線程池中取出一個線程,監聽套接字端口;
(3.) 當 IoService 監聽到套接字上有鏈接請求時,創建IoSession 對象,從IoProcessor池中取出一個IoProcessor 實例執行這個會話通道上的過濾器、IoHandler;
(4.) 當這條IoSession 通道進入空閒狀態或者關閉時,IoProcessor 被回收。上面說的是Mina 默認的線程工做方式,那麼咱們這裏要講的是如何配置IoProcessor 的多線程工做方式。由於一個IoProcessor 負責執行一個會話上的全部過濾器、IoHandler,也就是對於IO 讀寫操做來講,是單線程工做方式(就是按照順序逐個執行)。假如你想讓某個事件方法(譬如:sessionIdle()、sessionOpened()等)在單獨的線程中運行(也就是非IoProcessor 所在的線程),那麼這裏就須要用到一個ExecutorFilter 的過濾器。你能夠看到IoProcessor 的構造方法中有一個參數是java.util.concurrent.Executor,也就是可讓IoProcessor 調用的過濾器、IoHandler 中的某些事件方法在線程池中分配的線程上獨立運行,而不是運行在IoProcessor 所在的線程。
例:
acceptor.getFilterChain().addLast("exceutor", new ExecutorFilter());
咱們看到是用這個功能,簡單的一行代碼就能夠了。那麼ExecutorFilter 還有許多重載的構造方法,這些重載的有參構造方法,參數主要用於指定以下信息:
(1.) 指定線程池的屬性信息,譬如:核心大小、最大大小、等待隊列的性質等。你特別要關注的是ExecutorFilter 內部默認使用的是OrderedThreadPoolExecutor 做爲線程池的實現,從名字上能夠看出是保證各個事件在多線程執行中的順序(譬如:各個事件方法的執行是排他的,也就是不可能出現兩個事件方法被同時執行;messageReceived()老是在sessionClosed() 方法以前執行), 這是由於多線程的執行是異步的, 若是沒有OrderedThreadPoolExecutor 來保證IoHandler 中的方法的調用順序,可能會出現嚴重的問題。可是若是你的代碼確實沒有依賴於IoHandler 中的事件方法的執行順序,那麼你可使用UnorderedThreadPoolExecutor 做爲線程池的實現。所以,你也最好不要改變默認的Executor 實現,不然,事件的執行順序就會混亂,譬如:messageReceived()、messageSent()方法被同時執行。
(2.) 哪些事件方法被關注,也就哪些事件方法用這個線程池執行。線程池能夠異步執行的事件類型是位於IoEventType 中的九個枚舉值中除了SESSION_CREATED 以外的其他八個,這說明Session 創建的事件只能與IoProcessor 在同一個線程上執行。
public enum IoEventType { SESSION_CREATED, SESSION_OPENED, SESSION_CLOSED, MESSAGE_RECEIVED, MESSAGE_SENT, SESSION_IDLE, EXCEPTION_CAUGHT, WRITE, CLOSE, }
默認狀況下,沒有配置關注的事件類型,有以下六個事件方法會被自動使用線程池異步執行:
IoEventType.EXCEPTION_CAUGHT,
IoEventType.MESSAGE_RECEIVED,
IoEventType.MESSAGE_SENT,
IoEventType.SESSION_CLOSED,
IoEventType.SESSION_IDLE,
IoEventType.SESSION_OPENED
其實ExecutorFilter 的工做機制很簡單,就是在調用下一個過濾器的事件方法時,把其交給Executor 的execute(Runnable runnable)方法來執行,其實你本身在IoHandler 或者某個過濾器的事件方法中開啓一個線程,也能夠完成一樣的功能,只不過這樣作,你就失去了程序的可配置性,線程調用的代碼也會徹底耦合在代碼中。但要注意的是絕對不能開啓線程讓其執行sessionCreated()方法。若是你真的打算使用這個ExecutorFilter,那麼最好想清楚它該放在過濾器鏈的哪一個位置,針對哪些事件作異步處理機制。通常ExecutorFilter 都是要放在ProtocolCodecFilter 過濾器的後面,也就是不要讓編解碼運行在獨立的線程上,而是要運行在IoProcessor 所在的線程,由於編解碼處理的數據都是由IoProcessor 讀取和發送的,不必開啓新的線程,不然性能反而會降低。通常使用ExecutorFilter 的典型場景是將業務邏輯(譬如:耗時的數據庫操做)放在單獨的線程中運行,也就是說與IO 處理無關的操做能夠考慮使用ExecutorFilter 來異步執行。