Java NIO服務器端開發

1、NIO類庫簡介

  一、緩衝區Buffer

  Buffer是一個對象,包含一些要寫入和讀出的數據。java

  在NIO中,全部的數據都是用緩衝區處理的,讀取數據時,它是從通道(Channel)直接讀到緩衝區中,在寫入數據時,也是從緩衝區寫入到通道。node

  緩衝區實質上是一個數組,一般是一個字節數組(ByteBuffer),也能夠是其它類型的數組,此外緩衝區還提供了對數據的結構化訪問以及維護讀寫位置等信息。數組

  Buffer類的繼承關係以下圖所示:服務器

  

  二、通道Channel

  Channel是一個通道,網絡數據經過Channel讀取和寫入。通道和流的不一樣之處在於通道是雙向的(通道能夠用於讀、寫後者兩者同時進行),流只是在一個方向上移動。網絡

  Channel大致上能夠分爲兩類:用於網絡讀寫的SelectableChannel(ServerSocketChannel和SocketChannel就是其子類)、用於文件操做的FileChannel。dom

  下面的例子給出經過FileChannel來向文件中寫入數據、從文件中讀取數據,將文件數據拷貝到另外一個文件中:socket

public class NioTest
{
    public static void main(String[] args) throws IOException
    {
        copyFile();
    }
    //拷貝文件
    private static void copyFile()
    {
        FileInputStream in=null;
        FileOutputStream out=null;
        try
        {
            in=new FileInputStream("src/main/java/data/in-data.txt");
            out=new FileOutputStream("src/main/java/data/out-data.txt");
            FileChannel inChannel=in.getChannel();
            FileChannel outChannel=out.getChannel();
            ByteBuffer buffer=ByteBuffer.allocate(1024);
            int bytesRead = inChannel.read(buffer);
            while (bytesRead!=-1)
            {
                buffer.flip();
                outChannel.write(buffer);
                buffer.clear();
                bytesRead = inChannel.read(buffer);
            }
        }
        catch (FileNotFoundException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }    
    }
    //寫文件
    private static void writeFileNio()
    {
        try
        {
            RandomAccessFile fout = new RandomAccessFile("src/main/java/data/nio-data.txt", "rw");
            FileChannel fc=fout.getChannel();
            ByteBuffer buffer=ByteBuffer.allocate(1024);
            buffer.put("hi123".getBytes());
            buffer.flip();
            try
            {
                fc.write(buffer);
            } catch (IOException e)
            {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        } 
        catch (FileNotFoundException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    //讀文件
    private static void readFileNio()
    {
        FileInputStream fileInputStream;
        try
        {
            fileInputStream = new FileInputStream("src/main/java/data/nio-data.txt");
            FileChannel fileChannel=fileInputStream.getChannel();//從 FileInputStream 獲取通道
            ByteBuffer byteBuffer=ByteBuffer.allocate(1024);//建立緩衝區
            int bytesRead=fileChannel.read(byteBuffer);//將數據讀到緩衝區
            while(bytesRead!=-1)
            {
                /*limit=position
                 * position=0;
                 */
                byteBuffer.flip();
                //hasRemaining():告知在當前位置和限制之間是否有元素
                while (byteBuffer.hasRemaining())
                {
                    System.out.print((char) byteBuffer.get());
                }
                /*
                 * 清空緩衝區
                 * position=0;
                 * limit=capacity;
                 */
                byteBuffer.clear();
                bytesRead = fileChannel.read(byteBuffer);
            }
        } catch (FileNotFoundException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

  三、多路複用器Selector

  多路複用器提供選擇已經就緒的任務的能力。Selector會不斷的輪詢註冊在其上的Channel,若是某個Channel上面發送讀或者寫事件,這個Channel就處於就緒狀態,會被Selector輪詢出來,而後經過SelectionKey能夠獲取就緒Channel的集合,進行後續的I/O操做。ide

  一個多路複用器Selector能夠同時輪詢多個Channel,因爲JDK使用了epoll代替了傳統的select實現,因此它沒有最大鏈接句柄1024/2048的限制,意味着只須要一個線程負責Selector的輪詢,就能夠接入成千上萬的客戶端。其模型以下圖所示:url

  

  用單線程處理一個Selector。要使用Selector,得向Selector註冊Channel,而後調用它的select()方法。這個方法會一直阻塞到某個註冊的通道有事件就緒。一旦這個方法返回,線程就能夠處理這些事件,事件的例子有如新鏈接進來,數據接收等。 spa

  注:

  一、什麼select模型?

  select是事件觸發機制,當等待的事件發生就觸發進行處理,多用於Linux實現的服務器對客戶端的處理。

  能夠阻塞地同時探測一組支持非阻塞的IO設備,是否有事件發生(如可讀、可寫,有高優先級錯誤輸出等),直至某一個設備觸發了事件或者超過了指定的等待時間。也就是它們的職責不是作IO,而是幫助調用者尋找當前就緒的設備。

  二、什麼是epoll模型?

  epoll的設計思路,是把select/poll單個的操做拆分爲1個epoll_create+多個epoll_ctrl+一個wait。此外,內核針對epoll操做添加了一個文件系統」eventpollfs」,每個或者多個要監視的文件描述符都有一個對應的eventpollfs文件系統的inode節點,主要信息保存在eventpoll結構體中。而被監視的文件的重要信息則保存在epitem結構體中。因此他們是一對多的關係。

2、NIO服務器端開發

  功能說明:開啓服務器端,對每個接入的客戶端都向其發送hello字符串。

  使用NIO進行服務器端開發主要有如下幾個步驟:

  一、建立ServerSocketChannel,配置它爲非阻塞模式

    serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);

  二、綁定監聽,配置TCP參數,如backlog大小

    serverSocketChannel.socket().bind(new InetSocketAddress(8080));

  三、建立一個獨立的I/O線程,用於輪詢多路複用器Selector

  四、建立Selector,將以前建立的ServerSocketChannel註冊到Selector上,監聽SelectionKey.ACCEPT

  selector=Selector.open();
   serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

  五、啓動I/O線程,在循環體內執行Selector.select()方法,輪詢就緒的Channel

  while(true)
   {
        try
        {
           //select()阻塞到至少有一個通道在你註冊的事件上就緒了
           //若是沒有準備好的channel,就在這一直阻塞
           //select(long timeout)和select()同樣,除了最長會阻塞timeout毫秒(參數)。
           selector.select();
        } 
        catch (IOException e)
        {
           // TODO Auto-generated catch block
           e.printStackTrace();
           break;
         }
 }

  六、當輪詢到了處於就緒狀態的Channel時,需對其進行判斷,若是是OP_ACCEPT狀態,說明是新的客戶端接入,則調用ServerSocketChannel.accept()方法接受新的客戶端

      //返回已經就緒的SelectionKey,而後迭代執行
            Set<SelectionKey> readKeys=selector.selectedKeys();
            for(Iterator<SelectionKey> it=readKeys.iterator();it.hasNext();)
            {
                SelectionKey key=it.next();
                it.remove();
                try
                {
                    if(key.isAcceptable())
                    {
                        ServerSocketChannel server=(ServerSocketChannel) key.channel();
                        SocketChannel client=server.accept();
                        client.configureBlocking(false);
                        client.register(selector,SelectionKey.OP_WRITE);
                    }
                    else if(key.isWritable())
                    {
                        SocketChannel client=(SocketChannel) key.channel();
                        ByteBuffer buffer=ByteBuffer.allocate(20);
                        String str="hello";
                        buffer=ByteBuffer.wrap(str.getBytes());
                        client.write(buffer);
                        key.cancel();
                    } 
                }catch(IOException e)
                {
                    e.printStackTrace();
                    key.cancel();
                    try
                    {
                        key.channel().close();
                    } catch (IOException e1)
                    {
                        // TODO Auto-generated catch block
                        e1.printStackTrace();
                    }
                    
                }
            }    

  七、設置新接入的客戶端鏈路SocketChannel爲非阻塞模式,配置其餘的一些TCP參數

  if(key.isAcceptable())
    {
        ServerSocketChannel server=(ServerSocketChannel) key.channel();
        SocketChannel client=server.accept();
        client.configureBlocking(false);
        ...
    }

  八、將SocketChannel註冊到Selector,監聽OP_WRITE

  client.register(selector,SelectionKey.OP_WRITE);

  九、若是輪詢的Channel爲OP_WRITE,則說明要向SockChannel中寫入數據,則構造ByteBuffer對象,寫入數據包

  else if(key.isWritable())
    {
        SocketChannel client=(SocketChannel) key.channel();
        ByteBuffer buffer=ByteBuffer.allocate(20);
        String str="hello";
        buffer=ByteBuffer.wrap(str.getBytes());
        client.write(buffer);
        key.cancel();
    } 

  完整代碼以下:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class ServerSocketChannelDemo
{
    public static void main(String[] args)
    {
        ServerSocketChannel serverSocketChannel;
        Selector selector=null;
        try
        {
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(8080));
            selector=Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        }
        catch (IOException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        while(true)
        {
            try
            {
                //select()阻塞到至少有一個通道在你註冊的事件上就緒了
                //若是沒有準備好的channel,就在這一直阻塞
                //select(long timeout)和select()同樣,除了最長會阻塞timeout毫秒(參數)。
                selector.select();
            } 
            catch (IOException e)
            {
                // TODO Auto-generated catch block
                e.printStackTrace();
                break;
            }
            //返回已經就緒的SelectionKey,而後迭代執行
            Set<SelectionKey> readKeys=selector.selectedKeys();
            for(Iterator<SelectionKey> it=readKeys.iterator();it.hasNext();)
            {
                SelectionKey key=it.next();
                it.remove();
                try
                {
                    if(key.isAcceptable())
                    {
                        ServerSocketChannel server=(ServerSocketChannel) key.channel();
                        SocketChannel client=server.accept();
                        client.configureBlocking(false);
                        client.register(selector,SelectionKey.OP_WRITE);
                    }
                    else if(key.isWritable())
                    {
                        SocketChannel client=(SocketChannel) key.channel();
                        ByteBuffer buffer=ByteBuffer.allocate(20);
                        String str="hello";
                        buffer=ByteBuffer.wrap(str.getBytes());
                        client.write(buffer);
                        key.cancel();
                    } 
                }catch(IOException e)
                {
                    e.printStackTrace();
                    key.cancel();
                    try
                    {
                        key.channel().close();
                    } catch (IOException e1)
                    {
                        // TODO Auto-generated catch block
                        e1.printStackTrace();
                    }
                    
                }
            }    
        }
    }
}
View Code

  咱們用telnet localhost 8080模擬出多個客戶端:

  

  程序運行結果以下:

  

3、參考資料

  一、netty權威指南(李林峯)

相關文章
相關標籤/搜索