#Netty裏面的Boss和Worker【Server篇】 最近在總結Dubbo關於Netty通訊方面的實現,因而也就藉此機會深刻體會了一下Netty。通常啓動Netty的Server端時都會設置兩個ExecutorService對象,咱們都習慣用boss,worker兩個變量來引用這兩個對象,因而從我一開始接觸Netty就有了boss和worker的概念。這篇博客將對boss和worker進行介紹,但並非涉及Netty其餘部分介紹。java
在Netty的裏面有一個Boss,他開了一家公司(開啓一個服務端口)對外提供業務服務,它手下有一羣作事情的workers。Boss一直對外宣傳本身公司提供的業務,而且接受(accept)有須要的客戶(client),當一位客戶找到Boss說須要他公司提供的業務,Boss便會爲這位客戶安排一個worker,這個worker全程爲這位客戶服務(read/write)。若是公司業務繁忙,一個worker可能會爲多個客戶進行服務。這就是Netty裏面Boss和worker之間的關係。下面看看Netty是如何讓Boss和Worker進行協助的。bootstrap
<!--lang:java--> protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } }); // bind channel = bootstrap.bind(getBindAddress()); }
上面這段代碼是Dubbo用來開啓服務的,也是大部分使用Netty進行服務端開發經常使用的方式啓動服務端。首先是設置boss和worker的線程池,以可以讓它們在各自的線程池裏面異步執行。當調用bootstrap.bind(getBindAddress())
的時候最終受理綁定操做的是NioServerSocketPipelineSink
的eventSunk
方法,看類名和方法簽名就應該知道是處理IO事件的。方法eventSunk
實現以下:異步
<!--lang:java--> public void eventSunk( ChannelPipeline pipeline, ChannelEvent e) throws Exception { Channel channel = e.getChannel(); if (channel instanceof NioServerSocketChannel) { handleServerSocket(e); } else if (channel instanceof NioSocketChannel) { handleAcceptedSocket(e); } }
因爲這個時候Server還處於bind階段,因此channel確定不是NioSocketChannel
,因而就到了方法handleServerSocket
裏面,最後將會調用bind
方法來綁定某個端口啓動服務。下面是bind
方法實現:socket
<!--lang:java--> private void bind( NioServerSocketChannel channel, ChannelFuture future, SocketAddress localAddress) { boolean bound = false; boolean bossStarted = false; try { channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog()); bound = true; future.setSuccess(); fireChannelBound(channel, channel.getLocalAddress()); Executor bossExecutor = ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor; DeadLockProofWorker.start( bossExecutor, new ThreadRenamingRunnable( new Boss(channel), "New I/O server boss #" + id + " (" + channel + ')')); bossStarted = true; } catch (Throwable t) { future.setFailure(t); fireExceptionCaught(channel, t); } finally { if (!bossStarted && bound) { close(channel, future); } } }
能夠看到socket的綁定以及設置異步的future成功,已通知服務啓動成功,同時將綁定成功事件通知出去。接下來我看的重點來了,就是bossExecutor,能夠看到它是經過NioServerSocketChannelFactory
裏面去獲取的,NioServerSocketChannelFactory
裏面的boss就是以前咱們設置進去的,能夠肯定咱們以前設置boss的異步線程池是在這裏被使用了。緊接下來的是啓動咱們的異步線程池,到這裏進入了Boss該作的事情,Boss實際上是實現了Runnable接口,從而能夠交給boss的線程池運行,接下來的關注點就是Boss的run
方法,這裏纔是Boss作事情的地方。再此以前先看看Boss初始化作了什麼事情:oop
<!--lang:java--> Boss(NioServerSocketChannel channel) throws IOException { this.channel = channel; selector = Selector.open(); boolean registered = false; try { channel.socket.register(selector, SelectionKey.OP_ACCEPT); registered = true; } finally { if (!registered) { closeSelector(); } } channel.selector = selector; }
Boss初始化過程當中其實就是將serversocket註冊到一個selector裏面,從而能夠實現NIO的異步IO處理。this
<!--lang:java--> public void run() { final Thread currentThread = Thread.currentThread(); channel.shutdownLock.lock(); try { for (;;) { try { if (selector.select(1000) > 0) { selector.selectedKeys().clear(); } SocketChannel acceptedSocket = channel.socket.accept(); if (acceptedSocket != null) { registerAcceptedChannel(acceptedSocket, currentThread); } } catch (SocketTimeoutException e) { // Thrown every second to get ClosedChannelException // raised. } catch (CancelledKeyException e) { // Raised by accept() when the server socket was closed. } catch (ClosedSelectorException e) { // Raised by accept() when the server socket was closed. } catch (ClosedChannelException e) { // Closed as requested. break; } catch (Throwable e) { logger.warn( "Failed to accept a connection.", e); try { Thread.sleep(1000); } catch (InterruptedException e1) { // Ignore } } } } finally { channel.shutdownLock.unlock(); closeSelector(); } }
在run
方法裏面是一個死循環,裏面在不間斷的等待客戶端的鏈接,若是有客戶端的鏈接,那麼將會調用方法registerAcceptedChannel
進行後續的處理。線程
<!--lang:java--> private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) { try { ChannelPipeline pipeline = channel.getConfig().getPipelineFactory().getPipeline(); NioWorker worker = nextWorker(); worker.register(new NioAcceptedSocketChannel( channel.getFactory(), pipeline, channel, NioServerSocketPipelineSink.this, acceptedSocket, worker, currentThread), null); } catch (Exception e) { logger.warn( "Failed to initialize an accepted socket.", e); try { acceptedSocket.close(); } catch (IOException e2) { logger.warn( "Failed to close a partially accepted socket.", e2); } } }
方法registerAcceptedChannel
就是將客戶端的channle分配給一個worker,而這個worker是經過方法nextWorker
獲取 <!--lang:java--> NioWorker nextWorker() { return workers[Math.abs( workerIndex.getAndIncrement() % workers.length)]; }rest
能夠看到方法nextWorker
是一個讓worker裏面的客戶端channel保持平衡的做用,可能你會疑問這個workers是哪裏來的,實際上是在上面初始化NioServerSocketChannelFactory
的時候,NioServerSocketChannelFactory
再去初始化NioServerSocketPipelineSink
時候構造出來的,默認狀況下workers的數量是咱們初始化NioServerSocketChannelFactory
設置進去的。能夠看到是調用worker的register
方法將客戶端的channel註冊到worker裏面的。netty
<!--lang:java--> void register(NioSocketChannel channel, ChannelFuture future) { boolean server = !(channel instanceof NioClientSocketChannel); Runnable registerTask = new RegisterTask(channel, future, server); Selector selector; synchronized (startStopLock) { if (!started) { ..... this.selector = selector = Selector.open(); ..... DeadLockProofWorker.start( executor, new ThreadRenamingRunnable(this, threadName)); success = true; ..... } else { selector = this.selector; } assert selector != null && selector.isOpen(); started = true; boolean offered = registerTaskQueue.offer(registerTask); assert offered; } if (wakenUp.compareAndSet(false, true)) { selector.wakeup(); } }
上面對worker有一個started狀態的檢測,若是沒啓動,則啓動worker,這個額通常都是將第一個客戶端的channel註冊到worker裏面才進行的。因爲worker也是實現了Rannable接口,因此啓動的主要工做就是讓worker在某個線程裏面跑起來,而且爲這個worker分配一個selector,用來進行監控IO事件。下面即是這個過程實現:code
<!--lang:java--> DeadLockProofWorker.start( executor, new ThreadRenamingRunnable(this, threadName)); success = true;
其中的executor
即是咱們一開始設置的workerExecutor。 worker啓動成功以後,接下來要作的即是讓worker管理器客戶端的channel
<!--lang:java--> Runnable registerTask = new RegisterTask(channel, future, server); ....... boolean offered = registerTaskQueue.offer(registerTask); assert offered;
worker是將客戶端包裝成一個RegisterTask,而後放入隊列,可見RegisterTask也實現了Runnable接口。那放入隊列之後誰去取這個隊列裏面的數據呢?固然,確定是worker去取。上面介紹啓動worker的時候是讓worker在某個線程裏面跑起來,而且worker是實現了Rannable方法,因而運行worker的線程確定是調用worker的run方法。
<!--lang:java--> public void run() { thread = Thread.currentThread(); boolean shutdown = false; Selector selector = this.selector; for (;;) { ..... try { SelectorUtil.select(selector); ..... cancelledKeys = 0; processRegisterTaskQueue(); processWriteTaskQueue(); processSelectedKeys(selector.selectedKeys()); ..... } catch (Throwable t) { try { Thread.sleep(1000); } catch (InterruptedException e) { } } } }
能夠看到run
方法裏面也是一個死循環,在不斷的輪詢調用selector的select IO的事件。接下來會調用三個方法processRegisterTaskQueue
,processWriteTaskQueue
和processSelectedKeys
。經過方法簽名就應該知道這個三個方法具體是作什麼事情的,第一個是處理上面registerTaskQueue
的,而且queue裏面對象的run
方法,而第二個processWriteTaskQueue
是處理寫任務的,而processSelectedKeys
是處理selector匹配的IO事件。咱們先看看registerTaskQueue
是作了什麼?
<!--lang:java--> private void processRegisterTaskQueue() throws IOException { for (;;) { final Runnable task = registerTaskQueue.poll(); if (task == null) { break; } task.run(); cleanUpCancelledKeys(); } }
上面介紹過registerTaskQueue
裏面的元素是RegisterTask
。因此須要去看看RegisterTask
的run方法實現,其中RegisterTask
是NioWorker
裏面的內部類,因此RegisterTask
是能夠訪問NioWorker
的元素信息。
<!--lang:java--> public void run() { SocketAddress localAddress = channel.getLocalAddress(); SocketAddress remoteAddress = channel.getRemoteAddress(); if (localAddress == null || remoteAddress == null) { if (future != null) { future.setFailure(new ClosedChannelException()); } close(channel, succeededFuture(channel)); return; } try { if (server) { channel.socket.configureBlocking(false); } synchronized (channel.interestOpsLock) { channel.socket.register( selector, channel.getRawInterestOps(), channel); } if (future != null) { channel.setConnected(); future.setSuccess(); } } catch (IOException e) { if (future != null) { future.setFailure(e); } close(channel, succeededFuture(channel)); .... } if (!server) { if (!((NioClientSocketChannel) channel).boundManually) { fireChannelBound(channel, localAddress); } fireChannelConnected(channel, remoteAddress); } }
能夠看到這裏面主要作的事情是將Boss分配給worker的客戶端channel和worker的selector關聯上,從而worker能夠處理該客戶端channel的IO事件。
到這裏就完成了由Boss接收到一個客戶端鏈接,到分配給某個worker,以及worker是怎麼去和客戶端的channel關聯的,其中因爲worker有可能爲多個客戶端channel服務,因此worker並不會直接和某個channel產生引用,而是將客戶端的channel註冊在該worker的selector上面,worker的run方法裏面經過不斷對selector的select輪詢,以達到對channel進行處理。接下來看看worker怎麼處理selector的io事件的
<!--java:lang--> private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException { for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) { SelectionKey k = i.next(); i.remove(); try { int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) { if (!read(k)) { // Connection already closed - no need to handle write. continue; } } if ((readyOps & SelectionKey.OP_WRITE) != 0) { writeFromSelectorLoop(k); } } catch (CancelledKeyException e) { close(k); } if (cleanUpCancelledKeys()) { break; // break the loop to avoid ConcurrentModificationException } } }
上面的方法完成的是處理selector產生的io事件,其中若是當前IO時間是讀,那麼將SelectionKey中的channel流進行讀出,而且向上交給Netty的Handler。若是是當前某個channel的寫知足條件,則觸發writeFromSelectorLoop
查看是否有待寫出的內容。
對於寫數據Netty在worker提供了三種入口
<!--lang:java--> void writeFromUserCode(final NioSocketChannel channel) { if (!channel.isConnected()) { cleanUpWriteBuffer(channel); return; } if (scheduleWriteIfNecessary(channel)) { return; } if (channel.writeSuspended) { return; } if (channel.inWriteNowLoop) { return; } write0(channel); } void writeFromTaskLoop(final NioSocketChannel ch) { if (!ch.writeSuspended) { write0(ch); } } void writeFromSelectorLoop(final SelectionKey k) { NioSocketChannel ch = (NioSocketChannel) k.attachment(); ch.writeSuspended = false; write0(ch); }
其中writeFromUserCode
是提供外部直接寫出的,writeFromTaskLoop
是在worker的run方法調用processWriteTaskQueue
時候會觸發。