接着上篇《IoBuffer的封裝》. java
其實在mina的源碼中,IoService能夠總結成五部分service責任、Processor線程處理、handler處理器、接收器和鏈接器,分別對應着IoService、IoProcessor、IoHandler、IoAcceptor和IoConnector。在代碼的中有以下包跟IoService關係密切: nginx
org.apache.mina.core.service org.apache.mina.transport.* org.apache.mina.core.polling 這個包主要是實現了輪詢策略
其中core.service包中主要定義了上面五個部分的接口,以及IoHandler和IoProcessor的實現(Handler是一種常見的設計模式,具體的業務其實並無多少,只是在結構上將層次劃分的更清楚。Processor是mina內部定義的接口,通常不對外使用,用mina官方的話,主要performs actual I/O operations for IoSession. 這一部分我想放在IoSession中來說)。而在transport包中實現了具體的鏈接方式,固然,這部分也是咱們今天閱讀的重點。我想要關注的也是mina底層如何用NIO實現各類鏈接。 apache
因此這一節的重點是IoAcceptor和IoConnector,Handler和IoService只是稍稍帶過,寫點兒用法和構成。先看mina對IoService的介紹:IoService provides basic I/O Service and manages I/O Sessions within MINA. 設計模式
上面的圖簡單介紹了IoService的職責,以及其具體實現類AbstractIoService中的職責。在比較大的框架中,都是採用了大量的抽象類之間繼承,採用層級實現細節這樣的方式來組織代碼。因此在mina中看到Abstract開頭的類,並不只僅只是一個抽象,其實裏面也包含不少的實現了。 安全
IoService用來管理各類IO服務,在mina中,這些服務能夠包括session、filter、handler等。在AbstractIoService中,也是經過線程池來裝載這些服務的: 服務器
private final Executor executor; private final boolean createdExecutor; protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) { //省略。。。 if (executor == null) { this.executor = Executors.newCachedThreadPool(); createdExecutor = true; } else { this.executor = executor; createdExecutor = false; } threadName = getClass().getSimpleName() + '-' + id.incrementAndGet(); }
而後咱們關注一下service中的dispose方法,學習一下線程安全在這裏的用法: session
/** * A lock object which must be acquired when related resources are * destroyed. */ protected final Object disposalLock = new Object(); private volatile boolean disposing; private volatile boolean disposed; private final boolean createdExecutor; public final void dispose(boolean awaitTermination) { if (disposed) { return; } synchronized (disposalLock) { if (!disposing) { disposing = true; try { dispose0(); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } } } if (createdExecutor) { ExecutorService e = (ExecutorService) executor; e.shutdownNow(); if (awaitTermination) { //Thread.currentThread().setName(); try { LOGGER.debug("awaitTermination on {} called by thread=[{}]", this, Thread.currentThread().getName()); e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); LOGGER.debug("awaitTermination on {} finished", this); } catch (InterruptedException e1) { LOGGER.warn("awaitTermination on [{}] was interrupted", this); // Restore the interrupted status Thread.currentThread().interrupt(); } } } disposed = true; }
爲了多線程之間變量內存的可見性,防止搶佔資源時候出現意想不到的問題,這裏用了volatile關鍵字修飾布爾型變量這樣的經典用法,同時使用內置鎖機制控制線程的訪問。與IoService比較相關的還有個TransportMetadata,這個類主要記錄了IoService的相關信息,在使用中,也不是很常見,因此略過這部分了。更多的線程運用在IoProcessor中會體現,這部分放到後面寫,今天的主要目的仍是鏈接IoAcceptor和IoConnector。 多線程
從如今開始就要複雜了,咱們先從服務器的接收端開始寫起,也就是IoAccpetor,先上一張宏觀的圖,來理清思路,圖來自mina官網: app
簡單介紹一下,鏈接的實現有四種: 框架
We have many of those implementing classes
咱們按照圖上用箭頭標出的兩條路來分析咱們最經常使用的NioSocketAcceptor,回顧一下調用這個類的過程:
// Create a TCP acceptor IoAcceptor acceptor = new NioSocketAcceptor(); // Associate the acceptor to an IoHandler instance (your application) acceptor.setHandler(this); // Bind : this will start the server... acceptor.bind(new InetSocketAddress(PORT)); System.out.println("Server started...");
從左邊的路走起:
接口IoAcceptor直接繼承了IoService接口,並定義了本身特有的操做。其操做的具體實現由AbstractIoAcceptor完成(注意是上圖左邊的類來實現的)。咱們繼續從左邊往下看,與IoAcceptor直接關聯的是SocketAcceptor接口,它們(IoAcceptor和SocketAcceptor)之間也是接口的繼承關係,因此根據前面的經驗,咱們能夠猜想,SocketAcceptor必定又新定義了一些屬於本身須要去實現的操做,這樣作確定是爲了與另外一種實現DatagaramAcceptor來區別,事實也確實如此,看下圖:
這裏須要注意的是上圖被框出來的部分,他們的返回類型均被縮小了(我記不得是向下轉型仍是向上轉型,應該是向下吧,如今返回的都是子類),這樣實現的好處是絕對的專注,避免了轉型上的錯誤。
在看NioSocketAcceptor以前,咱們仍是要先看右邊這條路:
public abstract class AbstractIoAcceptor extends AbstractIoService implements IoAcceptor
回顧一下,AbstractIoService實現了對session的管理,IoAcceptor定義了一些創建鏈接時用到的一系列方法,這樣一來,AbstractIoAcceptor一來有了對session使用的功能,二來須要實現創建鏈接裏須要用到的那些方法,理清楚了這些,咱們能夠看AbstractIoAcceptor的具體實現:
private final List<SocketAddress> defaultLocalAddresses = new ArrayList<SocketAddress>(); private final List<SocketAddress> unmodifiableDefaultLocalAddresses = Collections .unmodifiableList(defaultLocalAddresses); private final Set<SocketAddress> boundAddresses = new HashSet<SocketAddress>(); private boolean disconnectOnUnbind = true; /** * The lock object which is acquired while bind or unbind operation is performed. * Acquire this lock in your property setters which shouldn't be changed while * the service is bound. */ protected final Object bindLock = new Object();
這裏有三點要說:
l 這裏的Address不是用了list就是用了set,注意這些都是用來保存LocalAddress的,mina在設計的時候考慮的很全面,服務器可能會有多個網卡的啊。
l 解釋下unmodifiableList,JDK自帶方法:Returns an unmodifiable view of the specified list. This method allows modules to provide users with "read-only" access to internal lists. Query operations on the returned list "read through" to the specified list, and attempts to modify the returned list, whether direct or via its iterator, result in an UnsupportedOperationException. The returned list will be serializable if the specified list is serializable. Similarly, the returned list will implement RandomAccess if the specified list does.
l 時刻注意線程安全的問題。
AbstractIoAcceptor主要是對localAddress的操做,裏面涉及到集合類的使用和內置鎖的使用,其餘沒有什麼特別的,你們能夠看看源碼,容易理解。主要能夠看看裏面的bind方法,這個裏面涉及了兩層鎖。
AbstractPollingIoAcceptor在實現鏈接中相當重要的一個類,他是socket輪詢策略的主要實現,有那麼幾點要關注:
l polling strategy : The underlying sockets will be checked in an active loop and woke up when an socket needed to be processed.
l An Executor will be used for running client accepting and an AbstractPollingIoProcessor will be used for processing client I/O operations like reading, writing and closing.(將鏈接和業務分開,AbstractPollingIoProcessor部分會在session部分寫)。
l All the low level methods for binding, accepting, closing need to be provided by the subclassing implementation.
這個類光看用到的工具就知道他不簡單了,以前咱們見過的處理線程安全問題都只是用了內置鎖,這裏終於用到了concurrent包裏的東西了。這個類裏沒有出現NIO中selector這樣的東西,而是作了一種策略,這種策略是在線程處理上的,採用隊列和信號量協同處理socket到來時候鏈接的策略。推薦你們仔細看看這個類,裏面的註釋很詳細,我列幾個成員變量,你看看有興趣沒:
/** A lock used to protect the selector to be waked up before it's created */ private final Semaphore lock = new Semaphore(1); private final IoProcessor<S> processor; private final boolean createdProcessor; private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>(); private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>(); private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>()); private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture(); /** A flag set when the acceptor has been created and initialized */ private volatile boolean selectable; /** The thread responsible of accepting incoming requests */ private AtomicReference<Acceptor> acceptorRef = new AtomicReference<Acceptor>(); protected boolean reuseAddress = false;
在這個類裏還有一個Acceptor的內部類,實現runnable接口,主要用做接收客戶端的請求。這個類也有可看性。這裏面的東西不是很好寫,須要你們本身去細細品味。
看最後一個類,兩邊的焦點,NioSocketAcceptor。看以前在AbstractPollingIoAcceptor裏有句話:All the low level methods for binding, accepting, closing need to be provided by the subclassing implementation.這裏總該有NIO的一些東西了:
public final class NioSocketAcceptor extends AbstractPollingIoAcceptor<NioSession, ServerSocketChannel> implements SocketAcceptor { private volatile Selector selector;
我想看到的東西終於來了,selector,先不說,接着往下看:
@Override protected void init() throws Exception { selector = Selector.open(); } @Override protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception { SelectionKey key = handle.keyFor(selector); if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) { return null; } // accept the connection from the client SocketChannel ch = handle.accept(); if (ch == null) { return null; } return new NioSocketSession(this, processor, ch); } @Override protected ServerSocketChannel open(SocketAddress localAddress) throws Exception { // Creates the listening ServerSocket ServerSocketChannel channel = ServerSocketChannel.open(); boolean success = false; try { // This is a non blocking socket channel channel.configureBlocking(false); // Configure the server socket, ServerSocket socket = channel.socket(); // Set the reuseAddress flag accordingly with the setting socket.setReuseAddress(isReuseAddress()); // and bind. socket.bind(localAddress, getBacklog()); // Register the channel within the selector for ACCEPT event channel.register(selector, SelectionKey.OP_ACCEPT); success = true; } finally { if (!success) { close(channel); } } return channel; }
都是最基本的java NIO吧,是否是很熟悉,是否是有種相見恨晚的感受,是否是很簡單。固然簡單是相對的,由於之間作了那麼多鋪墊,因此程序設計真的是一種藝術,看你怎麼去設計。
只是你想過沒有,這是一個final的類,不能繼承,那咱們在調用的時候歷來沒有寫acceptor.open,這時候這個方法何時執行呢,在這個類裏面也沒有展現出來。這個疑問先留着,後面會說,這固然也是一種機制。
--------------------------------------------------------------
一寫發現有那麼多,原本想把IoConnector也寫的,但是一個Acceptor就出乎意料的多,IoConnector差很少結構,放到下次去寫了。下次會和上篇代碼同樣,給出一個簡單的閹割版實現,每次都作一個簡單的實現,源碼都讀完了,是否是就把mina也都寫了一遍,希望能實現。我其實想在最後寫個基於P2P的通訊框架,有點兒構思,可是沒實力寫,主要是結構組織的很差,因此還要多讀讀源碼。
因此我發現照着源碼作一個刪減版代碼出來的也是很好的一種讀源碼的方法,今天大神@ppdep開始讀nginx的源碼了,祝他好運。。。還有就是kafka看的我噁心,竟然連個依賴庫都沒有,構建工具也沒有,還得本身找。。