Jetty : Embedded Server 啓動流程 - 2

前言

本文接着 Jetty : Embedded Server 啓動流程 - 1往下講,上回說到 Server.start 調用 Connector.start 方法,開始接收和處理請求,Server 默認使用 Connector 的子類 SelectChannelConnector,因此咱們重點來看看 SelectChannelConnector 的具體實現segmentfault

類層次

AbstractLifeCycle
    AggregateLifeCycle
        AbstractConnector
            AbstractNIOConnector
                SelectChannelConnector

AbstractLifeCycle,AggregateLifeCycle 咱們上文提到過,這裏有兩個新面孔:socket

  • AbstractConnector:Connector 抽象實現類
  • AbstractNIOConnector

AbstractConnector

上文講過 Server.start 方法最終會調用 Connector.start 方法,AbstractConnector 使用《模板方法》模式在 doStart 方法中實現了 start 的基本流程ide

@Override
protected void doStart() throws Exception {
    if (_server == null) {
        throw new IllegalStateException("No server");
    }
    
    // 子類 override open 方法,打開 server socket
    open();

    // 若是沒有指定 ThreadPool,默認使用 Server 的 ThreadPool
    if (_threadPool == null) {
        _threadPool = _server.getThreadPool();
        addBean(_threadPool, false);
    }
    ...
    synchronized (this) {
        _acceptorThreads = new Thread[getAcceptors()];
        for (int i = 0; i < _acceptorThreads.length; i++) {
            // 啓動 acceptor 線程監聽客戶端鏈接
            if (!_threadPool.dispatch(new Acceptor(i))) {
                throw new IllegalStateException("!accepting");
            }
            ...
        }
    }
    ...
}

幾個關鍵點:this

  • 子類經過 override(覆蓋)open 方法初始化 server socket
  • 經過 setAcceptors 方法能夠設置 acceptor 線程數量
  • 若是沒有特殊指定,acceptor 線程 和 請求處理線程在(Server)同一個線程池裏頭
  • Acceptor 類(實現了 Runnable 接口)實現了具體的 accept 方法

Acceptor

Acceptor 類是 AbstractConnector 類的內部類,它在 Run 方法裏頭調用 AbstractConnector.accept 方法接收客戶端鏈接spa

private class Acceptor implements Runnable {
    public void run() {
        Thread current = Thread.currentThread();
        ...
        accept(_acceptor);
        ...
    }
}

accept 方法是個抽象方法,由 AbstractConnector 的子類提供具體實現線程

SelectChannelConnector

咱們重點關注和啓動流程相關的三個方法:open,doStart,accept 以及一個類 SelectorManagerdebug

open

上文提到 AbstractConnector 在 start 方法中調用 open 方法,子類 override open 方法打開 server socketcode

public void open() throws IOException {
        synchronized(this) {
            if (_acceptChannel == null) {
                // Create a new server socket
                _acceptChannel = ServerSocketChannel.open();
                // Set to blocking mode,阻塞接收鏈接請求
                _acceptChannel.configureBlocking(true);

                // Bind the server socket to the local host and port
                _acceptChannel.socket().setReuseAddress(getReuseAddress());
                InetSocketAddress addr = getHost()==null ?
                    new InetSocketAddress(getPort()) :
                    new InetSocketAddress(getHost(), getPort());
                _acceptChannel.socket().bind(addr,getAcceptQueueSize());

                _localPort=_acceptChannel.socket().getLocalPort();
                if (_localPort<=0)
                    throw new IOException("Server channel not bound");

                addBean(_acceptChannel);
            }
        }
    }

accept

@Override
public void accept(int acceptorID) throws IOException {
    ServerSocketChannel server;
    synchronized(this) {
        server = _acceptChannel;
    }

    if (server!=null && server.isOpen() && _manager.isStarted()) {
        // 獲取客戶端 SocketChannel
        SocketChannel channel = server.accept();
        // 設置非阻塞模式(NIO)
        channel.configureBlocking(false);
        Socket socket = channel.socket();
        configure(socket);
        // 將 channel 註冊到 SelectorManager(見下文)
        _manager.register(channel);
    }
}

經過 server.accept 獲取到客戶端 SocketChannel,並將它註冊到 _manager(SelectorManager),這個 _manager 是啥?server

SelectorManager

The Selector Manager manages and number of SelectSets to allow NIO Scheduling to scale to large numbers of connections
注意 SelectChannelConnector 在構造方法裏將 _manager 做爲 managed bean 添加到 bean registry 裏,這樣在 SelectChannelConnector 啓動(start 方法被調用)的時候 SelectManager 也會跟着啓動(參考上文)接口

public class SelectChannelConnector extends AbstractNIOConnector {
    private final SelectorManager _manager = new ConnectorSelectorManager();

    public SelectChannelConnector() {
        _manager.setMaxIdleTime(getMaxIdleTime());
        addBean(_manager, true);
        ...
    }
}

SelectManager doStart 方法,這裏只保留方法主要邏輯

@Override
protected void doStart() throws Exception {
    _selectSet = new SelectSet[_selectSets];
    for (int i = 0; i < _select.length; i++) {
        _selectSet[i] = new SelectSet(i);
    }
    super.doStart();
    // start a thread to select
    for (int i = 0; i < getSelectSets(); i++) {
        final int id = i;
        // 提交 select runnable 到線程池
        boolean selecting = dispatch(new Runnalbe() {
            public void run() {
                ...
                LOG.debug("Starting {} on {}", Thread.currentThread(), this);
                while (isRunning()) {
                    try {
                        // 對註冊的 channel 進行多路選擇(select)
                        set.doSelect();
                    } catch (...) {
                        ...
                    }
                }
            }
        });
    }
}

doStart 方法啓動 _selectSet 個線程監聽 channel select 事件,咱們回過頭來看 SelectManager 的 register 方法

public void register(SocketChannel acceptChannel) {
    int s=_set++;
    if (s<0) {
        s=-s;
    }
    s=s%_selectSets;
    SelectSet set=_selectSet[s];
    set.addChange(acceptChannel);
    set.wakeup();
}

acceptChannel 被均勻分配(addChange)給 SelectSet

總結

到目前爲止咱們總結一下:

  • Server 啓動 N 個 Accept 線程接收客戶端鏈接
  • Server 啓動 N 個 Select 線程對 SocketChannel 進行多路IO選擇,每一個線程執行 SelectSet 的 doSelect 方法
  • Server 接收到客戶端鏈接後將 SocketChannel 註冊到 SelectorManager
  • SelectorManager 將註冊的 SocketChannel 均勻分配給各個 SelectSet,同時喚醒 Select 線程迎接新的客戶端請求
相關文章
相關標籤/搜索