幾種NIO程序寫法(一)kafka的SocketServer

總結一點常見的NIO程序的寫法,雖然沒有一個固定的格式,也沒有特別大的差異,可是多總結總結各位大師的寫法,多見點兒組合仍是對自身代碼的質量有很大提升的。這一篇我想經過對kafka network包下的源碼進行分析,而後抽取第一種比較常見的寫法。以前已經寫過一篇關於kafka通訊源碼的解讀,可參見http://my.oschina.net/ielts0909/blog/102336,最近再改寫一些源碼,因此仍是有必要把更完整的認識更新上來。 java

固然這不是最多見的寫法,最多見,也是大多數人都能接受的代碼能夠參考http://my.oschina.net/ielts0909/blog/89849文中的代碼,固然在read或者write的時,更多的會採用多線程來處理。 react

各類寫法的本質都是同樣的,可能咱們要更注重比較不一樣細節的寫法,kafkaSocketServer的代碼主要在kafka broker端使用,是最基礎的服務端通訊類。在kafka代碼中,與常見寫法區別比較大的就是將選擇器鍵的接收部分(acceptor)單獨拿出來寫了。再經過接收器(acceptor)將讀寫的任務交付給更多個處理器(processor)。在處理器中採用隊列的形式按先進先出的順序對讀寫進行操做。 多線程


我特地將圖畫的跟reactor模式的圖差很少的形式,其實二者的本質也沒什麼差異,主要看怎麼去實現acceptor與多線程處理讀寫操做。 less


我用java改寫了部分kafka的代碼,這樣便於閱讀,咱們看AbstractServerThread中的一些細節,這個類主要定義了startupshutdown相關的一系列方法,這些方法經過閉鎖來同步多線程執行的順序。這個類直接向子類提供了selector對象。 socket

package org.gfg.inforq.network;

import java.io.IOException;
import java.nio.channels.Selector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A base class for server thread with several variables and methods
 * 
 * @author Chen.Hui
 * @since 0.1
 */
public abstract class AbstractServerThread implements Runnable {

	protected Selector selector;
	private static final Logger LOG = LoggerFactory
			.getLogger(AbstractServerThread.class);
	private CountDownLatch startupLatch = new CountDownLatch(1);
	private CountDownLatch shutdownLatch = new CountDownLatch(1);
	private AtomicBoolean alive = new AtomicBoolean(false);

	protected AbstractServerThread() throws IOException {
		this.selector = Selector.open();
		LOG.info("selector is opening");
	}

	/**
	 * shutdown the running therad
	 * @throws InterruptedException
	 * 
	 */
	protected void shutdown() throws InterruptedException {
		alive.set(false);
		selector.wakeup();
		shutdownLatch.await();
	}

	/**
	 * Causes the current thread to wait until the latch has counted down to
	 * zero, unless the thread is interrupted.
	 * @throws InterruptedException
	 */
	protected void awaitStartup() throws InterruptedException {
		startupLatch.await();
	}

	/**
	 *  releasing all waiting threads if the count reaches zero.
	 */
	protected void startupComplete() {
		alive.set(true);
		startupLatch.countDown();
	}

	/**
	 * 
	 */
	protected void shutdownComplete() {
		shutdownLatch.countDown();
	}
	
	/**
	 * 
	 * @return true only this thread is startup complete
	 */
	protected boolean isRunning(){
		return alive.get();
	}
	
}

Acceptor的做用也僅僅就是accept。咱們能夠注意到Acceptor引用了一系列的Processor,而後爲每一個processor與一個key綁定。並將相應的通道傳遞給processor。因此總結起來acceptor作了兩件事,綁定keyprocessor,並將通道傳遞給相應的processor 學習

package org.gfg.inforq.network;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 
 * @author Chen.Hui
 * 
 */
public class Acceptor extends AbstractServerThread {

	protected int port;
	private Processor[] processors;
	protected int sendBufferSize;
	protected int receiveBufferSize;
	
	private static final Logger LOG=LoggerFactory.getLogger(Acceptor.class);
	
	
	public Acceptor(int port, Processor[] processors, int sendBufferSize,
			int receiveBufferSize) throws IOException {
		//super();
		this.port=port;
		this.processors=processors;
		this.sendBufferSize=sendBufferSize;
		this.receiveBufferSize=receiveBufferSize;
	}

	public void run() {
		try {
			ServerSocketChannel serverChannel = ServerSocketChannel.open();
			serverChannel.configureBlocking(false);
			serverChannel.socket().bind(new InetSocketAddress(port));
			serverChannel.register(selector, SelectionKey.OP_ACCEPT);
			LOG.info("Awaiting connections on port "+port);
			startupComplete();
			
			int currentProcessor=0;
			while(isRunning()){
				int ready=selector.select(500);
				if(ready>0){
				 Set<SelectionKey> keys=selector.selectedKeys();
				 Iterator<SelectionKey> iter=keys.iterator();
				 while(iter.hasNext()&&isRunning()){
					 SelectionKey key=null;
					 try{
						 key=iter.next();
						 iter.remove();
						 if(key.isAcceptable()){
							 accept(key,processors[currentProcessor]);
						 }else{
							 throw new IllegalStateException("Unrecognized key state for acceptor thread");
						 }
						 currentProcessor=(currentProcessor+1)%processors.length;
					 }catch (Exception e) {
						LOG.error(" ",e);
					}
				 }
				}
			}
			serverChannel.close();
			selector.close();
			shutdownComplete();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	private void accept(SelectionKey key, Processor processor) throws IOException {
		ServerSocketChannel ssc=(ServerSocketChannel) key.channel();
		ssc.socket().setReceiveBufferSize(receiveBufferSize);
		
		SocketChannel socketChannel=ssc.accept();
		socketChannel.configureBlocking(false);
		socketChannel.socket().setTcpNoDelay(true);
		socketChannel.socket().setSendBufferSize(sendBufferSize);
		
		if(LOG.isDebugEnabled()){
			LOG.debug("sendBufferSize:["+socketChannel.socket().getSendBufferSize()+"] receiveBufferSize:["+socketChannel.socket().getReceiveBufferSize()+"]" );
		}
		processor.accept(socketChannel);
	}
}

Acceptor裏也沒作什麼策略直接將任務按照currentProcessor=(currentProcessor+1)%processors.length;的形式分配,若是作的更好的話,能夠按照任務的權重、執行時間等從新分配。另外要注意這裏調用的一些AbstractServerThread裏的方法。 ui

Processor類中主要作的就是讀寫操做,這部分沒有什麼特別突出的不一樣,在類裏引用了隊列來存儲由acceptor分發過來的通道,並按順序處理。 this

package org.gfg.inforq.network;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Processor extends AbstractServerThread {

	private final int maxRequestSize;
	
	public Processor(int maxRequestSize) throws IOException {
		super();
		this.maxRequestSize=maxRequestSize;
	}

	private ConcurrentLinkedQueue<SocketChannel> newConnections = new ConcurrentLinkedQueue<SocketChannel>();
	private static final Logger LOG = LoggerFactory.getLogger(Processor.class);

	public void run() {
		/** make sure the selector is open completely */
		startupComplete();

		while (isRunning()) {
			try {
				// setup any new connections that have been queued up
				configureNewConnections();
				int ready = selector.select();
				if (ready > 0) {
					Set<SelectionKey> keys = selector.selectedKeys();
					Iterator<SelectionKey> iter = keys.iterator();
					while (iter.hasNext() && isRunning()) {
						SelectionKey key = null;
						try {
							key = iter.next();
							iter.remove();

							if (key.isReadable()) {
								//
								read(key);
							} else if (key.isWritable()) {
								//
								write(key);
							} else if (key.isValid()) {
								//close
								close(key);
							} else {
								throw new IllegalStateException("Unrecognized key state for processor thread");
							}
						} catch (Exception e) {
							if (e instanceof IOException) {

							}
						}
					}
				}

			} catch (Exception e) {
				// TODO: handle exception
			}
		}
	}

	private void read(SelectionKey key) {
		SocketChannel socketChannel=channelFor(key);
		Receive request=(Receive) key.attachment();
		
		if(key.attachment()==null){
			request=new BoundedByteBufferReceive(maxRequestSize);
			key.attach(request);
		}
		
		int read=request.readFrom(socketChannel);
		
		if(LOG.isTraceEnabled()){
			LOG.trace(read+" bytes read from "+socketChannel.socket().getRemoteSocketAddress());
		}
		
		if(read<0){
			close(key);
			return;
		}else if(request.complete){
			handle(key,request);
		}
		
	}

	private void handle(SelectionKey key, Receive request) {
		// TODO Auto-generated method stub
		
	}

	private void write(SelectionKey key) {	
		Send response= (Send) key.attachment();
		SocketChannel socketChannel=channelFor(key);
		int written =response.writeTo(socketChannel);
		
		if(LOG.isTraceEnabled()){
			LOG.trace(written+" bytes written to "+socketChannel.socket().getRemoteSocketAddress());
		}
		if(response.complete){
			key.attach(null);
			key.interestOps(SelectionKey.OP_READ);
		}else{
			key.interestOps(SelectionKey.OP_WRITE);
			selector.wakeup();
		}
	}
	
	private SocketChannel channelFor(SelectionKey key){
		return (SocketChannel) key.channel(); 
	}

	/**
	 * close the channel
	 * @param key
	 */
	private void close(SelectionKey key) {
		SocketChannel channel=(SocketChannel) key.channel();
		if(LOG.isDebugEnabled()){
			LOG.debug("Closing the connection from"+channel.socket().getRemoteSocketAddress());
		}
		if(channel.isOpen()&&channel.socket().isConnected()){
			try {
				//care the sequence of closing
				channel.socket().close();
				channel.close();
				key.attach(null);
				key.cancel();
			} catch (IOException e) {
				LOG.info(e.getMessage(),e);
			}
		}
	}

	private void configureNewConnections() {
		while (newConnections.size() > 0) {
			SocketChannel channel = newConnections.poll();
			if (LOG.isDebugEnabled()) {
				LOG.debug("linstening to a new connection from "
						+ channel.socket().getRemoteSocketAddress());
			}
			try {
				channel.register(selector, SelectionKey.OP_READ);
			} catch (ClosedChannelException e) {
				LOG.info("the channel has been closed..");
				continue;
			}
		}
	}

	public void accept(SocketChannel socketChannel) {
		newConnections.add(socketChannel);
		selector.wakeup();
	}
}

我也沒法說出這種將接收器單獨拿出來寫的方式到底好處在哪裏,可是這樣的寫法結構清晰,各個組件分工明確,仍是很值得學習的。總結的目的不就是爲了學習各類優秀的東西麼。 atom

相關文章
相關標籤/搜索