屢次嘗試的學習,終於搞懂了NIO!

NIO—NonBlocking IO(new IO)

  1. io面向流編程,只能做爲輸入或者輸出流的一種,是同步阻塞的,每個鏈接過來都要建立一個線程去處理,線程上下文切換開銷很大,形成了很大的瓶頸
  2. 因而有了線程池實現的僞阻塞IO,必定程度解決了線程建立過多的問題,可是沒有從根本上解決阻塞的問題,而且線程過多而線程池太小時也會形成很大的瓶頸
  3. 既然根本瓶頸緣由是線程數和阻塞IO,那麼咱們有沒有辦法只用1個線程去處理多個客戶端鏈接呢?這就是NIO出現的緣由

NIO主要有三個核心部分組成linux

  • buffer緩衝區
  • Channel管道
  • Selector選擇器

nio面向block塊,buffer緩衝區編程,底層是數組,buffer提供數據訪問,channel讀寫到buffer,buffer讀寫到channel,從buffer讀取到程序channel是雙向的編程

理解NIO須要理解事件編程模型數組

NIO核心:緩存

NIO由原來的阻塞讀寫(佔用線程)變成了單線程輪詢事件,找到能夠進行讀寫的網絡描述符進行讀寫。除了事件的輪詢是阻塞的(沒有可乾的事情必需要阻塞),剩餘的I/O操做都是純CPU操做,沒有必要開啓多線程。服務器

單線程處理I/O的效率確實很是高,沒有線程切換,只是拼命的讀、寫、選擇事件。微信

NIO帶個咱們:網絡

  1. 事件驅動模型—異步編程都離不開事件
  2. 單線程處理多鏈接—多路複用使得處理更加高效
  3. 非阻塞IO,只阻塞獲取可操做事件
  4. 基於block傳輸比基於流傳輸更加高效
  5. 零拷貝—DirectBuffer

缺點:多線程

NIO並無徹底屏蔽平臺差別,它仍然是基於各個操做系統的I/O系統實現的,差別仍然存在。使用NIO作網絡編程構建事件驅動模型並不容易,陷阱重重。app

推薦使用NIO成熟框架Netty框架

Buffer

緩衝區本質上是一塊能夠寫入數據,而後能夠從中讀取數據的內存。這塊內存被包裝成NIO Buffer對象,並提供了一組方法,用來方便的訪問該塊內存。

Capacity、Position、Limit

<=< code=""> mark <=< code=""> position <=< code=""> limit <=< code=""> capacity

  • capacity

做爲一個內存塊,Buffer有一個固定的大小值,也叫「capacity」.你只能往裏寫capacity個byte、long,char等類型。一旦Buffer滿了,須要將其清空(經過讀數據或者清除數據)才能繼續寫數據往裏寫數據。

  • position

當你寫數據到Buffer中時,position表示當前的位置。初始的position值爲0.當一個byte、long等數據寫到Buffer後, position會向前移動到下一個可插入數據的Buffer單元。position最大可爲capacity – 1.

當讀取數據時,也是從某個特定位置讀。當將Buffer從寫模式切換到讀模式,position會被重置爲0. 當從Buffer的position處讀取數據時,position向前移動到下一個可讀的位置。

  • limit

在寫模式下,Buffer的limit表示你最多能往Buffer裏寫多少數據。 寫模式下,limit等於Buffer的capacity。

當切換Buffer到讀模式時, limit表示你最多能讀到多少數據。所以,當切換Buffer到讀模式時,limit會被設置成寫模式下的position值。換句話說,你能讀到以前寫入的全部數據(limit被設置成已寫數據的數量,這個值在寫模式下就是position)

同一個buffer能夠存儲不一樣數據類型的數據,可是獲取的時候要指定類型獲取

ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.putInt(1);
buffer.putLong(387524875628742L);
buffer.putChar('s');
buffer.flip();
System.out.println(buffer.getInt());
System.out.println(buffer.getLong());
System.out.println(buffer.getChar());複製代碼

put方法只能放入byte型,不能放入int

flip、clear、rewind、mark

  • flip

flip方法將Buffer從寫模式切換到讀模式。調用flip()方法會將position設回0,並將limit設置成以前position的值。

public final Buffer flip() {
        this.limit = this.position;
        this.position = 0;
        this.mark = -1;
        return this;
    }複製代碼

  • clear

position將被設回0,limit被設置成 capacity的值。換句話說,Buffer 被清空了。Buffer中的數據並未清除,只是這些標記告訴咱們能夠從哪裏開始往Buffer裏寫數據。

public final Buffer clear() {
        this.position = 0;
        this.limit = this.capacity;
        this.mark = -1;
        return this;
    }複製代碼

  • rewind

Buffer.rewind()將position設回0,因此你能夠重讀Buffer中的全部數據。limit保持不變,仍然表示能從Buffer中讀取多少個元素

public final Buffer rewind() {
        this.position = 0;
        this.mark = -1;
        return this;
    }複製代碼

  • mark

能夠標記Buffer中的一個特定position。以後能夠經過調用Buffer.reset()方法恢復到這個position。

public final Buffer mark() {
        this.mark = this.position;
        return this;
    }複製代碼

  • slice分片

將buffer根據設置的position和limit分片一個buffer,有本身的position、limit和capacity,數據共用一個內存地址的buffer數據

public static void test2(){
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        for(int i=0;i<buffer.capacity();i++){
            buffer.put((byte)i);
        }
        buffer.position(10);
        buffer.limit(20);
        ByteBuffer buffer1 = buffer.slice();//buffer分片
        for(int m=0;m<buffer1.capacity();m++){
            byte b = buffer1.get();
            System.out.print(b+" ");
        }
    }

輸出:
10 11 12 13 14 15 16 17 18 19複製代碼

ReadOnlyBuffer

普通的Buffer(可讀可寫)能夠隨時轉換爲只讀Buffer,可是隻讀Buffer不能夠轉換爲普通Buffer

ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();複製代碼

轉換後的Buffer是一個新的只讀Buffer,擁有獨立的position、limit和capacity

DirectBuffer

堆外內存buffer,本地JNI非JVM堆內存buffer,容許直接訪問

普通ByteBuffer由JVM管理,在JVM堆上分配內存

ByteBuffer buf = ByteBuffer.allocate(1024);複製代碼

DirectBuffer會在本地內存中分配,脫離JVM的堆管理

ByteBuffer buf = ByteBuffer.allocateDirect(1024);複製代碼

爲何要這樣作呢?

----------又是GC--------

咱們都知道JVM 在堆上的老年代中,GC時會採起標記-整理策略,會使得對象在堆內存中的地址發生變化,整理時會buffer太大時會很難gc整理

因此出現了DirectBuffer,它使用unsafe.allocateMemory分配內存,是一個native方法,由buffer的address變量記錄這個內存的地址來提供訪問

比較

  • DirectBuffer:本地方法分配內存顯然沒有JVM堆分配快,可是涉及IO網絡IO的話就是DirectBuffer比較快了

DirectByteBuffer繼承了MappedByteBuffer

緩存的使用可使用DirectByteBuffer和HeapByteBuffer。若是使用了DirectByteBuffer,通常來講能夠減小一次系統空間到用戶空間的拷貝。

數據量比較小的中小應用狀況下,能夠考慮使用heapBuffer;反之能夠用directBuffer

MappedByteBuffer

映射到堆外內存的ByteBuffer,DirectByteBuffer繼承此類實現堆外內存的分配

經過下面方式映射buffer到堆外內存

MappedByteBuffer mappedByteBuffer = channel.map(MapMode.READ_WRITE, 0, channel.size());複製代碼

使用拷貝文件:

RandomAccessFile in = new RandomAccessFile("nio/1.txt", "rw");
RandomAccessFile out = new RandomAccessFile("nio/2.txt", "rw");
FileChannel inChannel = in.getChannel();
FileChannel outChannel = out.getChannel();
MappedByteBuffer inputData = inChannel.map(FileChannel.MapMode.READ_ONLY,0,new File("nio/1.txt").length());
Charset charset = Charset.forName("utf-8");//編碼
CharsetDecoder decoder = charset.newDecoder();
CharsetEncoder encoder = charset.newEncoder();
CharBuffer charBuffer = decoder.decode(inputData);
ByteBuffer buffer = encoder.encode(charBuffer);
outChannel.write(buffer);
in.close();out.close();複製代碼

Channel—通道

FileChannel

NIO提供的一種鏈接到文件的通道,用於文件的讀寫

在使用FileChannel時,須要從輸入輸出流或者RandomAccessFile中獲取FIleChannel

  • 若是要向FileChannel中讀取數據,須要申請一個ByteBuffer,將數據從FileChannel中讀取到緩衝區ByteBuffer,read()返回多少個字節被讀取,若是返回-1說明文件已經到達末尾
  • 若是要向FileChannel中寫入數據,須要先將數據寫入到ByteBuffer中,在從ByteBuffer中寫入到FileChannel中,調用write()方法

注意讀寫之間須要Buffer.flip();

例子:

1.讀取文件數據並打印

FileInputStream fileInputStream = new FileInputStream("1.log");
FileChannel channel = fileInputStream.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(512);;
channel.read(byteBuffer);
byteBuffer.flip();
while(byteBuffer.remaining()>0){
    byte b = byteBuffer.get();
    System.out.println((char) b);
}
fileInputStream.close();複製代碼

2.把1.txt數據寫入2.txt

FileInputStream inputStream = new FileInputStream("1.txt");
FileChannel in = inputStream.getChannel();
FileOutputStream outputStream = new FileOutputStream("2.txt");
FileChannel out = outputStream.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
while(true){
    byteBuffer.clear();//沒有的話會一直讀取
    int read = in.read(byteBuffer);
    System.out.println("read:"+read);
    if(read==-1){
        break;//爲-1表示文件結束 返回
    }
    byteBuffer.flip();
    out.write(byteBuffer);
}
inputStream.close();
outputStream.close();複製代碼

ServerSockerChannel

NIO提供了一種能夠監聽新進入的TCP鏈接的通道,就是ServerSocketChannel,對應IO中ServerSocket

  • 打開監聽通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9999));
while(true){
    SocketChannel socketChannel = serverSocketChannel.accept();
    //do something with socketChannel...
}複製代碼

SocketChannel

NIO提供的一種鏈接到TCP套接字的通道,就是SocketChannel,對應IO中Socket

  • 打開一個SocketChannel
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", 8080));複製代碼

Channel讀寫

ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = socketChannel.read(buf);複製代碼

ByteBuffer writeBuffer = ByteBuffer.allocate(48);
String msg = "hello";
writeBuffer.put(msg.getBytes());
writeBuffer.flip();
channel.write(writeBuffer);複製代碼

  • 讀完寫
ByteBuffer buffer = ByteBuffer.allocate(1024);
int byteRead = channel.read(buffer);
if(byteRead<=0){
    channel.close();
    break;
}
buffer.flip();
channel.write(buffer);
read += byteRead;
buffer.clear();複製代碼

每次寫完buffer,若是buffer數據不須要再使用,建議clear清空buffer,準備下一次寫操做

Selector—多路複用器(選擇器)

多路複用器,這個名字很形象,使用一個線程去處理多個channel,從而管理多個channel

爲何要使用一個線程管理多個channel?

線程上下文切換開銷很大,線程越少處理channel更高效

建立Selector—建立比賽

Selector selector = Selector.open();複製代碼

註冊channel—購買入場卷

channel經過註冊到selector上來把channel的事件交給Selector管理,而且返回一個SelectionKey

SelectionKey key = channel.register(selector, SelectionKey.OP_READ);複製代碼

  • 與 Selector 一塊兒使用時,Channel 必須處於非阻塞模式下。這意味着不能將 FileChannel 與 Selector 一塊兒使用,由於 FileChannel 不能切換到非阻塞模式
channel.configureBlocking(false);複製代碼

  • 經過SelectionKey獲取channel和selector以及準備好的事件
Channel  channel  = selectionKey.channel();
Selector selector = selectionKey.selector();複製代碼

Selector執行選擇—拿着入場捲入場

把channel註冊到Selector後,咱們可使用Selector.select();方法獲取準備就緒的通道,返回一個int型整數,表示準備好的channel數

經過selector.selectedKeys();方法獲取準備就緒的SelectionKey,再經過SelectionKey獲取channel和selector,通常使用迭代器遍歷這些準備好的channel

在每一次處理完一個SelectionKey,必須把它從迭代器中刪除,處理完,這個SelectionKey就沒有用了,就像一個入場卷,你能夠經過它進入賽場而且它上面有入場人和座位對應信息,比賽結束後你沒法再經過它執行任何有效的操做。

  • 看完比賽,舉辦者不會回收全部的票據,須要大家本身處理,不能亂丟在場地中,須要本身丟到垃圾桶中或者帶回家
iterator.remove();複製代碼

  • wakeUp()方法

某個線程調用 select() 方法後阻塞了,即便沒有通道已經就緒,也沒法返回,wakeUp方法使得立馬返回。

Scatter、Gather

scatter / gather 常常用於須要將傳輸的數據分開處理的場合,例如傳輸一個由消息頭和消息體組成的消息,你可能會將消息體和消息頭分散到不一樣的 buffer 中,這樣你能夠方便的處理消息頭和消息體。

Scatter

分散(scatter)從 Channel 中讀取是指在讀操做時將讀取的數據寫入多個 buffer 中。所以,Channel 將從 Channel 中讀取的數據 「分散(scatter)」 到多個 Buffer 中。

Gather

彙集(gather)寫入 Channel 是指在寫操做時將多個 buffer 的數據寫入同一個 Channel,所以,Channel 將多個 Buffer 中的數據 「彙集(gather)」 後發送到 Channel。

例子:用三個長度分別爲3,4,5的buffer存儲輸入的字符串,前3個字符存儲在第一個buffer,4-7字符存儲在第二個buffer,長度爲4,8-12存儲在第三個buffer,長度爲5

ServerSocketChannel serverSocketChannel =  ServerSocketChannel.open();
        InetSocketAddress inetSocketAddress = new InetSocketAddress(8899);
        serverSocketChannel.socket().bind(inetSocketAddress);
        int messageLength = 3 + 4 + 5;
        ByteBuffer[] byteBuffer = new ByteBuffer[3];
        byteBuffer[0] = ByteBuffer.allocate(3);
        byteBuffer[1] = ByteBuffer.allocate(4);
        byteBuffer[2] = ByteBuffer.allocate(5);
        SocketChannel socketChannel = serverSocketChannel.accept();
        while (true){
            int byteRead = 0;
            while (byteRead<messageLength){
                long r = socketChannel.read(byteBuffer);
                byteRead += r;
                System.out.println("byteread:"+byteRead);
                Arrays.stream(byteBuffer).map(o->"position:"+o.position()+",limit:"+o.limit()).forEach(System.out::println);
            }

            Arrays.stream(byteBuffer).forEach(Buffer::flip);

            int byteWrite = 0;
            while(byteWrite<messageLength){
                long r = socketChannel.write(byteBuffer);
                byteWrite += r;
                System.out.println("bytewrite:"+byteWrite);
                Arrays.stream(byteBuffer).map(o->"position:"+o.position()+",limit:"+o.limit()).forEach(System.out::println);
            }

            Arrays.stream(byteBuffer).forEach(Buffer::clear);
        }

測試:使用linux nc localhost 8899測試
輸入:helloworld回車 
輸出:
byteread:11
position:3,limit:3
position:4,limit:4
position:4,limit:5
解釋:
回車算一個字符一共11個字符,前三個存儲到第一個buffer了,存滿了;中間四個存儲到第二個buffer,存滿了;剩下多餘的存儲到第三個buffer,沒有存滿複製代碼

NIO服務端客戶端

這個程序演示使用NIO建立一個聊天室,服務端和多個客戶端鏈接,客戶端能夠互發消息

  • server服務端
/**
 * 能夠直接使用 linux nc命令當作客戶端
 * nc localhost 端口
 */
public class Server {
    private static Map<SocketChannel,String> clientMap = new HashMap<>();
    public static void main(String[] args) throws IOException {
        //打開服務器channel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        //設置非阻塞 即將使用selector
        serverSocketChannel.configureBlocking(false);
        //獲取服務器的socket
        ServerSocket serverSocket = serverSocketChannel.socket();
        //綁定端口
        serverSocket.bind(new InetSocketAddress(8089));
        //打開一個多路複用器,使用一條線程處理客戶端channel
        Selector selector = Selector.open();
        //註冊服務器channel到
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true){
            //阻塞獲取channel事件
            //一旦調用了 select() 方法,而且返回值代表有一個或更多個通道就緒了
            int num = selector.select();
            /**
             * 獲取到後 拿到多路複用器的SelectionKey 核心方法channel獲取註冊在起上的channel
             * SelectionKey 每次註冊一個channel都會建立一個SelectionKey 其中常量定義channel狀態
            **/
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            //對其中每個SelectionKey進行操做
            selectionKeys.forEach(selectionKey->{
                    try {
                        //若是該服務器SelectionKey被接收
                        if(selectionKey.isAcceptable()){
                            //拿到服務器channel
                            ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
                            SocketChannel client = null;
                            //拿到本次鏈接上服務器的客戶端
                            client = server.accept();
                            client.configureBlocking(false);
                            //把客戶端註冊到多路複用器,監聽客戶端的可讀事件
                            client.register(selector,SelectionKey.OP_READ);
                            //爲每一個客戶端分配id
                            String key = "["+ UUID.randomUUID()+"]";
                            clientMap.put(client,key);
                            //若是SelectionKey讀就緒,執行讀操做
                        }else if(selectionKey.isReadable()){
                            //拿到channel
                            SocketChannel channel = (SocketChannel) selectionKey.channel();
                            //建立讀buffer
                            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                            //讀取channel中數據到讀buffer
                            int read = channel.read(readBuffer);
                            String reMsg = "";
                            //若是有數據
                            if(read>0){
                                //翻轉進行寫操做
                                readBuffer.flip();
                                //制定解碼集utf-8,對讀buffer解碼打印
                                Charset charset = Charset.forName("utf-8");
                                reMsg = String.valueOf(charset.decode(readBuffer).array());
                                System.out.println(clientMap.get(channel)+" receive: "+reMsg);
                            }else if(read==-1) channel.close();//若是客戶端關閉就關閉客戶端channel
                            //羣發:發送數據到其餘客戶端channel
                            for(SocketChannel ch:clientMap.keySet()){
                                if(ch!=channel) {
                                    String key = clientMap.get(ch);
                                    ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
                                    writeBuffer.put(("來自"+key + ":" + reMsg).getBytes());
                                    writeBuffer.flip();
                                    ch.write(writeBuffer);
                                }
                            }
                        }

                    } catch (IOException e) {
                        e.printStackTrace();

                }
            });
            selectionKeys.clear();//每次處理完一個SelectionKey的事件,把該SelectionKey刪除
        }
    }
}複製代碼

  • 客戶端
public class Client {
    public static void main(String[] args) throws IOException {
        //打開客戶端channel
        SocketChannel socketChannel = SocketChannel.open();
        //設置爲非阻塞模式,能夠配合selector使用
        socketChannel.configureBlocking(false);
        //打開selector
        Selector selector = Selector.open();
        //註冊客戶端channel到多路複用器,監聽鏈接事件
        socketChannel.register(selector, SelectionKey.OP_CONNECT);
        //鏈接到指定地址
        socketChannel.connect(new InetSocketAddress("localhost",8089));
        while (true){
            try{
                    //執行selector方法,阻塞獲取channel事件的觸發
                    int num = selector.select();
                    //獲取註冊到多路複用器上的SelectionKey
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    //經過迭代器遍歷SelectionKey
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        //若是SelectionKey觸發的事件爲鏈接就緒
                        if(selectionKey.isConnectable()){
                            //拿到SelectionKey的客戶端channel
                            SocketChannel client = (SocketChannel) selectionKey.channel();
                            if(client.isConnectionPending()){
                                //完成鏈接
                                client.finishConnect();
                                //新建一個寫buffer
                                ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
                                //寫入客戶端鏈接成功消息
                                writeBuffer.put((client.toString()+":鏈接成功!").getBytes());
                                //翻轉讀寫操做 執行寫操做
                                writeBuffer.flip();
                                //寫入buffer數據刅客戶端
                                client.write(writeBuffer);
                                //開闢一個線程寫,由於標準輸入是阻塞的,當前線程不能阻塞寫
                                ExecutorService executorService = Executors.newSingleThreadExecutor();
                                executorService.submit(()->{
                                    while (true){
                                        writeBuffer.clear();
                                        InputStreamReader reader = new InputStreamReader(System.in);
                                        BufferedReader br = new BufferedReader(reader);
                                        String msg = br.readLine();
                                        //每次讀入一行,寫入數據到buffer而且寫入客戶端channel
                                        writeBuffer.put(msg.getBytes());
                                        writeBuffer.flip();
                                        client.write(writeBuffer);
                                    }
                                });
                            }
                            //註冊客戶端可讀事件到多路複用器
                            client.register(selector,SelectionKey.OP_READ);
                            //若是多路複用器上的SelectionKey處於讀就緒狀態
                        }else if(selectionKey.isReadable()){
                            //拿到SelectionKey觸發相應事件對應的客戶端channel,執行讀操做
                            SocketChannel client = (SocketChannel) selectionKey.channel();
                            //建立一個新的讀buffer,
                            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                            //從準備好讀操做的channel中讀取數據
                            int count = client.read(readBuffer);
                            if (count>0){
                                //轉碼並數據使用String保存且打印
                                String reMsg = new String(readBuffer.array(),0,count);
                                System.out.println(reMsg);
                            }else if(count==-1) client.close();//關閉客戶端
                        }
                    }
                    selectionKeys.clear();//每次處理完一個SelectionKey的事件,把該SelectionKey刪除
                }
            catch (Exception e){
                e.printStackTrace();
            }
        }
    }
}複製代碼

  • 測試

1.建立一個服務端和三個客戶端

2.客戶端1,2,3分別發送數據

服務端拿到鏈接信息和三個客戶端發送信息

客戶端1先建立,拿到2,3鏈接信息和2,3發送信息

客戶端2先於3建立,拿到3鏈接信息和1,3發送信息

客戶端3最後建立,只能拿到1,2發送信息

3.此時再使用nc命令建立一個客戶端

發送信息,客戶端能夠收到

客戶端2發送信息,該終端客戶端也能夠收到

NIO案例—跨端口傳輸數據—MultiServer

實現目標:服務端監聽兩個端口,一個8089,一個8090,8089只有惟一的一個主客戶端A鏈接,8090有多個客戶端B鏈接,客戶端A接收多個客戶端B鏈接的發送的消息,實現跨端口的消息轉發

  • 服務端

咱們先看服務端,服務端首先須要監聽兩個端口,咱們建立兩個服務端channel;服務端接收到鏈接後監聽客戶端B的發送數據事件(也就是客戶端writable服務端readable事件);拿到客戶端B的消息後,把它發送到客戶端A

服務端怎麼發送數據到客戶端A?

保存一個客戶端channel集合,爲不一樣端口客戶端分配不一樣的id的結尾部分,客戶端A分配爲wxq],客戶端B分配爲gzh],在他們channel建立的時候保存到HashMap中,channel做爲key,id做爲值保存

下面說一下服務端流程:

  1. 建立兩個服務端channel,綁定不一樣端口
  2. 建立一個多路複用器selector,把兩個服務端註冊到selector上,並監聽acceptable事件
  3. 執行selector.select()方法,拿到SelectionKey集合,對不一樣事件作不停處理
    1. 若是事件爲接收就緒,經過SelectionKey.channel()方法拿到服務端channel,根據端口不一樣註冊不一樣的監聽事件,若是是8090的,說明是客戶端B的鏈接完成,拿到客戶端B的channel,監聽它的可讀事件,而且分配id後綴爲gzh]而且保存;若是是8089端口的服務端channel,說明是客戶端A的鏈接完成,客戶端客戶端A的channel,監聽它的可寫事件,而且分配id後綴爲wxq],保存到hashmap
    2. 若是事件是讀就緒,說明客戶端B已經完後數據的寫操做,能夠讀取客戶端B的數據,執行讀取;首先把數據讀取並寫入到readBuffer,使用new String(readBuffer.array()建立即將發送的msg,遍歷客戶端channel的key,若是後綴爲wxq],說明是客戶端A,則把數據寫入writeBuffer中,並把數據寫入客戶端A的channel中
  4. 每次SelectionKey的事件執行完畢,把該SelectionKey刪除

代碼:

public class Server {
    private static int CAPACITY = 1024;
    private static ByteBuffer readBuffer = ByteBuffer.allocate(CAPACITY);
    private static ByteBuffer writeBuffer = ByteBuffer.allocate(CAPACITY);
    private static Map<SocketChannel,String> clientMap = new HashMap<>();

    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannelWxq = ServerSocketChannel.open();
        ServerSocketChannel serverSocketChannelGzh = ServerSocketChannel.open();
        serverSocketChannelGzh.configureBlocking(false);
        serverSocketChannelWxq.configureBlocking(false);
        ServerSocket serverSocketWxq = serverSocketChannelWxq.socket();
        ServerSocket serverSocketGzh = serverSocketChannelGzh.socket();
        serverSocketWxq.bind(new InetSocketAddress(8089));
        System.out.println("監聽8089:微信牆服務端口");
        serverSocketGzh.bind(new InetSocketAddress(8090));
        System.out.println("監聽8090:公衆號服務端口");
        Selector selector = Selector.open();
        serverSocketChannelWxq.register(selector, SelectionKey.OP_ACCEPT);
        serverSocketChannelGzh.register(selector, SelectionKey.OP_ACCEPT);
        while (true){
            int num = selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            selectionKeys.forEach(selectionKey->{
                try {
                    if(selectionKey.isAcceptable()){
                        ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
                        SocketChannel client = null;
                        client = server.accept();
                        client.configureBlocking(false);
                        String key = "";
                        if(server==serverSocketChannelGzh) {//若是是公衆號server,註冊其客戶端的可讀事件
                            client.register(selector, SelectionKey.OP_READ);
                            key = "["+ UUID.randomUUID()+":gzh]";
                        }else if(server==serverSocketChannelWxq){//若是是
                            client.register(selector,SelectionKey.OP_WRITE);
                            key = "["+ UUID.randomUUID()+":wxq]";
                        }
                        System.out.println(key+":鏈接成功!");
                        clientMap.put(client,key);
                    }else if(selectionKey.isReadable()){
                        SocketChannel channel = (SocketChannel) selectionKey.channel();
                        readBuffer.clear();
                        int read = 0;
                        while(true){
                            int byteRead = channel.read(readBuffer);
                            if(byteRead<=0){
                                break;
                            }
                            readBuffer.flip();
                            channel.write(readBuffer);
                            read += byteRead;
                            readBuffer.clear();
                        }
                        String reMsg = new String(readBuffer.array(),0,read);
                        System.out.println(clientMap.get(channel)+" send to wxq: "+reMsg);
                        //寫入微信牆服務
                        for(SocketChannel ch:clientMap.keySet()){
                            if(ch!=channel) {
                                String key = clientMap.get(ch);
                                if(key.endsWith("wxq]")) {
                                    writeBuffer.clear();
                                    writeBuffer.put(("來自" + clientMap.get(channel) + ":" + reMsg).getBytes(StandardCharsets.UTF_8));
                                    writeBuffer.flip();
                                    ch.write(writeBuffer);
                                }
                            }
                        }
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            selectionKeys.clear();//每次處理完一個SelectionKey的事件,把該SelectionKey刪除
        }
    }
}複製代碼

到此,服務端寫完了,你就可使用linux或者win下的nc命令鏈接到服務端,模擬客戶端A和客戶端B發送消息

客戶端發送消息後會會寫一條是由於我在接收到消息後把消息寫入客戶端B的buffer中了

  • 客戶端B—發送消息

客戶端B負責發送消息,主要事件就是負責寫數據

流程:

  1. 建立一個客戶端channelSocketChannel,打開一個多留複用器selector,綁定可鏈接事件,鏈接到服務端監聽的8090端口
  2. 執行selector.select()方法,處理鏈接就緒寫就緒兩個事件
    1. 若是事件爲鏈接就緒,只須要拿到channel,執行finishConnect方法完成鏈接,而且註冊監聽事件爲可寫事件
    2. 若是事件爲寫就緒,執行寫操做,使用標準輸入從控制檯讀取輸入而且寫入writebuffer中,經過channel.write()方法寫入數據到客戶端
  3. 清理事件的SelectionKey

代碼:

public class GzhClient {
    public static void main(String[] args) throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        Selector selector = Selector.open();
        socketChannel.register(selector, SelectionKey.OP_CONNECT);
        socketChannel.connect(new InetSocketAddress("localhost",8090));
        while (true){
            try{
                int num = selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    if(selectionKey.isConnectable()){
                        SocketChannel client = (SocketChannel) selectionKey.channel();
                        if(client.isConnectionPending()){
                            client.finishConnect();
                        }
                        client.register(selector,SelectionKey.OP_WRITE);
                    }else if(selectionKey.isWritable()){
                        SocketChannel client = (SocketChannel) selectionKey.channel();
                        ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
                        writeBuffer.clear();
                        InputStreamReader reader = new InputStreamReader(System.in);
                        BufferedReader br = new BufferedReader(reader);
                        String msg = br.readLine();
                        //每次讀入一行,寫入數據到buffer而且寫入客戶端channel
                        writeBuffer.put(msg.getBytes());
                        writeBuffer.flip();
                        client.write(writeBuffer);
                    }
                }
                selectionKeys.clear();//每次處理完一個SelectionKey的事件,把該SelectionKey刪除
            }
            catch (Exception e){
                e.printStackTrace();
            }
        }
    }
}
複製代碼

  • 客戶端A—接收服務端轉發消息

客戶端A負責發送消息,主要事件就是負責讀數據

流程:

  1. 建立一個客戶端channelSocketChannel,打開一個多留複用器selector,綁定可鏈接事件,鏈接到服務端監聽的8089端口
  2. 執行selector.select()方法,處理鏈接就緒讀就緒兩個事件
    1. 若是事件爲鏈接就緒,只須要拿到channel,執行finishConnect方法完成鏈接,而且註冊監聽事件爲可寫事件
    2. 若是事件爲讀就緒,執行讀操做,把channel中數據使用read()方法讀取到readBuffer中,經過new String(readBuffer.array()方法接收String類型數據,而且打印到控制檯
  3. 清理事件的SelectionKey

代碼:

public class WxQClient {
    public static void main(String[] args) throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        Selector selector = Selector.open();
        socketChannel.register(selector, SelectionKey.OP_CONNECT);
        socketChannel.connect(new InetSocketAddress("localhost",8089));
        while (true){
            try{
                int num = selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    if(selectionKey.isConnectable()){
                        SocketChannel client = (SocketChannel) selectionKey.channel();
                        if(client.isConnectionPending()){
                            client.finishConnect();
                        }
                        client.register(selector,SelectionKey.OP_READ);
                    }else if(selectionKey.isReadable()){
                        SocketChannel client = (SocketChannel) selectionKey.channel();
                        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                        int count = client.read(readBuffer);
                        if (count>0){
                            String reMsg = new String(readBuffer.array(),0,count);
                            System.out.println(reMsg);
                        }else if(count==-1) client.close();//關閉客戶端
                    }
                }
                selectionKeys.clear();//每次處理完一個SelectionKey的事件,把該SelectionKey刪除
            }
            catch (Exception e){
                e.printStackTrace();
            }
        }
    }
}複製代碼

至此,咱們服務端和客戶端AB都已經完後,如今咱們測試一下

  1. 啓動服務端,啓動一個WxQClient也就是ClientA,啓動兩個GzhClient,也就是ClientB

服務端顯示鏈接成功

  1. 客戶端B發送消息

服務端接收到消息並打印,並轉發到客戶端A,客戶端A打印消息

相關文章
相關標籤/搜索