一、tcp信道,具體參數詳情參考apihtml
ServerSocketChannel:建立、接收、關閉、讀寫、阻塞java
SocketChannel:建立、鏈接、關閉、讀寫、阻塞(測試鏈接性)react
二、Selector:建立、關閉選擇器api
案例一:socket
NIOAccepter服務端線程tcp
package com.warehouse.data.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Set; import javax.imageio.IIOException; /** * ${DESCRIPTION} * package com.warehouse.data.nio * * @author zli [liz@yyft.com] * @version v1.0 * @create 2017-03-28 9:55 **/ public class NIOAcceptor extends Thread { private Selector selector; private ServerSocketChannel channel; private NIOReactorPool reactorPool; public NIOAcceptor(String name, String host, int port, NIOReactorPool reactorPool) throws IOException { super(name); this.reactorPool = reactorPool; //獲取一個管理通道器 this.selector = Selector.open(); //獲取一個接受鏈接socket this.channel = ServerSocketChannel.open(); //設置非阻塞 this.channel.configureBlocking(false); //綁定端口 this.channel.bind(new InetSocketAddress(host, port)); //註冊事件 this.channel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("start NIOAcceptor thread server."); } @Override public void run() { final Selector selector = this.selector; //輪詢 for (; ; ) { try { //阻塞,直到select事件到達 selector.select(1000L); Set<SelectionKey> selectionKeySet = selector.selectedKeys(); try { for (SelectionKey key : selectionKeySet) { if (key.isValid() && key.isAcceptable()) { accept(selector); }/* else if (key.isValid() && key.isReadable()) { Processor processor = (Processor) key.attachment(); try{ processor.process(key); }catch (IOException e){ processor.close(); } } */else { key.cancel(); } } } finally { selectionKeySet.clear(); } } catch (IOException e) { e.printStackTrace(); } } } public void accept(Selector selector) { SocketChannel socketChannel = null; try { System.out.println("accept client success."); socketChannel = this.channel.accept(); socketChannel.configureBlocking(false); //單個Reactor線程處理 //SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ); //selectionKey.attach(new Processor(socketChannel)); //多個Reactor線程處理 NIOReactor reactor = this.reactorPool.getNextReactor(); reactor.postRegister(new Processor(socketChannel)); } catch (IOException e) { e.printStackTrace(); } } }
NIOReactorPool線程池ide
package com.warehouse.data.nio; import java.io.IOException; /** * ${DESCRIPTION} * package com.warehouse.data.nio * * @author zli [liz@yyft.com] * @version v1.0 * @create 2017-03-28 11:50 **/ public class NIOReactorPool { private final NIOReactor[] nioReactors; private volatile int nextReactor; public NIOReactorPool(String name, int poolSize) throws IOException { nioReactors = new NIOReactor[poolSize]; for (int i = 0; i < poolSize; i++) { NIOReactor nioReactor = new NIOReactor(name + "-" + i); nioReactors[i] = nioReactor; nioReactor.startup(); } } public NIOReactor getNextReactor() { int i = ++nextReactor; if (i > nioReactors.length) { i = nextReactor = 0; } return nioReactors[i]; } }
NIOReactor線程post
package com.warehouse.data.nio; import java.io.IOException; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; /** * ${DESCRIPTION} * package com.warehouse.data.nio * * @author zli [liz@yyft.com] * @version v1.0 * @create 2017-03-28 11:51 **/ public final class NIOReactor { private final String name; private final RWThread reactorR; public NIOReactor(String name) throws IOException { this.name = name; this.reactorR = new RWThread(); } public void startup() { new Thread(reactorR, this.name + "-RW").start(); } public void postRegister(Processor processor) { this.reactorR.registerQueue.offer(processor); this.reactorR.selector.wakeup(); } private final class RWThread extends Thread { private final Selector selector; private final ConcurrentLinkedQueue<Processor> registerQueue; public RWThread() throws IOException { this.selector = Selector.open(); this.registerQueue = new ConcurrentLinkedQueue<Processor>(); } @Override public void run() { final Selector selector = this.selector; Set<SelectionKey> selectionKeySet = null; for (; ; ) { try { selector.select(1000L); register(selector); selectionKeySet = selector.selectedKeys(); for (SelectionKey key : selectionKeySet) { Object att = key.attachment(); Processor processor = null; try { if (att != null && key.isValid()) { processor = (Processor) att; if (key.isReadable()) { processor.process(key); } if (key.isWritable()) { } } else { key.channel(); } } catch (Throwable e) { e.printStackTrace(); if(processor != null){ processor.close(); } } } } catch (IOException e) { e.printStackTrace(); } finally { if (selectionKeySet != null) { selectionKeySet.clear(); } } } } private void register(Selector selector) { if (this.registerQueue.isEmpty()) { return; } Processor processor = null; while ((processor = this.registerQueue.poll()) != null) { try { processor.register(selector); } catch (ClosedChannelException e) { e.printStackTrace(); } } } } }
Processor事件處理測試
package com.warehouse.data.nio; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; /** * ${DESCRIPTION} * package com.warehouse.data.nio * * @author zli [liz@yyft.com] * @version v1.0 * @create 2017-03-28 10:49 **/ public class Processor { private SocketChannel channel; private SelectionKey selectionKey; public Processor(SocketChannel channel) { this.channel = channel; } public void register(Selector selector) throws ClosedChannelException { selectionKey = this.channel.register(selector, SelectionKey.OP_READ, this); } public void process(SelectionKey key) throws IOException { //能夠採用線程池處理 SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int count = socketChannel.read(byteBuffer); System.out.println(new String(byteBuffer.array())); } public void close(){ if(this.channel != null){ try { this.channel.close(); this.selectionKey.cancel(); } catch (IOException e) { e.printStackTrace(); } } } }
Server服務端this
package com.warehouse.data.nio; import java.io.IOException; /** * ${DESCRIPTION} * package com.warehouse.data.nio * * @author zli [liz@yyft.com] * @version v1.0 * @create 2017-03-28 11:03 **/ public class Server { public static void main(String[] args) throws IOException { //5個reactor線程 NIOReactorPool nioReactorPool = new NIOReactorPool("NIOReactor-IO", 5); NIOAcceptor nioAcceptor = new NIOAcceptor("NIOAcceptor-IO", "127.0.0.1", 8888, nioReactorPool); nioAcceptor.start(); } }
Client客戶端
package com.warehouse.data.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; /** * ${DESCRIPTION} * package com.warehouse.data.nio * * @author zli [liz@yyft.com] * @version v1.0 * @create 2017-03-28 11:00 **/ public class Client { public static void main(String[] args) throws IOException { Selector selector = Selector.open(); SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_CONNECT); socketChannel.connect(new InetSocketAddress("127.0.0.1",8888)); if(socketChannel.isConnectionPending()){ //要finishConnect,不然會出現NotYetConnectedException異常 socketChannel.finishConnect(); socketChannel.write(ByteBuffer.wrap(new String("hello world").getBytes())); } } }
資料:具體能夠參考netty權威指南