架構設計:系統間通訊(4)——IO通訊模型和JAVA實踐 中篇

四、多路複用IO模型

在「上篇」文章中,咱們已經提到了使用多線程解決高併發場景的問題所在,這篇文章咱們開始解決。java

4-一、現實場景

咱們試想一下這樣的現實場景:linux

一個餐廳同時有100位客人到店,固然到店第一件要作的事情就是點菜。可是問題來了,餐廳老闆爲了節約人力成本目前只有一位大堂服務員拿着惟一的一本菜單等待客人進行服務。git

那麼最笨(可是最簡單)的方法是(方法A),不管有多少客人等待點餐,服務員都把僅有的一份菜單遞給其中一位客人,而後站在客人身旁等待這個客人完成點菜過程。在記錄客人點菜內容後,把點菜記錄交給後堂廚師。而後是第二位客人。。。。而後是第三位客人。很明顯,只有腦門被門夾過的老闆,纔會這樣設置服務流程。由於隨後的80位客人,再等待超時後就會離店(還會給差評)。github

因而還有一種方法(方法B),老闆立刻新僱傭99名服務員,同時印製99本新的菜單。每一名服務員手持一本菜單負責一位客人(關鍵不僅在服務員,還在於菜單。由於沒有菜單客人也沒法點菜)。在客人點完菜後,記錄點菜內容交給後堂廚師(固然爲了更高效,後堂廚師最好也有100名)。這樣每一位客人享受的就是VIP服務咯,固然客人不會走,可是人力成本但是一個大頭哦(虧死你)。apache

另一種方法(方法C),就是改進點菜的方式,當客人到店後,本身申請一本菜單。想好本身要點的菜後,就呼叫服務員。服務員站在本身身邊後記錄客人的菜單內容。將菜單遞給廚師的過程也要進行改進,並非每一份菜單記錄好之後,都要交給後堂廚師。服務員能夠記錄好多份菜單後,同時交給廚師就好了。那麼這種方式,對於老闆來講人力成本是最低的;對於客人來講,雖然再也不享受VIP服務而且要進行必定的等待,可是這些都是可接受的;對於服務員來講,基本上她的時間都沒有浪費,基本上被老闆壓幹了最後一滴油水。數組

若是您是老闆,您會採用哪一種方式呢?緩存

  • 到店狀況:併發量。到店狀況不理想時,一個服務員一本菜單,固然是足夠了。因此不一樣的老闆在不一樣的場合下,將會靈活選擇服務員和菜單的配置。
  • 客人:客戶端請求
  • 點餐內容:客戶端發送的實際數據
  • 老闆:操做系統
  • 人力成本:系統資源
  • 菜單:文件狀態描述符。操做系統對於一個進程可以同時持有的文件狀態描述符的個數是有限的,在linux系統中$ulimit -n查看這個限制值,固然也是能夠(而且應該)進行內核參數調整的。
  • 服務員:操做系統內核用於IO操做的線程(內核線程)
  • 廚師:應用程序線程(固然廚房就是應用程序進程咯)
  • 餐單傳遞方式:包括了阻塞式和非阻塞式兩種
  • 方法A:阻塞式/非阻塞式 同步IO
  • 方法B:使用線程進行處理的阻塞式/非阻塞式同步IO
  • 方法C:阻塞式/非阻塞式 多路複用IO

4-二、典型的多路複用IO實現

目前流行的多路複用IO實現主要包括四種:select、poll、epoll、kqueue。下表是他們的一些重要特性的比較:服務器

多路複用IO技術最適用的是「高併發」場景,所謂高併發是指1毫秒內至少同時有上千個鏈接請求準備好。其餘狀況下多路複用IO技術發揮不出來它的優點。另外一方面,使用JAVA NIO進行功能實現,相對於傳統的Socket套接字實現要複雜一些,因此實際應用中,須要根據本身的業務需求進行技術選擇。網絡

五、JAVA對多路複用IO的支持

5-一、重要概念:Channel

通道,被創建的一個應用程序和操做系統交互事件、傳遞內容的渠道(注意是鏈接到操做系統)。一個通道會有一個專屬的文件狀態描述符。那麼既然是和操做系統進行內容的傳遞,那麼說明應用程序能夠經過通道讀取數據,也能夠經過通道向操做系統寫數據。多線程

JDK API中的Channel的描述是:

A channel represents an open connection to an entity such as a hardware device,a file,a network socket,or a program component that is capable of performing one or more distinct I/O operations,for example reading or writing.

A channel is either open or closed.A channel is open upon creation,and once closed it remains closed.Once a channel is closed,any attempt to invoke an I/O operation upon it will cause a ClosedChannelException to be thrown.Whether or not a channel is open may be tested by invoking its isOpen method.

 JAVA NIO框架中,自由的Channel通道包括:

  • 全部被Selector(選擇器)註冊的通道,只能是繼承了SelectableChannel類的子類。如上圖所示
  • ServerSocketChannel:應用服務器程序的監聽通道。只有經過這個通道,應用程序才能向操做系統註冊支持「多路複用IO」的端口監聽。同時支持UDP協議和TCP協議。
  • SocketChannel:TCP Socket 套接字的監聽通道,一個Socket套接字對應了一個客戶端IP:端口到服務器IP:端口的通訊鏈接。
  • DatagramChannel:UDP數據報文的監聽通道。

5-二、重要概念:Buffer

數據緩存區:在JAVA NIO框架中,爲了保證每一個通道的數據讀寫速度 JAVA NIO框架爲每一種須要支持數據讀寫的通道集成了Buffer的支持。

這句話怎麼理解呢?例如ServerSocketChannel通道它只支持對OP_ACCEPT事件的監聽,因此它是不能直接進行網絡數據內容讀寫的。因此ServerSocketChannel是沒有集成Buffer的。

Buffer有兩種工做模式:寫模式和讀模式。在讀模式下,應用程序只能從Buffer中讀取數據,不能進行寫操做。可是在寫模式下,應用程序是而能夠進行讀操做的,這就表示可能會出現髒讀的狀況。因此一旦您決定要從Buffer中讀取數據,必定要將Buffer的狀態改成讀模式。

  • position:緩存區目前正在操做的數據塊位置
  • limit:緩存區最大能夠進行操做的位置。緩存區的讀寫狀態正是由這個屬性控制的。
  • capacity:緩存區的最大容量。這個容量是在緩存區建立時進行指定的。因爲高併發時通道數量每每會很龐大,因此每個緩存區的容量最好不要過大。

在下文JAVA NIO框架的代碼實例中,咱們將進行Buffer緩存區操做的演示。

5-三、重要概念:Selector

Selector的英文含義是「選擇器」,不過根據咱們詳細介紹的Selector的崗位職責,您能夠把它稱之爲「輪詢代理器」、「事件訂閱器」、「channel容器管理機」都行。

  • 事件訂閱和Channel管理:

應用程序將向Selector對象註冊須要它關注的Channel,以及具體的某一個Channel會對哪些IO事件感興趣。Selector中也會維護一個「已經註冊的Channel」的容器。如下代碼來自WindowsSelectorImpl實現類中,對已經註冊的Channel的管理容器:

// Initial capacity of the poll array
private final int INIT_CAP = 8;
// Maximum number of sockets for select().
// Should be INIT_CAP times a power of 2
private final static int MAX_SELECTABLE_FDS = 1024;

// The list of SelectableChannels serviced by this Selector. Every mod
// MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll
// array,  where the corresponding entry is occupied by the wakeupSocket
private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];
  •  輪詢代理:

   應用層再也不經過阻塞或者非阻塞模式直接詢問操做系統 「事件有沒有發生」,而是由Selector代其訪問。

  • 實現不一樣操做系統的支持:

   以前已經提到過,多路複用IO技術是須要操做系統進行支持的,其特色就是操做系統能夠同時掃描同一個端口上不一樣網絡鏈接的時間。因此做爲上層的JVM,必需要爲不一樣操做系統的多路複用IO實現編寫不一樣的代碼。一樣我使用的測試環境是Windows,它對應的實現類是sun.io.ch.WindowsSelectorImpl:

5-四、JAVA NIO框架簡要設計分析

經過上文的描述,咱們知道了多路複用IO技術是操做系統的內核實現。在不一樣的操做系統,甚至同一系列操做系統的不一樣版本中所實現的多路複用IO技術都是不同的。那麼做爲跨平臺的JAVA JVM來講如何適用多種多樣的多路複用IO技術實現呢?面向對象的威力就顯現出來了:不管使用哪一種實現方式,他們都會有「選擇器「、」通道「、「緩存」這幾個操做要素,那麼能夠爲不一樣的多路複用IO技術建立一個統一的抽象組,而且爲不一樣的操做系統進行具體的實現。JAVA NIO中對各類多路複用IO支持,主要的基礎是java.nio.channels.spi.SelectorProvider抽象類,其中的幾個主要抽象方法包括:

  • public abstract DatagramChannel openDatagramChannel():建立和這個操做系統匹配的UDP通道實現。
  • public abstract AbstractSelector openSelector():建立和這個操做系統匹配的NIO選擇器,就像上文所述,不一樣的操做系統,不一樣的版本默認支持的NIO模型是不同的。
  • public abstract ServerSocketChannel open ServerSocketChannel():建立和這個NIO模型匹配的服務器端通道。
  • public abstract SocketChannel openSocketChannel():建立和這個NIO模型匹配的TCP Socket套接字通道(用來反映客戶端的TCP鏈接)

因爲JAVA NIO框架的整個設計是很大的,因此咱們只能還原一部分咱們關心的問題。這裏咱們以JAVA NIO框架中對於不一樣多路複用IO技術的選擇器進行實例化建立的方式爲例子,以點窺豹觀全局:

 

很明顯,不一樣的SelectorProvider實現對應了不一樣的選擇器。由具體的SelectorProvider實現進行建立。另外說明一下,實際上netty底層也是經過這個設計得到具體使用的NIO模型,咱們後文講解Netty時,會講到這個問題。如下代碼是Netty 4.0中NioServerSocketChannel進行實例化時的核心代碼片斷:

private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            /**
             *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
             *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
             *
             *  See <a href="See https://github.com/netty/netty/issues/2308">#2308</a>.
             */
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }

 

5-五、JAVA實例

下面,咱們使用JAVA NIO框架,實現一個支持多路複用IO的服務器端(實際上客戶端是否使用多路複用IO技術,對整個系統架構的性能提高相關性不大):

package testNSocket;

import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;

public class SocketServer1 {

    static {
        BasicConfigurator.configure();
    }

    /**
     * 日誌
     */
    private static final Log LOGGER = LogFactory.getLog(SocketServer1.class);

    public static void main(String[] args) throws Exception {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        ServerSocket serverSocket = serverChannel.socket();
        serverSocket.setReuseAddress(true);
        serverSocket.bind(new InetSocketAddress(83));

        Selector selector = Selector.open();
        //注意、服務器通道只能註冊SelectionKey.OP_ACCEPT事件
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        try {
            while(true) {
                //若是條件成立,說明本次詢問selector,並無獲取到任何準備好的、感興趣的事件
                //java程序對多路複用IO的支持也包括了阻塞模式 和非阻塞模式兩種。
                if(selector.select(100) == 0) {
                    //================================================
                    //      這裏視業務狀況,能夠作一些然並卵的事情
                    //================================================
                    continue;
                }
                //這裏就是本次詢問操做系統,所獲取到的「所關心的事件」的事件類型(每個通道都是獨立的)
                Iterator<SelectionKey> selecionKeys = selector.selectedKeys().iterator();

                while(selecionKeys.hasNext()) {
                    SelectionKey readyKey = selecionKeys.next();
                    //這個已經處理的readyKey必定要移除。若是不移除,就會一直存在在selector.selectedKeys集合中
                    //待到下一次selector.select() > 0時,這個readyKey又會被處理一次
                    selecionKeys.remove();

                    SelectableChannel selectableChannel = readyKey.channel();
                    if(readyKey.isValid() && readyKey.isAcceptable()) {
                        SocketServer1.LOGGER.info("======channel通道已經準備好=======");
                        /*
                         * 當server socket channel通道已經準備好,就能夠從server socket channel中獲取socketchannel了
                         * 拿到socket channel後,要作的事情就是立刻到selector註冊這個socket channel感興趣的事情。
                         * 不然沒法監聽到這個socket channel到達的數據
                         * */
                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel)selectableChannel;
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        registerSocketChannel(socketChannel , selector);

                    } else if(readyKey.isValid() && readyKey.isConnectable()) {
                        SocketServer1.LOGGER.info("======socket channel 創建鏈接=======");
                    } else if(readyKey.isValid() && readyKey.isReadable()) {
                        SocketServer1.LOGGER.info("======socket channel 數據準備完成,能夠去讀==讀取=======");
                        readSocketChannel(readyKey);
                    }
                }
            }
        } catch(Exception e) {
            SocketServer1.LOGGER.error(e.getMessage() , e);
        } finally {
            serverSocket.close();
        }
    }

    /**
     * 在server socket channel接收到/準備好 一個新的 TCP鏈接後。
     * 就會向程序返回一個新的socketChannel。<br>
     * 可是這個新的socket channel並無在selector「選擇器/代理器」中註冊,
     * 因此程序還無法經過selector通知這個socket channel的事件。
     * 因而咱們拿到新的socket channel後,要作的第一個事情就是到selector「選擇器/代理器」中註冊這個
     * socket channel感興趣的事件
     * @param socketChannel 新的socket channel
     * @param selector selector「選擇器/代理器」
     * @throws Exception
     */
    private static void registerSocketChannel(SocketChannel socketChannel , Selector selector) throws Exception {
        socketChannel.configureBlocking(false);
        //socket通道能夠且只能夠註冊三種事件SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT
        socketChannel.register(selector, SelectionKey.OP_READ , ByteBuffer.allocate(2048));
    }

    /**
     * 這個方法用於讀取從客戶端傳來的信息。
     * 而且觀察從客戶端過來的socket channel在通過屢次傳輸後,是否完成傳輸。
     * 若是傳輸完成,則返回一個true的標記。
     * @param socketChannel
     * @throws Exception
     */
    private static void readSocketChannel(SelectionKey readyKey) throws Exception {
        SocketChannel clientSocketChannel = (SocketChannel)readyKey.channel();
        //獲取客戶端使用的端口
        InetSocketAddress sourceSocketAddress = (InetSocketAddress)clientSocketChannel.getRemoteAddress();
        Integer resoucePort = sourceSocketAddress.getPort();

        //拿到這個socket channel使用的緩存區,準備讀取數據
        //在後文,將詳細講解緩存區的用法概念,實際上重要的就是三個元素capacity,position和limit。
        ByteBuffer contextBytes = (ByteBuffer)readyKey.attachment();
        //將通道的數據寫入到緩存區,注意是寫入到緩存區。
        //因爲以前設置了ByteBuffer的大小爲2048 byte,因此能夠存在寫入不完的狀況
        //不要緊,咱們後面來調整代碼。這裏咱們暫時理解爲一次接受能夠完成
        int realLen = -1;
        try {
            realLen = clientSocketChannel.read(contextBytes);
        } catch(Exception e) {
            //這裏拋出了異常,通常就是客戶端由於某種緣由終止了。因此關閉channel就好了
            SocketServer1.LOGGER.error(e.getMessage());
            clientSocketChannel.close();
            return;
        }

        //若是緩存區中沒有任何數據(但實際上這個不太可能,不然就不會觸發OP_READ事件了)
        if(realLen == -1) {
            SocketServer1.LOGGER.warn("====緩存區沒有數據?====");
            return;
        }

        //將緩存區從寫狀態切換爲讀狀態(實際上這個方法是讀寫模式互切換)。
        //這是java nio框架中的這個socket channel的寫請求將所有等待。
        contextBytes.flip();
        //注意中文亂碼的問題,我我的喜愛是使用URLDecoder/URLEncoder,進行解編碼。
        //固然java nio框架自己也提供編解碼方式,看我的咯
        byte[] messageBytes = contextBytes.array();
        String messageEncode = new String(messageBytes , "UTF-8");
        String message = URLDecoder.decode(messageEncode, "UTF-8");

        //若是收到了「over」關鍵字,纔會清空buffer,並回發數據;
        //不然不清空緩存,還要還原buffer的「寫狀態」
        if(message.indexOf("over") != -1) {
            //清空已經讀取的緩存,並重新切換爲寫狀態(這裏要注意clear()和capacity()兩個方法的區別)
            contextBytes.clear();
            SocketServer1.LOGGER.info("端口:" + resoucePort + "客戶端發來的信息======message : " + message);

            //======================================================
            //          固然接受完成後,能夠在這裏正式處理業務了        
            //======================================================

            //回發數據,並關閉channel
            ByteBuffer sendBuffer = ByteBuffer.wrap(URLEncoder.encode("回發處理結果", "UTF-8").getBytes());
            clientSocketChannel.write(sendBuffer);
            clientSocketChannel.close();
        } else {
            SocketServer1.LOGGER.info("端口:" + resoucePort + "客戶端信息還未接受完,繼續接受======message : " + message);
            //這是,limit和capacity的值一致,position的位置是realLen的位置
            contextBytes.position(realLen);
            contextBytes.limit(contextBytes.capacity());
        }
    }
}
  •  serverChannel.register(Selector sel, int ops, Object att):實際上register(Selector sel, int ops, Object att)方法是ServerSocketChannel類的父類AbstractSelectableChannel提供的一個方法,表示只要繼承了AbstractSelectableChannel類的子類均可以註冊到選擇器中。經過觀察整個AbstractSelectableChannel繼承關係,下圖中的這些類能夠被註冊到選擇器中:

  • SelectionKey.OP_ACCEPT:不一樣的Channel對象能夠註冊的「我關心的事件」是不同的。例如ServerSocketChannel除了可以被容許關注OP_ACCEPT時間外,不容許再關心其餘事件了(不然運行時會拋出異常)。如下梳理了常使用的AbstractSelectableChannel子類能夠註冊的事件列表:

 

實際上經過每個AbstractSelectableChannel子類所實現的public final int validOps()方法,就能夠查看這個通道「能夠關心的IO事件」。

  • selector.selectedKeys().iterator():當選擇器Selector收到操做系統的IO操做事件後,它的selectedKeys將在下一次輪詢操做中,收到這些事件的關鍵描述字(不一樣的channel,就算關鍵字同樣,也會存儲成兩個對象)。可是每個「事件關鍵字」被處理後都必須移除,不然下一次輪詢時,這個事件會被重複處理。

Returns this selector’s selected-key set.

Keys may be removed from, but not directly added to, the selected-key set. Any attempt to add an object to the key set will cause an UnsupportedOperationException to be thrown.

The selected-key set is not thread-safe.

5-六、JAVA實例改進

上面的代碼中,咱們爲了講解selector的使用,在緩存使用上就進行了簡化。實際的應用中,爲了節約內存資源,咱們通常不會爲通道分配那麼多的緩存空間。下面的代碼咱們主要對其中的緩存操做進行了優化:

package testNSocket;

import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.URLDecoder;
import java.net.URLEncoder;

import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;

public class SocketServer2 {

    static {
        BasicConfigurator.configure();
    }

    /**
     * 日誌
     */
    private static final Log LOGGER = LogFactory.getLog(SocketServer2.class);

    /**
     * 改進的java nio server的代碼中,因爲buffer的大小設置的比較小。
     * 咱們再也不把一個client經過socket channel屢次傳給服務器的信息保存在beff中了(由於根本存不下)<br>
     * 咱們使用socketchanel的hashcode做爲key(固然您也能夠本身肯定一個id),信息的stringbuffer做爲value,存儲到服務器端的一個內存區域MESSAGEHASHCONTEXT。
     * 
     * 若是您不清楚ConcurrentHashMap的做用和工做原理,請自行百度/Google
     */
    private static final ConcurrentMap<Integer, StringBuffer> MESSAGEHASHCONTEXT = new ConcurrentHashMap<Integer , StringBuffer>();

    public static void main(String[] args) throws Exception {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        ServerSocket serverSocket = serverChannel.socket();
        serverSocket.setReuseAddress(true);
        serverSocket.bind(new InetSocketAddress(83));

        Selector selector = Selector.open();
        //注意、服務器通道只能註冊SelectionKey.OP_ACCEPT事件
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        try {
            while(true) {
                //若是條件成立,說明本次詢問selector,並無獲取到任何準備好的、感興趣的事件
                //java程序對多路複用IO的支持也包括了阻塞模式 和非阻塞模式兩種。
                if(selector.select(100) == 0) {
                    //================================================
                    //      這裏視業務狀況,能夠作一些然並卵的事情
                    //================================================
                    continue;
                }
                //這裏就是本次詢問操做系統,所獲取到的「所關心的事件」的事件類型(每個通道都是獨立的)
                Iterator<SelectionKey> selecionKeys = selector.selectedKeys().iterator();

                while(selecionKeys.hasNext()) {
                    SelectionKey readyKey = selecionKeys.next();
                    //這個已經處理的readyKey必定要移除。若是不移除,就會一直存在在selector.selectedKeys集合中
                    //待到下一次selector.select() > 0時,這個readyKey又會被處理一次
                    selecionKeys.remove();

                    SelectableChannel selectableChannel = readyKey.channel();
                    if(readyKey.isValid() && readyKey.isAcceptable()) {
                        SocketServer2.LOGGER.info("======channel通道已經準備好=======");
                        /*
                         * 當server socket channel通道已經準備好,就能夠從server socket channel中獲取socketchannel了
                         * 拿到socket channel後,要作的事情就是立刻到selector註冊這個socket channel感興趣的事情。
                         * 不然沒法監聽到這個socket channel到達的數據
                         * */
                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel)selectableChannel;
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        registerSocketChannel(socketChannel , selector);

                    } else if(readyKey.isValid() && readyKey.isConnectable()) {
                        SocketServer2.LOGGER.info("======socket channel 創建鏈接=======");
                    } else if(readyKey.isValid() && readyKey.isReadable()) {
                        SocketServer2.LOGGER.info("======socket channel 數據準備完成,能夠去讀==讀取=======");
                        readSocketChannel(readyKey);
                    }
                }
            }
        } catch(Exception e) {
            SocketServer2.LOGGER.error(e.getMessage() , e);
        } finally {
            serverSocket.close();
        }
    }

    /**
     * 在server socket channel接收到/準備好 一個新的 TCP鏈接後。
     * 就會向程序返回一個新的socketChannel。<br>
     * 可是這個新的socket channel並無在selector「選擇器/代理器」中註冊,
     * 因此程序還無法經過selector通知這個socket channel的事件。
     * 因而咱們拿到新的socket channel後,要作的第一個事情就是到selector「選擇器/代理器」中註冊這個
     * socket channel感興趣的事件
     * @param socketChannel 新的socket channel
     * @param selector selector「選擇器/代理器」
     * @throws Exception
     */
    private static void registerSocketChannel(SocketChannel socketChannel , Selector selector) throws Exception {
        socketChannel.configureBlocking(false);
        //socket通道能夠且只能夠註冊三種事件SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT
        //最後一個參數視爲 爲這個socketchanne分配的緩存區
        socketChannel.register(selector, SelectionKey.OP_READ , ByteBuffer.allocate(50));
    }

    /**
     * 這個方法用於讀取從客戶端傳來的信息。
     * 而且觀察從客戶端過來的socket channel在通過屢次傳輸後,是否完成傳輸。
     * 若是傳輸完成,則返回一個true的標記。
     * @param socketChannel
     * @throws Exception
     */
    private static void readSocketChannel(SelectionKey readyKey) throws Exception {
        SocketChannel clientSocketChannel = (SocketChannel)readyKey.channel();
        //獲取客戶端使用的端口
        InetSocketAddress sourceSocketAddress = (InetSocketAddress)clientSocketChannel.getRemoteAddress();
        Integer resoucePort = sourceSocketAddress.getPort();

        //拿到這個socket channel使用的緩存區,準備讀取數據
        //在後文,將詳細講解緩存區的用法概念,實際上重要的就是三個元素capacity,position和limit。
        ByteBuffer contextBytes = (ByteBuffer)readyKey.attachment();
        //將通道的數據寫入到緩存區,注意是寫入到緩存區。
        //此次,爲了演示buff的使用方式,咱們故意縮小了buff的容量大小到50byte,
        //以便演示channel對buff的屢次讀寫操做
        int realLen = 0;
        StringBuffer message = new StringBuffer();
        //這句話的意思是,將目前通道中的數據寫入到緩存區
        //最大可寫入的數據量就是buff的容量
        while((realLen = clientSocketChannel.read(contextBytes)) != 0) {

            //必定要把buffer切換成「讀」模式,不然因爲limit = capacity
            //在read沒有寫滿的狀況下,就會致使多讀
            contextBytes.flip();
            int position = contextBytes.position();
            int capacity = contextBytes.capacity();
            byte[] messageBytes = new byte[capacity];
            contextBytes.get(messageBytes, position, realLen);

            //這種方式也是能夠讀取數據的,並且不用關心position的位置。
            //由於是目前contextBytes全部的數據所有轉出爲一個byte數組。
            //使用這種方式時,必定要本身控制好讀取的最終位置(realLen很重要)
            //byte[] messageBytes = contextBytes.array();

            //注意中文亂碼的問題,我我的喜愛是使用URLDecoder/URLEncoder,進行解編碼。
            //固然java nio框架自己也提供編解碼方式,看我的咯
            String messageEncode = new String(messageBytes , 0 , realLen , "UTF-8");
            message.append(messageEncode);

            //再切換成「寫」模式,直接狀況緩存的方式,最快捷
            contextBytes.clear();
        }

        //若是發現本次接收的信息中有over關鍵字,說明信息接收完了
        if(URLDecoder.decode(message.toString(), "UTF-8").indexOf("over") != -1) {
            //則從messageHashContext中,取出以前已經收到的信息,組合成完整的信息
            Integer channelUUID = clientSocketChannel.hashCode();
            SocketServer2.LOGGER.info("端口:" + resoucePort + "客戶端發來的信息======message : " + message);
            StringBuffer completeMessage;
            //清空MESSAGEHASHCONTEXT中的歷史記錄
            StringBuffer historyMessage = MESSAGEHASHCONTEXT.remove(channelUUID);
            if(historyMessage == null) {
                completeMessage = message;
            } else {
                completeMessage = historyMessage.append(message);
            }
            SocketServer2.LOGGER.info("端口:" + resoucePort + "客戶端發來的完整信息======completeMessage : " + URLDecoder.decode(completeMessage.toString(), "UTF-8"));

            //======================================================
            //          固然接受完成後,能夠在這裏正式處理業務了        
            //======================================================

            //回發數據,並關閉channel
            ByteBuffer sendBuffer = ByteBuffer.wrap(URLEncoder.encode("回發處理結果", "UTF-8").getBytes());
            clientSocketChannel.write(sendBuffer);
            clientSocketChannel.close();
        } else {
            //若是沒有發現有「over」關鍵字,說明尚未接受完,則將本次接受到的信息存入messageHashContext
            SocketServer2.LOGGER.info("端口:" + resoucePort + "客戶端信息還未接受完,繼續接受======message : " + URLDecoder.decode(message.toString(), "UTF-8"));
            //每個channel對象都是獨立的,因此可使用對象的hash值,做爲惟一標示
            Integer channelUUID = clientSocketChannel.hashCode();

            //而後獲取這個channel下之前已經達到的message信息
            StringBuffer historyMessage = MESSAGEHASHCONTEXT.get(channelUUID);
            if(historyMessage == null) {
                historyMessage = new StringBuffer();
                MESSAGEHASHCONTEXT.put(channelUUID, historyMessage.append(message));
            }
        }
    }
}

 以上代碼應該沒有過多須要講解的了。固然,您仍是能夠加入線程池技術,進行具體的業務處理。注意,必定是線程池,由於這樣能夠保證線程規模的可控性。

六、多路複用IO的優缺點

  • 不用再使用多線程進行IO處理了(包括操做系統內核IO管理模塊和應用程序進程而言)。固然實際業務的處理中,應用程序進程仍是能夠引入線程池技術的。
  • 同一個端口能夠處理多種協議,例如,使用ServerSocketChannel的服務器端口監聽,既能夠處理TCP協議又能夠處理UDP協議。
  • 操做系統級別的優化:多路複用IO技術能夠是操做系統級別在一個端口上可以同時接受多個客戶端的IO事件。同時具備以前咱們講到的阻塞式同步IO和非阻塞同步IO的全部特色。Selector的一部分做用更至關於「輪詢代理器」。
  • 都是同步IO:目前咱們介紹的阻塞式IO、非阻塞式IO甚至多路複用IO,這些都是基於操做系統級別對「同步IO」的實現。咱們一直在說「同步IO」,一直都沒有詳細說,什麼叫作「同步IO」。實際上一句話就能夠說清楚:只有上層(包括上層的某種代理機制)系統詢問我是否有某個事件發生了,不然我不會主動告訴上層系統事件發生了。

這個關鍵概念,在這篇文章以前的幾張「原理說明圖」中實際上就能夠清晰的體現了,可是爲了讓你們更清楚的總結同步IO,異步IO、阻塞IO、非阻塞IO的概念,

下篇文章在講解異步IO後我會梳理了一張對比表格。

七、異步IO(真正的NIO)

好吧,很差意思,我再一次錯估了文章的工做量。JAVA Asynchronous IO的介紹咱們只有再日後推一推了。下一篇文章中,我將詳細講解操做系統支持的異步IO方式,並介紹JAVA 1.7版本中加入的NIO2.0(AIO)對異步IO的實現。上文也說過了,在Linux系統中並無Windows中的IOCP技術,因此Linux技術使用epoll多路複用技術模擬異步IO。

 

摘自:https://blog.csdn.net/yinwenjie/article/details/48522403

相關文章
相關標籤/搜索