Boss and Worker in Netty (Server Side)

I have recently been summarizing how Dubbo implements its communication layer on top of Netty, which gave me a good excuse to dig deeper into Netty itself. When starting a Netty server you normally set up two ExecutorService objects, conventionally referred to by the variables boss and worker, so the notions of boss and worker have been with me since I first touched Netty. This post introduces boss and worker only; it does not cover the other parts of Netty.

Inside Netty there is a Boss who has opened a company (opened a server port) that offers a service to the outside world, and he has a group of workers who do the actual work. The Boss keeps advertising the company's business and accepts clients: when a client comes to the Boss asking for the service, the Boss assigns a worker to that client, and that worker serves the client for the whole session (read/write). When business is busy, one worker may serve several clients at once. That is the relationship between Boss and worker in Netty. Now let's see how Netty makes Boss and Worker cooperate.

<!--lang:java-->
protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    bootstrap = new ServerBootstrap(channelFactory);
    
    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // bind
    channel = bootstrap.bind(getBindAddress());
}

The code above is what Dubbo uses to open its server, and it is also the way most Netty-based server-side code starts a service. First the boss and worker thread pools are created so that each side can run asynchronously in its own pool. When bootstrap.bind(getBindAddress()) is called, the component that ultimately handles the bind operation is the eventSunk method of NioServerSocketPipelineSink; the class name and method signature alone tell you that it processes IO events. eventSunk is implemented as follows:
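A side note on NamedThreadFactory: it is a Dubbo utility class, not part of Netty, and its only job in doOpen is to give the boss and worker threads recognizable names and make them daemon threads. The exact naming scheme below is my guess at a minimal sketch, not Dubbo's actual implementation:

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// A minimal sketch of a Dubbo-style NamedThreadFactory: hands out daemon
// threads with a recognizable name prefix so that boss/worker threads can
// be told apart in a thread dump. The "-thread-N" suffix is an assumption.
class NamedThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNum = new AtomicInteger(1);
    private final String prefix;
    private final boolean daemon;

    NamedThreadFactory(String prefix, boolean daemon) {
        this.prefix = prefix;
        this.daemon = daemon;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, prefix + "-thread-" + threadNum.getAndIncrement());
        t.setDaemon(daemon);
        return t;
    }
}
```

With this, `Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true))` produces threads named `NettyServerBoss-thread-1`, `NettyServerBoss-thread-2`, and so on.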

<!--lang:java-->
public void eventSunk(
        ChannelPipeline pipeline, ChannelEvent e) throws Exception {
    Channel channel = e.getChannel();
    if (channel instanceof NioServerSocketChannel) {
        handleServerSocket(e);
    } else if (channel instanceof NioSocketChannel) {
        handleAcceptedSocket(e);
    }
}

Since the server is still in its bind phase at this point, the channel cannot be a NioSocketChannel, so we end up in handleServerSocket, which eventually calls bind to bind a port and start the service. Here is the bind implementation:

<!--lang:java-->
private void bind(
        NioServerSocketChannel channel, ChannelFuture future,
        SocketAddress localAddress) {

    boolean bound = false;
    boolean bossStarted = false;
    try {
        channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
        bound = true;

        future.setSuccess();
        fireChannelBound(channel, channel.getLocalAddress());

        Executor bossExecutor =
            ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
        DeadLockProofWorker.start(
                bossExecutor,
                new ThreadRenamingRunnable(
                        new Boss(channel),
                        "New I/O server boss #" + id + " (" + channel + ')'));
        bossStarted = true;
    } catch (Throwable t) {
        future.setFailure(t);
        fireExceptionCaught(channel, t);
    } finally {
        if (!bossStarted && bound) {
            close(channel, future);
        }
    }
}

You can see the socket being bound and the asynchronous future being marked successful, which signals that the service started, and the bound event being fired to interested parties. Now comes the part I care about: bossExecutor. It is fetched from the NioServerSocketChannelFactory, and the boss inside NioServerSocketChannelFactory is exactly the one we set earlier, so this is where the boss thread pool we configured gets used. What happens next is starting a task on that pool: Boss actually implements Runnable, so it can be handed to the boss executor, and the thing to look at next is Boss's run method, which is where the Boss does its work. Before that, let's see what the Boss constructor does:

<!--lang:java-->
Boss(NioServerSocketChannel channel) throws IOException {
        this.channel = channel;

        selector = Selector.open();

        boolean registered = false;
        try {
            channel.socket.register(selector, SelectionKey.OP_ACCEPT);
            registered = true;
        } finally {
            if (!registered) {
                closeSelector();
            }
        }

        channel.selector = selector;
    }

The Boss initialization simply registers the server socket with a selector, which is what enables NIO-style asynchronous IO handling.
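Stripped of Netty's wrapper types, what the Boss constructor does is plain java.nio: open a Selector and register the server channel for OP_ACCEPT. A self-contained sketch (class and method names are mine; port 0 means any free port):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

class BossInitSketch {
    // The equivalent of channel.socket.register(selector, SelectionKey.OP_ACCEPT)
    // inside the Boss constructor: the server channel is watched only for
    // incoming connections, never for read/write.
    static SelectionKey registerForAccept(ServerSocketChannel server, Selector selector)
            throws IOException {
        server.configureBlocking(false); // selector registration requires non-blocking mode
        return server.register(selector, SelectionKey.OP_ACCEPT);
    }

    public static void main(String[] args) throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(0)); // port 0: pick any free port
            SelectionKey key = registerForAccept(server, selector);
            System.out.println(key.isValid() && key.interestOps() == SelectionKey.OP_ACCEPT);
        }
    }
}
```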

<!--lang:java-->
public void run() {
        final Thread currentThread = Thread.currentThread();

        channel.shutdownLock.lock();
        try {
            for (;;) {
                try {
                    if (selector.select(1000) > 0) {
                        selector.selectedKeys().clear();
                    }

                    SocketChannel acceptedSocket = channel.socket.accept();
                    if (acceptedSocket != null) {
                        registerAcceptedChannel(acceptedSocket, currentThread);
                    }
                } catch (SocketTimeoutException e) {
                    // Thrown every second to get ClosedChannelException
                    // raised.
                } catch (CancelledKeyException e) {
                    // Raised by accept() when the server socket was closed.
                } catch (ClosedSelectorException e) {
                    // Raised by accept() when the server socket was closed.
                } catch (ClosedChannelException e) {
                    // Closed as requested.
                    break;
                } catch (Throwable e) {
                    logger.warn(
                            "Failed to accept a connection.", e);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        // Ignore
                    }
                }
            }
        } finally {
            channel.shutdownLock.unlock();
            closeSelector();
        }
    }

The run method is an infinite loop that keeps waiting for client connections; when a client connects, registerAcceptedChannel is called for the follow-up handling.

<!--lang:java-->
 private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
        try {
            ChannelPipeline pipeline =
                channel.getConfig().getPipelineFactory().getPipeline();
            NioWorker worker = nextWorker();
            worker.register(new NioAcceptedSocketChannel(
                    channel.getFactory(), pipeline, channel,
                    NioServerSocketPipelineSink.this, acceptedSocket,
                    worker, currentThread), null);
        } catch (Exception e) {
            logger.warn(
                    "Failed to initialize an accepted socket.", e);
            try {
                acceptedSocket.close();
            } catch (IOException e2) {
                logger.warn(
                        "Failed to close a partially accepted socket.",
                        e2);
            }
        }
    }

registerAcceptedChannel assigns the client channel to a worker, and that worker is obtained through nextWorker:

<!--lang:java-->
NioWorker nextWorker() {
    return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
}

You can see that nextWorker balances the client channels across the workers. You might wonder where this workers array comes from: it is built when NioServerSocketChannelFactory is initialized, which in turn initializes NioServerSocketPipelineSink; by default the number of workers is whatever we passed into NioServerSocketChannelFactory. As shown above, the client channel is handed over by calling the chosen worker's register method.
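The balancing itself is nothing more than a round-robin over the worker array; the Math.abs call guards against the AtomicInteger wrapping to a negative value after 2^31 increments. A minimal reproduction of the nextWorker logic (WorkerPool and the String stand-ins are mine, not Netty's):

```java
import java.util.concurrent.atomic.AtomicInteger;

class WorkerPool {
    private final String[] workers; // stand-ins for the NioWorker instances
    private final AtomicInteger workerIndex = new AtomicInteger();

    WorkerPool(int size) {
        workers = new String[size];
        for (int i = 0; i < size; i++) workers[i] = "worker-" + i;
    }

    // Same shape as NioServerSocketPipelineSink.nextWorker(): round-robin,
    // with Math.abs() keeping the index non-negative after counter overflow.
    String nextWorker() {
        return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
    }

    public static void main(String[] args) {
        WorkerPool pool = new WorkerPool(3);
        for (int i = 0; i < 5; i++) {
            System.out.println(pool.nextWorker()); // cycles worker-0, worker-1, worker-2, ...
        }
    }
}
```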

<!--lang:java-->
void register(NioSocketChannel channel, ChannelFuture future) {

    boolean server = !(channel instanceof NioClientSocketChannel);
    Runnable registerTask = new RegisterTask(channel, future, server);
    Selector selector;

    synchronized (startStopLock) {
        if (!started) {
            .....
                this.selector = selector = Selector.open();
           .....
                DeadLockProofWorker.start(
                        executor, new ThreadRenamingRunnable(this, threadName));
                success = true;
          .....
        } else {
            selector = this.selector;
        }

        assert selector != null && selector.isOpen();

        started = true;
        boolean offered = registerTaskQueue.offer(registerTask);
        assert offered;
    }

    if (wakenUp.compareAndSet(false, true)) {
        selector.wakeup();
    }
}

Above there is a check on the worker's started state: if the worker has not started yet it is started now, which normally happens when the first client channel is registered with it. Since the worker also implements Runnable, starting it mainly means getting it to run on some thread and giving it a selector of its own for monitoring IO events. The snippet below is that step:
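The lazy-start idiom can be isolated into a small sketch. Note the differences from the real NioWorker, which this sketch deliberately simplifies: the real run() loops on selector.select() forever, while this one just drains the task queue once and exits, so here the task is enqueued before the executor is kicked off to keep the behavior deterministic:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;

// A stripped-down sketch of NioWorker's "start lazily on first register"
// idiom: the first register() call hands the worker itself to the executor;
// subsequent calls only enqueue tasks for the already-running loop.
class LazyWorker implements Runnable {
    private final Object startStopLock = new Object();
    private final Queue<Runnable> registerTaskQueue = new ConcurrentLinkedQueue<>();
    private final Executor executor;
    private boolean started;

    LazyWorker(Executor executor) {
        this.executor = executor;
    }

    void register(Runnable task) {
        synchronized (startStopLock) {
            registerTaskQueue.offer(task);
            if (!started) {
                started = true;
                executor.execute(this); // first registration starts the worker thread
            }
        }
    }

    @Override
    public void run() {
        // The real worker would loop on selector.select() here; this sketch
        // only drains the pending register tasks and returns.
        Runnable task;
        while ((task = registerTaskQueue.poll()) != null) {
            task.run();
        }
    }
}
```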

<!--lang:java-->
  DeadLockProofWorker.start(
                        executor, new ThreadRenamingRunnable(this, threadName));
                success = true;

The executor here is the workerExecutor we configured at the beginning. Once the worker has started, the next task is to let the worker manage the client channel.

<!--lang:java-->
 Runnable registerTask = new RegisterTask(channel, future, server);
	.......
 boolean offered = registerTaskQueue.offer(registerTask);
        assert offered;

The worker wraps the client channel into a RegisterTask and puts it on a queue, so RegisterTask evidently implements Runnable as well. Who takes tasks off this queue after they are enqueued? The worker itself, of course. As described above, starting the worker means getting it to run on some thread, and since the worker implements Runnable, the thread running it must be executing the worker's run method.

<!--lang:java-->
 public void run() {
    thread = Thread.currentThread();
    boolean shutdown = false;
    Selector selector = this.selector;
    for (;;) {
         .....
        try {
            SelectorUtil.select(selector);
			.....

            cancelledKeys = 0;
            processRegisterTaskQueue();
            processWriteTaskQueue();
            processSelectedKeys(selector.selectedKeys());
             .....
        } catch (Throwable t) {
             
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
              
            }
        }
    }
}

You can see that this run method is also an infinite loop, continuously polling the selector's select for IO events. It then calls three methods: processRegisterTaskQueue, processWriteTaskQueue and processSelectedKeys. Their signatures tell you what each one does: the first drains the registerTaskQueue above and runs the queued objects' run methods, processWriteTaskQueue handles write tasks, and processSelectedKeys handles the IO events the selector has matched. Let's look at what processRegisterTaskQueue does first.

<!--lang:java-->
 private void processRegisterTaskQueue() throws IOException {
    for (;;) {
        final Runnable task = registerTaskQueue.poll();
        if (task == null) {
            break;
        }

        task.run();
        cleanUpCancelledKeys();
    }
}

As introduced above, the elements in registerTaskQueue are RegisterTasks, so we need to look at RegisterTask's run method. RegisterTask is an inner class of NioWorker, so it has access to NioWorker's state.

<!--lang:java-->
 public void run() {
        SocketAddress localAddress = channel.getLocalAddress();
        SocketAddress remoteAddress = channel.getRemoteAddress();
        if (localAddress == null || remoteAddress == null) {
            if (future != null) {
                future.setFailure(new ClosedChannelException());
            }
            close(channel, succeededFuture(channel));
            return;
        }

        try {
            if (server) {
                channel.socket.configureBlocking(false);
            }

            synchronized (channel.interestOpsLock) {
                channel.socket.register(
                        selector, channel.getRawInterestOps(), channel);
            }
            if (future != null) {
                channel.setConnected();
                future.setSuccess();
            }
        } catch (IOException e) {
            if (future != null) {
                future.setFailure(e);
            }
            close(channel, succeededFuture(channel));
           ....
        }

        if (!server) {
            if (!((NioClientSocketChannel) channel).boundManually) {
                fireChannelBound(channel, localAddress);
            }
            fireChannelConnected(channel, remoteAddress);
        }
    }

You can see that the main thing done here is associating the client channel the Boss assigned to the worker with the worker's selector, so that the worker can handle that channel's IO events.

At this point we have covered the whole path: the Boss accepts a client connection, assigns it to a worker, and the worker associates itself with the client channel. Because a worker may serve multiple client channels, it does not hold a direct reference to any particular channel; instead the client channels are registered on the worker's selector, and the worker's run method keeps polling that selector via select to process them. Next, let's see how the worker handles the selector's IO events.
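To make the accept-then-register-then-read flow concrete, here is a single-threaded sketch that collapses Boss and Worker into one select loop: the OP_ACCEPT branch plays the Boss role (accept and register), and the OP_READ branch plays the Worker role (read and echo back). All names here are mine; real Netty splits these roles across threads as described above:

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

class MiniReactor {
    // Serve until one client message has been echoed back, then return.
    static void serveOnce(ServerSocketChannel server) throws IOException {
        Selector selector = Selector.open();
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT); // "Boss" registration
        boolean echoed = false;
        while (!echoed) {
            selector.select(1000);
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey k = it.next();
                it.remove();
                if (k.isAcceptable()) {
                    // Boss role: accept and register the client for reads.
                    SocketChannel client = ((ServerSocketChannel) k.channel()).accept();
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ);
                } else if (k.isReadable()) {
                    // Worker role: read the bytes and echo them straight back.
                    SocketChannel client = (SocketChannel) k.channel();
                    ByteBuffer buf = ByteBuffer.allocate(64);
                    if (client.read(buf) > 0) {
                        buf.flip();
                        client.write(buf);
                        echoed = true;
                    }
                }
            }
        }
        selector.close();
    }

    public static void main(String[] args) throws Exception {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0));
        int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
        Thread client = new Thread(() -> {
            try (Socket s = new Socket("127.0.0.1", port)) {
                s.getOutputStream().write("ping".getBytes());
                byte[] reply = new byte[4];
                new DataInputStream(s.getInputStream()).readFully(reply);
                System.out.println(new String(reply));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        client.start();
        serveOnce(server);
        client.join();
        server.close();
    }
}
```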

<!--lang:java-->
private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException {
    for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
        SelectionKey k = i.next();
        i.remove();
        try {
            int readyOps = k.readyOps();
            if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
                if (!read(k)) {
                    // Connection already closed - no need to handle write.
                    continue;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                writeFromSelectorLoop(k);
            }
        } catch (CancelledKeyException e) {
            close(k);
        }

        if (cleanUpCancelledKeys()) {
            break; // break the loop to avoid ConcurrentModificationException
        }
    }
}

The method above handles the IO events produced by the selector: if the current IO event is a read, the channel behind the SelectionKey is read and the data is passed up to Netty's handlers; if some channel has become ready for writing, writeFromSelectorLoop is triggered to check whether there is pending data to write out.
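The readyOps value is a plain bitmask, so one key can be ready for read and write at the same time; the extra `readyOps == 0` case in the read test is, as far as I can tell, a Netty 3 workaround for selectors that occasionally wake up with no ready bits set. The two checks isolated into testable helpers (ReadyOps is my name, not Netty's):

```java
import java.nio.channels.SelectionKey;

class ReadyOps {
    // Mirrors the read test in processSelectedKeys. readyOps == 0 is also
    // treated as readable -- presumably to flush out spurious selector
    // wakeups rather than spin on them.
    static boolean wantsRead(int readyOps) {
        return (readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0;
    }

    // Mirrors the write test: a simple bit check against OP_WRITE.
    static boolean wantsWrite(int readyOps) {
        return (readyOps & SelectionKey.OP_WRITE) != 0;
    }
}
```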

For writing data, Netty's worker provides three entry points:

<!--lang:java-->
void writeFromUserCode(final NioSocketChannel channel) {
    if (!channel.isConnected()) {
        cleanUpWriteBuffer(channel);
        return;
    }

    if (scheduleWriteIfNecessary(channel)) {
        return;
    }

    if (channel.writeSuspended) {
        return;
    }

    if (channel.inWriteNowLoop) {
        return;
    }

    write0(channel);
}

void writeFromTaskLoop(final NioSocketChannel ch) {
    if (!ch.writeSuspended) {
        write0(ch);
    }
}

void writeFromSelectorLoop(final SelectionKey k) {
    NioSocketChannel ch = (NioSocketChannel) k.attachment();
    ch.writeSuspended = false;
    write0(ch);
}

Among them, writeFromUserCode is the entry point for user code writing out directly, writeFromTaskLoop is triggered when the worker's run method calls processWriteTaskQueue, and writeFromSelectorLoop, as we saw above, fires when the selector reports a channel writable.
