總結一點常見的NIO程序的寫法,雖然沒有一個固定的格式,也沒有特別大的差異,可是多總結總結各位大師的寫法,多見點兒組合仍是對自身代碼的質量有很大提升的。這一篇我想經過對kafka network包下的源碼進行分析,而後抽取第一種比較常見的寫法。以前已經寫過一篇關於kafka通訊源碼的解讀,可參見http://my.oschina.net/ielts0909/blog/102336,最近再改寫一些源碼,因此仍是有必要把更完整的認識更新上來。 java
固然這不是最多見的寫法,最多見,也是大多數人都能接受的代碼能夠參考http://my.oschina.net/ielts0909/blog/89849文中的代碼,固然在read或者write的時,更多的會採用多線程來處理。 react
各類寫法的本質都是同樣的,可能咱們要更注重比較不一樣細節的寫法,kafkaSocketServer的代碼主要在kafka broker端使用,是最基礎的服務端通訊類。在kafka代碼中,與常見寫法區別比較大的就是將選擇器鍵的接收部分(acceptor)單獨拿出來寫了。再經過接收器(acceptor)將讀寫的任務交付給更多個處理器(processor)。在處理器中採用隊列的形式按先進先出的順序對讀寫進行操做。 多線程
我特地將圖畫的跟reactor模式的圖差很少的形式,其實二者的本質也沒什麼差異,主要看怎麼去實現acceptor與多線程處理讀寫操做。 less
我用java改寫了部分kafka的代碼,這樣便於閱讀,咱們看AbstractServerThread中的一些細節,這個類主要定義了startup、shutdown相關的一系列方法,這些方法經過閉鎖來同步多線程執行的順序。這個類直接向子類提供了selector對象。 socket
package org.gfg.inforq.network; import java.io.IOException; import java.nio.channels.Selector; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * A base class for server thread with several variables and methods * * @author Chen.Hui * @since 0.1 */ public abstract class AbstractServerThread implements Runnable { protected Selector selector; private static final Logger LOG = LoggerFactory .getLogger(AbstractServerThread.class); private CountDownLatch startupLatch = new CountDownLatch(1); private CountDownLatch shutdownLatch = new CountDownLatch(1); private AtomicBoolean alive = new AtomicBoolean(false); protected AbstractServerThread() throws IOException { this.selector = Selector.open(); LOG.info("selector is opening"); } /** * shutdown the running therad * @throws InterruptedException * */ protected void shutdown() throws InterruptedException { alive.set(false); selector.wakeup(); shutdownLatch.await(); } /** * Causes the current thread to wait until the latch has counted down to * zero, unless the thread is interrupted. * @throws InterruptedException */ protected void awaitStartup() throws InterruptedException { startupLatch.await(); } /** * releasing all waiting threads if the count reaches zero. */ protected void startupComplete() { alive.set(true); startupLatch.countDown(); } /** * */ protected void shutdownComplete() { shutdownLatch.countDown(); } /** * * @return true only this thread is startup complete */ protected boolean isRunning(){ return alive.get(); } }
Acceptor的做用也僅僅就是accept。咱們能夠注意到Acceptor引用了一系列的Processor,而後爲每一個processor與一個key綁定。並將相應的通道傳遞給processor。因此總結起來acceptor作了兩件事,綁定key和processor,並將通道傳遞給相應的processor。 學習
package org.gfg.inforq.network; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * @author Chen.Hui * */ public class Acceptor extends AbstractServerThread { protected int port; private Processor[] processors; protected int sendBufferSize; protected int receiveBufferSize; private static final Logger LOG=LoggerFactory.getLogger(Acceptor.class); public Acceptor(int port, Processor[] processors, int sendBufferSize, int receiveBufferSize) throws IOException { //super(); this.port=port; this.processors=processors; this.sendBufferSize=sendBufferSize; this.receiveBufferSize=receiveBufferSize; } public void run() { try { ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); serverChannel.socket().bind(new InetSocketAddress(port)); serverChannel.register(selector, SelectionKey.OP_ACCEPT); LOG.info("Awaiting connections on port "+port); startupComplete(); int currentProcessor=0; while(isRunning()){ int ready=selector.select(500); if(ready>0){ Set<SelectionKey> keys=selector.selectedKeys(); Iterator<SelectionKey> iter=keys.iterator(); while(iter.hasNext()&&isRunning()){ SelectionKey key=null; try{ key=iter.next(); iter.remove(); if(key.isAcceptable()){ accept(key,processors[currentProcessor]); }else{ throw new IllegalStateException("Unrecognized key state for acceptor thread"); } currentProcessor=(currentProcessor+1)%processors.length; }catch (Exception e) { LOG.error(" ",e); } } } } serverChannel.close(); selector.close(); shutdownComplete(); } catch (IOException e) { e.printStackTrace(); } } private void accept(SelectionKey key, Processor processor) throws IOException { ServerSocketChannel ssc=(ServerSocketChannel) key.channel(); ssc.socket().setReceiveBufferSize(receiveBufferSize); SocketChannel socketChannel=ssc.accept(); socketChannel.configureBlocking(false); socketChannel.socket().setTcpNoDelay(true); socketChannel.socket().setSendBufferSize(sendBufferSize); if(LOG.isDebugEnabled()){ LOG.debug("sendBufferSize:["+socketChannel.socket().getSendBufferSize()+"] receiveBufferSize:["+socketChannel.socket().getReceiveBufferSize()+"]" ); } processor.accept(socketChannel); } }
Acceptor裏也沒作什麼策略直接將任務按照currentProcessor=(currentProcessor+1)%processors.length;的形式分配,若是作的更好的話,能夠按照任務的權重、執行時間等從新分配。另外要注意這裏調用的一些AbstractServerThread裏的方法。 ui
Processor類中主要作的就是讀寫操做,這部分沒有什麼特別突出的不一樣,在類裏引用了隊列來存儲由acceptor分發過來的通道,並按順序處理。 this
package org.gfg.inforq.network; import java.io.IOException; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Processor extends AbstractServerThread { private final int maxRequestSize; public Processor(int maxRequestSize) throws IOException { super(); this.maxRequestSize=maxRequestSize; } private ConcurrentLinkedQueue<SocketChannel> newConnections = new ConcurrentLinkedQueue<SocketChannel>(); private static final Logger LOG = LoggerFactory.getLogger(Processor.class); public void run() { /** make sure the selector is open completely */ startupComplete(); while (isRunning()) { try { // setup any new connections that have been queued up configureNewConnections(); int ready = selector.select(); if (ready > 0) { Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iter = keys.iterator(); while (iter.hasNext() && isRunning()) { SelectionKey key = null; try { key = iter.next(); iter.remove(); if (key.isReadable()) { // read(key); } else if (key.isWritable()) { // write(key); } else if (key.isValid()) { //close close(key); } else { throw new IllegalStateException("Unrecognized key state for processor thread"); } } catch (Exception e) { if (e instanceof IOException) { } } } } } catch (Exception e) { // TODO: handle exception } } } private void read(SelectionKey key) { SocketChannel socketChannel=channelFor(key); Receive request=(Receive) key.attachment(); if(key.attachment()==null){ request=new BoundedByteBufferReceive(maxRequestSize); key.attach(request); } int read=request.readFrom(socketChannel); if(LOG.isTraceEnabled()){ LOG.trace(read+" bytes read from "+socketChannel.socket().getRemoteSocketAddress()); } if(read<0){ close(key); return; }else if(request.complete){ handle(key,request); } } private void handle(SelectionKey key, Receive request) { // TODO Auto-generated method stub } private void write(SelectionKey key) { Send response= (Send) key.attachment(); SocketChannel socketChannel=channelFor(key); int written =response.writeTo(socketChannel); if(LOG.isTraceEnabled()){ LOG.trace(written+" bytes written to "+socketChannel.socket().getRemoteSocketAddress()); } if(response.complete){ key.attach(null); key.interestOps(SelectionKey.OP_READ); }else{ key.interestOps(SelectionKey.OP_WRITE); selector.wakeup(); } } private SocketChannel channelFor(SelectionKey key){ return (SocketChannel) key.channel(); } /** * close the channel * @param key */ private void close(SelectionKey key) { SocketChannel channel=(SocketChannel) key.channel(); if(LOG.isDebugEnabled()){ LOG.debug("Closing the connection from"+channel.socket().getRemoteSocketAddress()); } if(channel.isOpen()&&channel.socket().isConnected()){ try { //care the sequence of closing channel.socket().close(); channel.close(); key.attach(null); key.cancel(); } catch (IOException e) { LOG.info(e.getMessage(),e); } } } private void configureNewConnections() { while (newConnections.size() > 0) { SocketChannel channel = newConnections.poll(); if (LOG.isDebugEnabled()) { LOG.debug("linstening to a new connection from " + channel.socket().getRemoteSocketAddress()); } try { channel.register(selector, SelectionKey.OP_READ); } catch (ClosedChannelException e) { LOG.info("the channel has been closed.."); continue; } } } public void accept(SocketChannel socketChannel) { newConnections.add(socketChannel); selector.wakeup(); } }
我也沒法說出這種將接收器單獨拿出來寫的方式到底好處在哪裏,可是這樣的寫法結構清晰,各個組件分工明確,仍是很值得學習的。總結的目的不就是爲了學習各類優秀的東西麼。 atom