java網絡編程實戰 - 原生NIO非阻塞式通信網絡編程實戰

前言java

上次提到要改進咱們的RPC框架,這周花時間研究一下JDK提供給咱們的原生NIO非阻塞式網絡編程思想。NIO 庫是在 JDK 1.4 中引入的。NIO 彌補了原來的 I/O 的不足,它在標準 Java 代碼中提供了高速的、面向塊的 I/O。編程

image.png


BIO與NIO的主要區別數組

1. 面向流和麪向緩衝
緩存

java NIO和BIO之間第一個最大的區別是,BIO是面向流的,NIO是面向緩衝區的。 Java IO面向流意味着每次從流中讀一個或多個字節,直至讀取全部字節,它們沒有被緩存在任何地方。此外,它不能先後移動流中的數據。若是須要先後移動從流中讀取的數據,須要先將它緩存到一個緩衝區。 Java NIO的緩衝導向方法略有不一樣。數據讀取到一個它稍後處理的緩衝區,須要時可在緩衝區中先後移動。這就增長了處理過程當中的靈活性。可是,還須要檢查是否該緩衝區中包含全部須要處理的數據。並且,需確保當更多的數據讀入緩衝區時,不要覆蓋緩衝區裏還沒有處理的數據。服務器

2. 阻塞與非阻塞網絡

Java BIO的各類流是阻塞的。這意味着,當一個線程調用read() 或 write()時,該線程被阻塞,直到有一些數據被讀取,或數據徹底寫入。該線程在此期間不能再幹任何事情了。併發

Java NIO的非阻塞模式,使一個線程從某通道發送請求讀取數據,可是它僅能獲得目前可用的數據,若是目前沒有數據可用時,就什麼都不會獲取。而不是保持線程阻塞,因此直至數據變的能夠讀取以前,該線程能夠繼續作其餘的事情。 非阻塞寫也是如此。一個線程請求寫入一些數據到某通道,但不須要等待它徹底寫入,這個線程同時能夠去作別的事情。 線程一般將非阻塞IO的空閒時間用於在其它通道上執行IO操做,因此一個單獨的線程如今能夠管理多個輸入和輸出通道(channel)。框架

3. NIO特有的Selector選擇器機制socket

Java NIO的選擇器容許一個單獨的線程來監視多個輸入通道,你能夠註冊多個通道使用一個選擇器,而後使用一個單獨的線程來「選擇」通道:這些通道里已經有能夠處理的輸入,或者選擇已準備寫入的通道。這種選擇機制,使得一個單獨的線程很容易來管理多個通道。ide

 

今天咱們就基於以上的理解,實現一個端對端的非阻塞式IO的網絡編程。


實戰設計

客戶端部分

/**
* @author andychen https://blog.51cto.com/14815984
* @description:NIO客戶端核心處理器
*/
public class NioClientHandler implements Runnable {
   //服務端主機
   private final String host;
   //服務端口
   private final int port;
   /**定義NIO選擇器:用於註冊和監聽事件
    * 選擇監聽的事件類型: OP_READ 讀事件 / OP_WRITE 寫事件
    * OP_CONNECT 客戶端鏈接事件 / OP_ACCEPT 服務端接收通道鏈接事件
    */
   private Selector selector = null;
   //定義客戶端鏈接通道
   private SocketChannel channel = null;
   //運行狀態是否被激活
   private volatile boolean activated=false;
   public NioClientHandler(String host, int port) {
       this.port = port;
       this.host = host;
       this.init();
   }

   /**
    * 處理器初始化
    * 負責創建鏈接準備工做
    */
   private void init(){
       try {
           //建立並打開選擇器
           this.selector =  Selector.open();
           //創建並打開監聽通道
           this.channel = SocketChannel.open();
           /**
            * 設置通道通信模式爲非阻塞,NIO默認爲阻塞式的
            */
           this.channel.configureBlocking(false);
           //激活運行狀態
           this.activated = true;
       } catch (IOException e) {
           e.printStackTrace();
           this.stop();
       }
   }

   /**
    * 鏈接服務器
    */
   private void connect(){
       try {
           /**
            * 鏈接服務端:由於以前設置了通信模式爲非阻塞
            * 這裏會當即返回TCP握手是否已創建
            */
           if(this.channel.connect(new InetSocketAddress(this.host, this.port))){
               //鏈接創建後,在通道上註冊讀事件關注,客戶端一接收到數據當即觸發處理
               this.channel.register(this.selector, SelectionKey.OP_READ);
           }
           else{
               //若鏈接握手未創建,則在通道上繼續關注鏈接事件,一旦鏈接創建繼續進行後續的處理邏輯
               this.channel.register(this.selector, SelectionKey.OP_CONNECT);
           }
       } catch (IOException e) {
           e.printStackTrace();
           this.stop();
       }
   }

   /**
    * 選擇器事件迭代處理
    * @param keys 選擇器事件KEY
    */
   private void eventIterator(Set<SelectionKey> keys){
       SelectionKey key = null;
       //這裏採用迭代器,由於須要迭代時對key進行移除操做
       Iterator<SelectionKey> it = keys.iterator();
       while (it.hasNext()){
           key = it.next();
           //這裏先移除事件key,避免屢次處理
           it.remove();
           //處理迭代事件
           this.proccessEvent(key);
       }
   }

   /**
    * 處理髮生的事件
    * @param key 選擇器事件KEY
    */
   private void proccessEvent(SelectionKey key){
       //只對有效的事件類型進行處理
       if(key.isValid()){
           try {
               //在事件通道上處理
               SocketChannel socketChannel = (SocketChannel) key.channel();
               /**處理鏈接就緒事件
               * */
               if(key.isConnectable()){
                   //檢測鏈接是否完成,避免發生致使NotYetConnectedException異常
                   if(socketChannel.finishConnect()){
                       System.out.println("Has completed connection with server..");
                       /**
                        * 在通道上關注讀事件,NO的寫事件通常不特別關注,
                        * 緣由:寫緩衝區大部分時間被認爲是空閒的,會頻繁被選擇器選擇(會浪費CPU資源),
                        *       因此不該該頻繁被註冊;
                        * 只有在寫的數據超過寫緩衝區可用空間時,把一部分數據刷出緩衝區後,
                        * 有空間時再通知應用程序進行寫;
                        * 且應用程序寫完後,應當即關閉寫事件
                        */
                        socketChannel.register(this.selector, SelectionKey.OP_READ);
                   }else{//這裏若鏈接仍未創建通常視爲網絡或其餘緣由,暫時退出
                       this.stop();
                   }
               }
               /**
                * 處理讀事件
                */
               if(key.isReadable()){
                   //開闢內存緩衝區,這裏用JVM堆內存
                   ByteBuffer buffer = ByteBuffer.allocate(Constant.BUF_SIZE);
                   //將通道中的數據讀到緩衝區
                   int length = socketChannel.read(buffer);
                   if(0 < length){
                       /**
                        * 進行讀寫轉換,NIO固定範式
                        */
                       buffer.flip();
                       //獲取buffer可用空間
                       int size = buffer.remaining();
                       byte[] bytes = new byte[size];
                       //讀Buffer
                       buffer.get(bytes);
                       //獲取緩衝區數據
                       String result = new String(bytes,"utf-8");
                       System.out.println("Recevied server message: "+result);
                   }else if(0 > length){
                       //取消關注當前事件,關閉通道
                       key.cancel();
                       socketChannel.close();
                   }
               }
           } catch (Exception e) {
               key.cancel();
               if(null != key.channel()){
                   try {
                       key.channel().close();
                   } catch (IOException ex) {
                       ex.printStackTrace();
                   }
               }
               e.printStackTrace();
           }
       }
   }

   /**
    * 寫數據到對端
    * @param data
    */
   public void write(String data){
       try {
           byte[] bytes = data.getBytes();
           ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
           //將數據放入寫緩衝區
           buffer.put(bytes);
           buffer.flip();
           this.channel.write(buffer);
       } catch (IOException e) {
           e.printStackTrace();
       }
   }

   /**
    * 中止運行
    */
   public void stop(){
       this.activated = false;
       System.exit(-1);
   }

   /**
    * 客戶端通信業務覈實現
    */
   @Override
   public void run() {
       //創建服務器鏈接
       this.connect();
       //持續監聽各類事件的發生
       while (this.activated){
           try {
               //監聽事件是否發生,若發生直接返回;反之阻塞至事件發生
               this.selector.select();
           } catch (IOException e) {
               e.printStackTrace();
               this.stop();
           }
           //獲取發生事件的類型
           Set<SelectionKey> keys = this.selector.selectedKeys();
           //迭代處理事件
           this.eventIterator(keys);
       }
       //關閉選擇器
       if(null != this.selector){
           try {
               this.selector.close();
           } catch (IOException e) {
               e.printStackTrace();
           }
       }
       this.stop();
   }
}

/**
* @author andychen https://blog.51cto.com/14815984
* @description:NIO客戶端啓動器
*/
public class NioClientStarter {
   private static NioClientHandler clientHandler = null;

   /*啓動運行客戶端*/
   public static void main(String[] args) {
       try {
           clientHandler = new NioClientHandler(Constant.SERV_HOST, Constant.SERV_PORT);
           new Thread(clientHandler).start();
       } catch (Exception e) {
           e.printStackTrace();
       }
       /**
        * 在控制檯發實時數據到對端
        */
       Scanner scanner = new Scanner(System.in);
       while (true){
           String data = scanner.next();
           if(null != data && !"".equals(data)){
               clientHandler.write(data);
           }
       }
   }
}


服務端部分

/**
* @author andychen https://blog.51cto.com/14815984
* @description:NIO服務端核心處理器
*/
public class NioServerHandler  implements Runnable{
   private final int port;
   //定義選擇器
   private Selector selector = null;
   /**
    * 定義服務端通道: 與客戶端相似的思路
    */
   private ServerSocketChannel channel = null;
   //服務器運行是否被激活
   private volatile boolean activated = false;
   public NioServerHandler(int port) {
       this.port = port;
       this.init();
   }

   /**
    * 初始化處理器
    * 負責作好運行監聽和接收以前的準備
    */
   private void init(){
       try {
           //建立並打開選擇器
           this.selector = Selector.open();
           //建立並打開監聽通道
           this.channel = ServerSocketChannel.open();
           /**
            * 設置通道通信模式爲非阻塞(NIO默認爲阻塞)
            */
           this.channel.configureBlocking(false);
           //綁定監聽的服務端口
           this.channel.socket().bind(new InetSocketAddress(this.port));
           /**
            * 註冊在服務端通道上,首先關注的事件
            */
           this.channel.register(this.selector, SelectionKey.OP_ACCEPT);
           //設置運行狀態激活
           this.activated = true;
       } catch (IOException e) {
           e.printStackTrace();
           this.stop();
       }
   }

   /**
    * 中止服務
    */
   public void stop(){
       this.activated = false;
       try {
           //關閉選擇器
           if(null != this.selector){
               if(this.selector.isOpen()){
                   this.selector.close();
               }
               this.selector = null;
           }
           //關閉通道
           if(null != this.channel){
               if(this.channel.isOpen()){
                   this.channel.close();
               }
               this.channel = null;
           }
       } catch (IOException e) {
           e.printStackTrace();
       }
       System.exit(-1);
   }

   /**
    * 在迭代處理髮生的事件
    * @param keys 發生的事件類型
    */
   private void eventIterator(Set<SelectionKey> keys){
       //SelectionKey key = null;
       Iterator<SelectionKey> it = keys.iterator();
       while (it.hasNext()){
           SelectionKey key = it.next();
           /**
            * 這裏先從迭代器移除,避免後面重複執行
            */
           it.remove();
           //處理事件
           this.proccessEvent(key);
       }
   }

   /**
    *
    * @param key 選擇執行的事件KEY
    */
   private void proccessEvent(SelectionKey key){
       //只對有效的事件KEY執行處理
       if(key.isValid()){
           try {
               /**
                * 處理通道接收數據事件
                */
               if(key.isAcceptable()){
                   /**
                    * 注意這裏接收事件的通道是服務端通道
                    */
                   ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
                   //接收客戶端Socket
                   SocketChannel channel = serverChannel.accept();
                   //設置其爲非阻塞
                   channel.configureBlocking(false);
                   //而後註冊此通道的讀事件
                   channel.register(this.selector, SelectionKey.OP_READ);
                   System.out.println("Build connection with client..");
               }
               /**
                * 處理讀事件
                */
               if(key.isReadable()){
                   System.out.println("Reading client data...");
                   SocketChannel channel = (SocketChannel) key.channel();
                   //開闢內存空間,接收數據
                   ByteBuffer buffer = ByteBuffer.allocate(Constant.BUF_SIZE);
                   //將數據讀入緩衝區
                   int length = channel.read(buffer);
                   if(0 < length){
                       //讀寫切換
                       buffer.flip();
                       //更具緩衝區數據創建轉換的字節數組
                       byte[] bytes = new byte[buffer.remaining()];
                       //從緩衝區讀取字節數據
                       buffer.get(bytes);
                       //解碼數據
                       String data = new String(bytes, "utf-8");
                       System.out.println("Recevied data: "+data);
                       //向對端發送接收應答
                       String answer = "Server has recevied data:"+data;
                       this.reply(channel, answer);
                   }else if(0 > length){
                       //取消處理的事件
                       key.cancel();
                       channel.close();
                   }
               }
               /**
                * 處理寫事件
                */
               if(key.isWritable()){
                   SocketChannel channel = (SocketChannel) key.channel();
                   //拿到寫事件的buffer
                   ByteBuffer buffer = (ByteBuffer) key.attachment();
                   //若buffer中有數據,則刷到對端
                   if(buffer.hasRemaining()){
                        int length = channel.write(buffer);
                        System.out.println("Write data "+length+" byte to client.");
                   }else{
                       //若沒有數據,則繼續監聽讀事件
                       key.interestOps(SelectionKey.OP_READ);
                   }
               }
           } catch (IOException e) {
               key.cancel();
               e.printStackTrace();
           }
       }
   }

   /**
    * 應答對端
    * @param msg 應答消息
    */
   private void reply(SocketChannel channel, String msg){
       //消息編碼
       byte[] bytes = msg.getBytes();
       //開啓寫緩衝區
       ByteBuffer buffer = ByteBuffer.allocate(Constant.BUF_SIZE);
       //將數據寫入緩衝區
       buffer.put(bytes);
       //切換到讀事件
       buffer.flip();
       /**
        * 這裏爲了避免出現寫空或寫溢出緩衝區狀況,創建寫事件監聽同時保留以前的讀監聽
        * 做爲監聽的附件傳入寫操做的buffer
        */
       try {
           channel.register(this.selector, SelectionKey.OP_WRITE |SelectionKey.OP_READ, buffer);
       } catch (ClosedChannelException e) {
           e.printStackTrace();
       }
   }
   /**
    * 服務端監聽運行核心業務實現
    */
   @Override
   public void run() {
       while (this.activated){
           try {
               /**
                * 運行到此方法阻塞,直到有事件發生再返回
               * */
               this.selector.select();
               //獲取被監聽的事件
               Set<SelectionKey> keys = this.selector.selectedKeys();
               //在迭代器中,處理不一樣的事件
               this.eventIterator(keys);
           } catch (IOException e) {
               e.printStackTrace();
               this.stop();
           }
       }
   }
}

/**
* @author andychen https://blog.51cto.com/14815984
* @description:NIO網絡編程服務端啓動類
*/
public class NioServerStart {

   /**
    * 運行服務端監聽
    * @param args
    */
   public static void main(String[] args) {
       String serverTag = "server: "+Constant.SERV_PORT;
       NioServerHandler serverHandler = null;
       try {
           serverHandler = new NioServerHandler(Constant.SERV_PORT);
           new Thread(serverHandler, serverTag).start();
           System.out.println("Starting "+serverTag+" listening...");
       } catch (Exception e) {
           e.printStackTrace();
           if(null != serverHandler){
               serverHandler.stop();
           }
       }
   }
}


屢次驗證結果

image.png

image.png


總結

經過以上的實戰,咱們看到NIO網絡編程實現比BIO稍微要複雜一些。面向緩衝的機制確實比面向流的機制要靈活不少;服務運行的體驗也比阻塞式IO更加流暢;獨有的選擇器機制也讓NIO能夠支撐較大併發數,但學習和開發的成本稍微高一些,項目當中能夠有選擇地使用。

目前網絡編程這塊用得比較多的優秀IO框架非Netty莫屬了,不少優秀的RPC框架的底層也基於Netty擴展和開發。下次咱們就順帶給你們展現一下Netty的網絡編程之美。

相關文章
相關標籤/搜索