<!-- lang: java --> class Reactor implements Runnable { final Selector selector; //ServerSocketChannel //支持異步操做,對應於java.net.ServerSocket這個類,提供了TCP協議IO接口,支持OP_ACCEPT操做。 final ServerSocketChannel serverSocket; Reactor(int port) throws IOException { selector = Selector.open(); //建立實例 serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind(new InetSocketAddress(port)); // 全部channel建立的時候都是blocking模式, // 只有non-blocking的SelectableChannel才能夠參與異步IO操做。 serverSocket.configureBlocking(false); //設置non-blocking模式。 /** *SelectionKey register(Selector sel, int ops) *將當前channel註冊到一個Selector上並返回對應的SelectionKey。 *在這之後,經過調用Selector的select()函數就能夠監控這個channel。ops這個參數是一個bit mask,表明了須要監控的IO操做。 *SelectionKey register(Selector sel, int ops, Object att) *這個函數和上一個的意義同樣,多出來的att參數會做爲attachment被存放在返回的SelectionKey中,這在須要存放一些session state的時候很是有用。 *Selector定義了4個靜態常量來表示4種IO操做,這些常量能夠進行位操做組合成一個bit mask。 *int OP_ACCEPT : 有新的網絡鏈接能夠accept,ServerSocketChannel支持這一異步IO。 *int OP_CONNECT: 表明鏈接已經創建(或出錯),SocketChannel支持這一異步IO。 */ SelectionKey sk = serverSocket.register(selector,SelectionKey.OP_ACCEPT); sk.attach(new Acceptor()); // 綁定attachment } /* Alternatively, use explicit SPI provider: SelectorProvider p = SelectorProvider.provider(); selector = p.openSelector(); serverSocket = p.openServerSocketChannel(); */ public void run() { // normally in a new Thread try { while (!Thread.interrupted()) { /** *在一個Selector中,有3個SelectionKey的集合: *1. key set表明了全部註冊在這個Selector上的channel,這個集合能夠經過keys()方法拿到。 *2. Selected-key set表明了全部經過select()方法監測到能夠進行IO操做的channel,這個集合能夠經過selectedKeys()拿到。 *3. Cancelled-key set表明了已經cancel了註冊關係的channel,在下一個select()操做中,這些channel對應的SelectionKey會從key set和cancelled-key set中移走。這個集合沒法直接訪問。 */ //監控全部註冊的channel,當其中有註冊的IO操做能夠進行時,該函數返回, //並將對應的SelectionKey加入selected-key set。 selector.select(); Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) dispatch((SelectionKey)(it.next()); selected.clear(); } } catch (IOException ex) { /* ... */ } } void dispatch(SelectionKey k) { Runnable r = (Runnable)(k.attachment()); if (r != null) r.run(); } class Acceptor implements Runnable { // inner public void run() { try { //SocketChannel accept() :接受一個鏈接,返回表明這個鏈接的SocketChannel對象。 SocketChannel c = serverSocket.accept(); if (c != null) new Handler(selector, c); }catch(IOException ex) { /* ... */ } } } } final class Handler implements Runnable { final SocketChannel socket; final SelectionKey sk; ByteBuffer input = ByteBuffer.allocate(MAXIN); ByteBuffer output = ByteBuffer.allocate(MAXOUT); static final int READING = 0, SENDING = 1; int state = READING; Handler(Selector sel, SocketChannel c) throws IOException { socket = c; c.configureBlocking(false); // Optionally try first read now /** * SocketChannel * 支持異步操做,對應於java.net.Socket這個類,提供了TCP協議IO接口, * 支持OP_CONNECT,OP_READ和OP_WRITE操做。 */ // 爲毛要拆成三句?而不是sk = socket.register(sel, SelectionKey.OP_READ, this) sk = socket.register(sel, 0); sk.attach(this); sk.interestOps(SelectionKey.OP_READ); // 使一個還未返回的select()操做馬上返回。 sel.wakeup(); } boolean inputIsComplete() { /* ... */ } boolean outputIsComplete() { /* ... */ } void process() { /* ... */ } public void run() { try { if (state == READING) read(); else if (state == SENDING) send(); } catch (IOException ex) { /* ... */ } } void read() throws IOException { socket.read(input); if (inputIsComplete()) { process(); state = SENDING; // Normally also do first write now sk.interestOps(SelectionKey.OP_WRITE); } } void send() throws IOException { socket.write(output); // void cancel() : cancel這個SelectionKey所對應的註冊關係。 if (outputIsComplete()) sk.cancel(); } } /** * 下面是變種 */ /** * =========變種============= * GoF State-Object pattern * 狀態模式,適用於"狀態切換"的情景 * 例子:http://www.jdon.com/designpatterns/designpattern_State.htm * */ class Handler { // ... public void run() { // initial state is reader socket.read(input); if (inputIsComplete()) { process(); sk.attach(new Sender()); sk.interest(SelectionKey.OP_WRITE); sk.selector().wakeup(); } } class Sender implements Runnable { public void run(){ // ... socket.write(output); if (outputIsComplete()) sk.cancel(); } } } /** * =========變種============= * Handler with Thread Pool * */ class Handler implements Runnable { // uses util.concurrent thread pool static PooledExecutor pool = new PooledExecutor(...); static final int PROCESSING = 3; // ... synchronized void read() { // ... socket.read(input); if (inputIsComplete()) { state = PROCESSING; pool.execute(new Processer()); } } synchronized void processAndHandOff() { process(); state = SENDING; // or rebind attachment sk.interest(SelectionKey.OP_WRITE); } class Processer implements Runnable { public void run() { processAndHandOff(); } } } /** * =========變種============= * Multiple Reactor Threads * */ //Use to match CPU and IO rates //Static or dynamic construction //" Each with own Selector, Thread, dispatch loop //Main acceptor distributes to other reactors Selector[] selectors; // also create threads int next = 0; class Acceptor { // ... public synchronized void run() { ... Socket connection = serverSocket.accept(); if (connection != null) new Handler(selectors[next], connection); if (++next == selectors.length) next = 0; } }