04.第四階段、基於Netty的RPC架構實戰演練-1. nio

1、概要java

傳統IO特色
阻塞點
server.accept();
inputStream.read(bytes);

單線程狀況下只能有一個客戶端


用線程池能夠有多個客戶端鏈接,可是很是消耗性能


=======================分割線==========================

NIO的特色

ServerSocketChannel	ServerSocket

SocketChannel		Socket

Selector

SelectionKey

NIO的一些疑問
  1. 客戶端關閉的時候會拋出異常,死循環服務器

    解決方案socket

    int read = channel.read(buffer);
     	if(read > 0){
     		byte[] data = buffer.array();
     		String msg = new String(data).trim();
     		System.out.println("服務端收到信息:" + msg);
    
     		//回寫數據
     		ByteBuffer outBuffer = ByteBuffer.wrap("好的".getBytes());
     		channel.write(outBuffer);// 將消息回送給客戶端
     	}else{
     		System.out.println("客戶端關閉");
     		key.cancel();
     	}

二、selector.select();阻塞,那爲何說nio是非阻塞的IO?ide

selector.select()
selector.select(1000);不阻塞
selector.wakeup();也能夠喚醒selector
selector.selectNow();也能夠立馬返還

有的同窗說了,怎麼證實這個write是wakeup方法調用的,而不是其餘方法呢,這個很好證實,咱們多調用幾回:


public class SelectorTest {  
	public static void main(String[] args) throws Exception {  
		Selector selector = Selector.open();  
		selector.wakeup();  
		selector.selectNow();  
		selector.wakeup();  
		selector.selectNow();  
		selector.wakeup();  
	}  
}  

	修改程序調用三次wakeup,心細的朋友確定注意到咱們還調用了兩次selectNow,這是由於在兩次成功的select方法之間調用wakeup多 次都只算作一次,爲了顯示3次write,這裏就每次調用前select一下將前一次寫入的字節讀到,一樣執行上面的strace調用,輸出:




Process 29313 attached  
[pid 29303] write(36, "\1", 1)          = 1  
[pid 29303] write(36, "\1", 1)          = 1  
[pid 29303] write(36, "\1", 1)          = 1  
Process 29313 detached  


	 果真是3次write的系統調用,都是寫入一個字節,若是咱們去掉selectNow,那麼三次wakeup仍是等於一次:

public class SelectorTest {  
	public static void main(String[] args) throws Exception {  
		Selector selector = Selector.open();  
		selector.wakeup();  
		selector.wakeup();  
		selector.wakeup();  
	}  
}  


   輸出:


Process 29339 attached  
Process 29340 attached  
Process 29341 attached  
[pid 29331] write(36, "\1", 1)          = 1  
Process 29341 detached  
Process 29337 detached  


	  wakeup方法的API說明沒有欺騙咱們。wakeup方法的API還告訴咱們,若是當前Selector沒有阻塞在select方法上,那麼本次 wakeup調用會在下一次select阻塞的時候生效,這個道理很簡單,wakeup方法寫入一個字節,下次poll等待的時候當即發現可讀並返回,因 此不會阻塞。
  1. SelectionKey.OP_WRITE是表明什麼意思性能

    OP_WRITE表示底層緩衝區是否有空間,是則響應返還true測試

    NIO.pngthis

    socketIO.png.net

    oio線程

    package oio;
    
    	import java.io.IOException;
    	import java.io.InputStream;
    	import java.net.ServerSocket;
    	import java.net.Socket;
    	import java.util.concurrent.ExecutorService;
    	import java.util.concurrent.Executors;
    
    	/**
    	 * description:
    	 *
    	 * @author: dawn.he QQ:       905845006
    	 * @email: dawn.he@cloudwise.com
    	 * @email: 905845006@qq.com
    	 * @date: 2019/10/7    11:24 AM
    	 */
    
    	public class OioServer {
    
    		@SuppressWarnings("resource")
    		public static void main(String[] args) throws Exception {
    
    			ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
    			//建立socket服務,監聽10101端口
    			ServerSocket server=new ServerSocket(10101);
    			System.out.println("服務器啓動!");
    			while(true){
    				//獲取一個套接字(阻塞)
    				final Socket socket = server.accept();
    				System.out.println("來個一個新客戶端!");
    				newCachedThreadPool.execute(new Runnable() {
    
    					@Override
    					public void run() {
    						//業務處理
    						handler(socket);
    					}
    				});
    
    			}
    		}
    
    		/**
    		 * 讀取數據
    		 * @param socket
    		 * @throws Exception
    		 */
    		public static void handler(Socket socket){
    			try {
    				byte[] bytes = new byte[1024];
    				InputStream inputStream = socket.getInputStream();
    
    				while(true){
    					System.out.println("read前");
    
    					//讀取數據(阻塞)
    					int read = inputStream.read(bytes);
    					System.out.println("read後");
    					if(read != -1){
    						System.out.println(new String(bytes, 0, read));
    					}else{
    						break;
    					}
    				}
    			} catch (Exception e) {
    				e.printStackTrace();
    			}finally{
    				try {
    					System.out.println("socket關閉");
    					socket.close();
    				} catch (IOException e) {
    					e.printStackTrace();
    				}
    			}
    		}
    	}

    niocode

    package nio;
    
    	/**
    	 * description:
    	 *
    	 * @author: dawn.he QQ:       905845006
    	 * @email: dawn.he@cloudwise.com
    	 * @email: 905845006@qq.com
    	 * @date: 2019/10/7    12:11 PM
    	 */
    
    	import java.io.IOException;
    	import java.net.InetSocketAddress;
    	import java.nio.ByteBuffer;
    	import java.nio.channels.SelectionKey;
    	import java.nio.channels.Selector;
    	import java.nio.channels.ServerSocketChannel;
    	import java.nio.channels.SocketChannel;
    	import java.util.Iterator;
    
    	public class NIOServer {
    		// 通道管理器
    		private Selector selector;
    
    		/**
    		 * 得到一個ServerSocket通道,並對該通道作一些初始化的工做
    		 *
    		 * @param port
    		 *            綁定的端口號
    		 * @throws IOException
    		 */
    		public void initServer(int port) throws IOException {
    			// 得到一個ServerSocket通道
    			ServerSocketChannel serverChannel = ServerSocketChannel.open();
    			// 設置通道爲非阻塞
    			serverChannel.configureBlocking(false);
    			// 將該通道對應的ServerSocket綁定到port端口
    			serverChannel.socket().bind(new InetSocketAddress(port));
    			// 得到一個通道管理器
    			this.selector = Selector.open();
    			// 將通道管理器和該通道綁定,併爲該通道註冊SelectionKey.OP_ACCEPT事件,註冊該事件後,
    			// 當該事件到達時,selector.select()會返回,若是該事件沒到達selector.select()會一直阻塞。
    			serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    		}
    
    		/**
    		 * 採用輪詢的方式監聽selector上是否有須要處理的事件,若是有,則進行處理
    		 *
    		 * @throws IOException
    		 */
    		public void listen() throws IOException {
    			System.out.println("服務端啓動成功!");
    			// 輪詢訪問selector
    			while (true) {
    				// 當註冊的事件到達時,方法返回;不然,該方法會一直阻塞
    				selector.select();
    				// 得到selector中選中的項的迭代器,選中的項爲註冊的事件
    				Iterator<?> ite = this.selector.selectedKeys().iterator();
    				while (ite.hasNext()) {
    					SelectionKey key = (SelectionKey) ite.next();
    					// 刪除已選的key,以防重複處理
    					ite.remove();
    
    					handler(key);
    				}
    			}
    		}
    
    		/**
    		 * 處理請求
    		 *
    		 * @param key
    		 * @throws IOException
    		 */
    		public void handler(SelectionKey key) throws IOException {
    
    			// 客戶端請求鏈接事件
    			if (key.isAcceptable()) {
    				handlerAccept(key);
    				// 得到了可讀的事件
    			} else if (key.isReadable()) {
    				handelerRead(key);
    			}
    		}
    
    		/**
    		 * 處理鏈接請求
    		 *
    		 * @param key
    		 * @throws IOException
    		 */
    		public void handlerAccept(SelectionKey key) throws IOException {
    			ServerSocketChannel server = (ServerSocketChannel) key.channel();
    			// 得到和客戶端鏈接的通道
    			SocketChannel channel = server.accept();
    			// 設置成非阻塞
    			channel.configureBlocking(false);
    
    			// 在這裏能夠給客戶端發送信息哦
    			System.out.println("新的客戶端鏈接");
    			// 在和客戶端鏈接成功以後,爲了能夠接收到客戶端的信息,須要給通道設置讀的權限。
    			channel.register(this.selector, SelectionKey.OP_READ);
    		}
    
    		/**
    		 * 處理讀的事件
    		 *
    		 * @param key
    		 * @throws IOException
    		 */
    		public void handelerRead(SelectionKey key) throws IOException {
    			// 服務器可讀取消息:獲得事件發生的Socket通道
    			SocketChannel channel = (SocketChannel) key.channel();
    			// 建立讀取的緩衝區
    			ByteBuffer buffer = ByteBuffer.allocate(1024);
    			int read = channel.read(buffer);
    			if(read > 0){
    				byte[] data = buffer.array();
    				String msg = new String(data).trim();
    				System.out.println("服務端收到信息:" + msg);
    
    				//回寫數據
    				ByteBuffer outBuffer = ByteBuffer.wrap("好的".getBytes());
    				channel.write(outBuffer);// 將消息回送給客戶端
    			}else{
    				System.out.println("客戶端關閉");
    				key.cancel();
    			}
    		}
    
    		/**
    		 * 啓動服務端測試
    		 *
    		 * @throws IOException
    		 */
    		public static void main(String[] args) throws IOException {
    			NIOServer server = new NIOServer();
    			server.initServer(8000);
    			server.listen();
    		}
    
    	}
    訪問
     telnet 127.0.0.1 10101
    
     send hello
相關文章
相關標籤/搜索