Scalable IO in Java 的簡單解讀

<!-- lang: java -->
	class Reactor implements Runnable {
	final Selector selector;
	//ServerSocketChannel
	//支持異步操做,對應於java.net.ServerSocket這個類,提供了TCP協議IO接口,支持OP_ACCEPT操做。
	final ServerSocketChannel serverSocket;
	
	Reactor(int port) throws IOException {
		selector = Selector.open();  //建立實例
		serverSocket = ServerSocketChannel.open();
		serverSocket.socket().bind(new InetSocketAddress(port));
		// 全部channel建立的時候都是blocking模式,
		// 只有non-blocking的SelectableChannel才能夠參與異步IO操做。
		serverSocket.configureBlocking(false); //設置non-blocking模式。
		/**
		*SelectionKey register(Selector sel, int ops)
		*將當前channel註冊到一個Selector上並返回對應的SelectionKey。
		*在這之後,經過調用Selector的select()函數就能夠監控這個channel。ops這個參數是一個bit mask,表明了須要監控的IO操做。
		*SelectionKey register(Selector sel, int ops, Object att)
		*這個函數和上一個的意義同樣,多出來的att參數會做爲attachment被存放在返回的SelectionKey中,這在須要存放一些session state的時候很是有用。
		*Selector定義了4個靜態常量來表示4種IO操做,這些常量能夠進行位操做組合成一個bit mask。
		*int OP_ACCEPT : 有新的網絡鏈接能夠accept,ServerSocketChannel支持這一異步IO。
		*int OP_CONNECT: 表明鏈接已經創建(或出錯),SocketChannel支持這一異步IO。
		*/
		SelectionKey sk = serverSocket.register(selector,SelectionKey.OP_ACCEPT);
		sk.attach(new Acceptor());	// 綁定attachment
	}
	/*
	Alternatively, use explicit SPI provider:
	SelectorProvider p = SelectorProvider.provider();
	selector = p.openSelector();
	serverSocket = p.openServerSocketChannel();
	*/


	public void run() { // normally in a new Thread
		try {
			while (!Thread.interrupted()) {
				/**
				*在一個Selector中,有3個SelectionKey的集合:
				*1. key set表明了全部註冊在這個Selector上的channel,這個集合能夠經過keys()方法拿到。
				*2. Selected-key set表明了全部經過select()方法監測到能夠進行IO操做的channel,這個集合能夠經過selectedKeys()拿到。
				*3. Cancelled-key set表明了已經cancel了註冊關係的channel,在下一個select()操做中,這些channel對應的SelectionKey會從key set和cancelled-key set中移走。這個集合沒法直接訪問。
				*/
				//監控全部註冊的channel,當其中有註冊的IO操做能夠進行時,該函數返回,
				//並將對應的SelectionKey加入selected-key set。
				selector.select();	
				Set selected = selector.selectedKeys();
				Iterator it = selected.iterator();
				while (it.hasNext())
					dispatch((SelectionKey)(it.next());
				selected.clear();
			}
		} catch (IOException ex) { /* ... */ }
	}
	
	void dispatch(SelectionKey k) {
		Runnable r = (Runnable)(k.attachment());
		if (r != null)
			r.run();
	}


	class Acceptor implements Runnable { // inner
		public void run() {
			try {
				//SocketChannel accept() :接受一個鏈接,返回表明這個鏈接的SocketChannel對象。
				SocketChannel c = serverSocket.accept();
				if (c != null)
					new Handler(selector, c);
			}catch(IOException ex) { /* ... */ }
		}
	}
}


final class Handler implements Runnable {
	final SocketChannel socket;
	final SelectionKey sk;
	ByteBuffer input = ByteBuffer.allocate(MAXIN);
	ByteBuffer output = ByteBuffer.allocate(MAXOUT);
	static final int READING = 0, SENDING = 1;
	int state = READING;
	
	Handler(Selector sel, SocketChannel c) throws IOException {
		socket = c; 
		c.configureBlocking(false);
		// Optionally try first read now
		/**
		* SocketChannel
		* 支持異步操做,對應於java.net.Socket這個類,提供了TCP協議IO接口,
		* 支持OP_CONNECT,OP_READ和OP_WRITE操做。
		*/
		// 爲毛要拆成三句?而不是sk = socket.register(sel, SelectionKey.OP_READ, this)
		sk = socket.register(sel, 0);
		sk.attach(this);
		sk.interestOps(SelectionKey.OP_READ);
		// 使一個還未返回的select()操做馬上返回。
		sel.wakeup();
	}
	
	boolean inputIsComplete() { /* ... */ }
	boolean outputIsComplete() { /* ... */ }
	void process() { /* ... */ }
	
	
	public void run() {
		try {
			if (state == READING) read();
			else if (state == SENDING) send();
		} catch (IOException ex) { /* ... */ }
	}
	
	void read() throws IOException {
		socket.read(input);
		if (inputIsComplete()) {
			process();
			state = SENDING;
			// Normally also do first write now
			sk.interestOps(SelectionKey.OP_WRITE);
		}
	}
	void send() throws IOException {
		socket.write(output);
		// void cancel() : cancel這個SelectionKey所對應的註冊關係。
		if (outputIsComplete()) sk.cancel();
	}
}


/**
* 下面是變種
*/

/**
 * =========變種=============
 * GoF State-Object pattern 
 * 狀態模式,適用於"狀態切換"的情景
 * 例子:http://www.jdon.com/designpatterns/designpattern_State.htm
 *
*/

class Handler { 
	// ...
	
	public void run() { // initial state is reader
		socket.read(input);
		if (inputIsComplete()) {
			process();
			sk.attach(new Sender());
			sk.interest(SelectionKey.OP_WRITE);
			sk.selector().wakeup();
		}
	}
	class Sender implements Runnable {
		public void run(){ // ...
			socket.write(output);
			if (outputIsComplete()) sk.cancel();
		}
	}
}


/**
 * =========變種=============
 * Handler with Thread Pool
 *
*/

class Handler implements Runnable {
	// uses util.concurrent thread pool
	static PooledExecutor pool = new PooledExecutor(...);
	static final int PROCESSING = 3;
	// ...
	synchronized void read() { // ...
		socket.read(input);
		if (inputIsComplete()) {
			state = PROCESSING;
			pool.execute(new Processer());
		}
	}
	synchronized void processAndHandOff() {
		process();
		state = SENDING; // or rebind attachment
		sk.interest(SelectionKey.OP_WRITE);
	}
	class Processer implements Runnable {
		public void run() { processAndHandOff(); }
	}
}


/**
 * =========變種=============
 * Multiple Reactor Threads
 *
*/

	//Use to match CPU and IO rates
	//Static or dynamic construction
	//" Each with own Selector, Thread, dispatch loop
	//Main acceptor distributes to other reactors

	Selector[] selectors; // also create threads
	int next = 0;
	class Acceptor { // ...
		public synchronized void run() { ...
			Socket connection = serverSocket.accept();
			if (connection != null)
			new Handler(selectors[next], connection);
			if (++next == selectors.length) next = 0;
		}
	}
相關文章
相關標籤/搜索