反應堆模式:「反應」器名字中」反應「的由來:數組
注意,Reactor的單線程模式的單線程主要是針對於I/O操做而言,也就是全部的I/O的accept()、read()、write()以及connect()操做都在一個線程上完成的。服務器
先簡單介紹NIO中幾個重要對象:
Selector併發
Channels
通道,被創建的一個應用程序和操做系統交互事件、傳遞內容的渠道(注意是鏈接到操做系統)。那麼既然是和操做系統進行內容的傳遞,那麼說明應用程序能夠經過通道讀取數據,也能夠經過通道向操做系統寫數據。socket
通道中的數據老是要先讀到一個Buffer,或者老是要從一個Buffer中寫入。ide
服務端處理器:性能
/** * 類說明:nio通訊服務端處理器 */ public class NioServerHandle implements Runnable{ private Selector selector; private ServerSocketChannel serverChannel; private volatile boolean started; /** * 構造方法 * @param port 指定要監聽的端口號 */ public NioServerHandle(int port) { try { selector = Selector.open(); serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); serverChannel.socket().bind(new InetSocketAddress(port)); serverChannel.register(selector,SelectionKey.OP_ACCEPT); started = true; System.out.println("服務器已啓動,端口號:"+port); } catch (IOException e) { e.printStackTrace(); } } public void stop(){ started = false; } @Override public void run() { //循環遍歷selector while(started){ try{ //阻塞,只有當至少一個註冊的事件發生的時候纔會繼續. selector.select(); Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove(); try{ handleInput(key); }catch(Exception e){ if(key != null){ key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } }catch(Throwable t){ t.printStackTrace(); } } //selector關閉後會自動釋放裏面管理的資源 if(selector != null) try{ selector.close(); }catch (Exception e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException{ if(key.isValid()){ //處理新接入的請求消息 if(key.isAcceptable()){ //得到關心當前事件的channel ServerSocketChannel ssc = (ServerSocketChannel)key.channel(); //經過ServerSocketChannel的accept建立SocketChannel實例 //完成該操做意味着完成TCP三次握手,TCP物理鏈路正式創建 SocketChannel sc = ssc.accept(); System.out.println("======socket channel 創建鏈接" ); //設置爲非阻塞的 sc.configureBlocking(false); //鏈接已經完成了,能夠開始關心讀事件了 sc.register(selector,SelectionKey.OP_READ); } //讀消息 if(key.isReadable()){ System.out.println("======socket channel 數據準備完成," + "能夠去讀==讀取======="); SocketChannel sc = (SocketChannel) key.channel(); //建立ByteBuffer,並開闢一個1M的緩衝區 ByteBuffer buffer = ByteBuffer.allocate(1024); //讀取請求碼流,返回讀取到的字節數 int readBytes = sc.read(buffer); //讀取到字節,對字節進行編解碼 if(readBytes>0){ //將緩衝區當前的limit設置爲position=0, // 用於後續對緩衝區的讀取操做 buffer.flip(); //根據緩衝區可讀字節數建立字節數組 byte[] bytes = new byte[buffer.remaining()]; //將緩衝區可讀字節數組複製到新建的數組中 buffer.get(bytes); String message = new String(bytes,"UTF-8"); System.out.println("服務器收到消息:" + message); //處理數據 String result = response(message) ; //發送應答消息 doWrite(sc,result); } //鏈路已經關閉,釋放資源 else if(readBytes<0){ key.cancel(); sc.close(); } } } } //發送應答消息 private void doWrite(SocketChannel channel,String response) throws IOException { //將消息編碼爲字節數組 byte[] bytes = response.getBytes(); //根據數組容量建立ByteBuffer ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); //將字節數組複製到緩衝區 writeBuffer.put(bytes); //flip操做 writeBuffer.flip(); //發送緩衝區的字節數組 channel.write(writeBuffer); } } public class NioServer { private static NioServerHandle nioServerHandle; public static void start(){ if(nioServerHandle !=null) nioServerHandle.stop(); nioServerHandle = new NioServerHandle(DEFAULT_PORT); new Thread(nioServerHandle,"Server").start(); } public static void main(String[] args){ start(); } }
客戶端處理器:大數據
public class NioClientHandle implements Runnable{ private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean started; public NioClientHandle(String ip, int port) { this.host = ip; this.port = port; try { //建立選擇器 selector = Selector.open(); //打開通道 socketChannel = SocketChannel.open(); //若是爲 true,則此通道將被置於阻塞模式; // 若是爲 false,則此通道將被置於非阻塞模式 socketChannel.configureBlocking(false); started = true; } catch (IOException e) { e.printStackTrace(); } } public void stop(){ started = false; } @Override public void run() { try { doConnect(); } catch (IOException e) { e.printStackTrace(); System.exit(1); } //循環遍歷selector while(started){ try { //阻塞,只有當至少一個註冊的事件發生的時候纔會繼續 selector.select(); //獲取當前有哪些事件可使用 Set<SelectionKey> keys = selector.selectedKeys(); //轉換爲迭代器 Iterator<SelectionKey> it = keys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove(); try { handleInput(key); } catch (IOException e) { e.printStackTrace(); if(key!=null){ key.cancel(); if(key.channel()!=null){ key.channel().close(); } } } } } catch (IOException e) { e.printStackTrace(); } } //selector關閉後會自動釋放裏面管理的資源 if(selector!=null){ try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } //具體的事件處理方法 private void handleInput(SelectionKey key) throws IOException{ if(key.isValid()){ //得到關心當前事件的channel SocketChannel sc = (SocketChannel)key.channel(); if(key.isConnectable()){//鏈接事件 if(sc.finishConnect()){} else{System.exit(1);} } //有數據可讀事件 if(key.isReadable()){ //建立ByteBuffer,並開闢一個1M的緩衝區 ByteBuffer buffer = ByteBuffer.allocate(1024); //讀取請求碼流,返回讀取到的字節數 int readBytes = sc.read(buffer); //讀取到字節,對字節進行編解碼 if(readBytes>0){ //將緩衝區當前的limit設置爲position,position=0, // 用於後續對緩衝區的讀取操做 buffer.flip(); //根據緩衝區可讀字節數建立字節數組 byte[] bytes = new byte[buffer.remaining()]; //將緩衝區可讀字節數組複製到新建的數組中 buffer.get(bytes); String result = new String(bytes,"UTF-8"); System.out.println("accept message:"+result); }else if(readBytes<0){ key.cancel(); sc.close(); } } } } //發送消息 private void doWrite(SocketChannel channel,String request) throws IOException { //將消息編碼爲字節數組 byte[] bytes = request.getBytes(); //根據數組容量建立ByteBuffer ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); //將字節數組複製到緩衝區 writeBuffer.put(bytes); //flip操做 writeBuffer.flip(); //發送緩衝區的字節數組 channel.write(writeBuffer); } private void doConnect() throws IOException { /*若是此通道處於非阻塞模式, 則調用此方法將啓動非阻塞鏈接操做。 若是當即創建鏈接,就像本地鏈接可能發生的那樣,則此方法返回true。 不然,此方法返回false, 稍後必須經過調用finishConnect方法完成鏈接操做。*/ if(socketChannel.connect(new InetSocketAddress(host,port))){} else{ //鏈接還未完成,因此註冊鏈接就緒事件,向selector表示關注這個事件 socketChannel.register(selector,SelectionKey.OP_CONNECT); } } //寫數據對外暴露的API public void sendMsg(String msg) throws Exception{ socketChannel.register(selector,SelectionKey.OP_READ); doWrite(socketChannel,msg); } } public class NioClient { private static NioClientHandle nioClientHandle; public static void start(){ if(nioClientHandle !=null) nioClientHandle.stop(); nioClientHandle = new NioClientHandle(DEFAULT_SERVER_IP,DEFAULT_PORT); new Thread(nioClientHandle,"Client").start(); } //向服務器發送消息 public static boolean sendMsg(String msg) throws Exception{ nioClientHandle.sendMsg(msg); return true; } public static void main(String[] args) throws Exception { start(); System.out.println("請輸入請求信息:"); Scanner scanner = new Scanner(System.in); while(NioClient.sendMsg(scanner.next())); } }
服務端過程:this
客戶端過程:編碼
初始化工做如打開selector,channel,設置通道模式是否阻塞.spa
但在單線程Reactor模式中,不只I/O操做在該Reactor線程上,連非I/O的業務操做也在該線程上進行處理了,這可能會大大延遲I/O請求的響應。因此咱們應該將非I/O的業務邏輯操做從Reactor線程上卸載,以此來加速Reactor線程對I/O請求的響應.
添加了一個工做者線程池,並將非I/O操做從Reactor線程中移出轉交給工做者線程池來執行。這樣可以提升Reactor線程的I/O響應,不至於由於一些耗時的業務邏輯而延遲對後面I/O請求的處理。
改進的版本中,因此的I/O操做依舊由一個Reactor來完成,包括I/O的accept()、read()、write()以及connect()操做。
對於一些小容量應用場景,可使用單線程模型。可是對於高負載、大併發或大數據量的應用場景卻不合適,主要緣由以下:
Reactor線程池中的每一Reactor線程都會有本身的Selector、線程和分發的事件循環邏輯。
mainReactor能夠只有一個,但subReactor通常會有多個。mainReactor線程主要負責接收客戶端的鏈接請求,而後將接收到的SocketChannel傳遞給subReactor,由subReactor來完成和客戶端的通訊。
流程:
注意,因此的I/O操做(包括,I/O的accept()、read()、write()以及connect()操做)依舊仍是在Reactor線程(mainReactor線程 或 subReactor線程)中完成的。Thread Pool(線程池)僅用來處理非I/O操做的邏輯。
多Reactor線程模式將「接受客戶端的鏈接請求」和「與該客戶端的通訊」分在了兩個Reactor線程來完成。mainReactor完成接收客戶端鏈接請求的操做,它不負責與客戶端的通訊,而是將創建好的鏈接轉交給subReactor線程來完成與客戶端的通訊,這樣一來就不會由於read()數據量太大而致使後面的客戶端鏈接請求得不到即時處理的狀況。而且多Reactor線程模式在海量的客戶端併發請求的狀況下,還能夠經過實現subReactor線程池來將海量的鏈接分發給多個subReactor線程,在多核的操做系統中這能大大提高應用的負載和吞吐量。
Netty服務端使用了「多Reactor線程模式」