本文接着 Jetty : Embedded Server 啓動流程 - 1往下講,上回說到 Server.start 調用 Connector.start 方法,開始接收和處理請求,Server 默認使用 Connector 的子類 SelectChannelConnector,因此咱們重點來看看 SelectChannelConnector 的具體實現segmentfault
AbstractLifeCycle AggregateLifeCycle AbstractConnector AbstractNIOConnector SelectChannelConnector
AbstractLifeCycle,AggregateLifeCycle 咱們上文提到過,這裏有兩個新面孔:socket
上文講過 Server.start 方法最終會調用 Connector.start 方法,AbstractConnector 使用《模板方法》模式在 doStart 方法中實現了 start 的基本流程ide
@Override protected void doStart() throws Exception { if (_server == null) { throw new IllegalStateException("No server"); } // 子類 override open 方法,打開 server socket open(); // 若是沒有指定 ThreadPool,默認使用 Server 的 ThreadPool if (_threadPool == null) { _threadPool = _server.getThreadPool(); addBean(_threadPool, false); } ... synchronized (this) { _acceptorThreads = new Thread[getAcceptors()]; for (int i = 0; i < _acceptorThreads.length; i++) { // 啓動 acceptor 線程監聽客戶端鏈接 if (!_threadPool.dispatch(new Acceptor(i))) { throw new IllegalStateException("!accepting"); } ... } } ... }
幾個關鍵點:this
Acceptor 類是 AbstractConnector 類的內部類,它在 Run 方法裏頭調用 AbstractConnector.accept 方法接收客戶端鏈接spa
private class Acceptor implements Runnable { public void run() { Thread current = Thread.currentThread(); ... accept(_acceptor); ... } }
accept 方法是個抽象方法,由 AbstractConnector 的子類提供具體實現線程
咱們重點關注和啓動流程相關的三個方法:open,doStart,accept 以及一個類 SelectorManagerdebug
上文提到 AbstractConnector 在 start 方法中調用 open 方法,子類 override open 方法打開 server socketcode
public void open() throws IOException { synchronized(this) { if (_acceptChannel == null) { // Create a new server socket _acceptChannel = ServerSocketChannel.open(); // Set to blocking mode,阻塞接收鏈接請求 _acceptChannel.configureBlocking(true); // Bind the server socket to the local host and port _acceptChannel.socket().setReuseAddress(getReuseAddress()); InetSocketAddress addr = getHost()==null ? new InetSocketAddress(getPort()) : new InetSocketAddress(getHost(), getPort()); _acceptChannel.socket().bind(addr,getAcceptQueueSize()); _localPort=_acceptChannel.socket().getLocalPort(); if (_localPort<=0) throw new IOException("Server channel not bound"); addBean(_acceptChannel); } } }
@Override public void accept(int acceptorID) throws IOException { ServerSocketChannel server; synchronized(this) { server = _acceptChannel; } if (server!=null && server.isOpen() && _manager.isStarted()) { // 獲取客戶端 SocketChannel SocketChannel channel = server.accept(); // 設置非阻塞模式(NIO) channel.configureBlocking(false); Socket socket = channel.socket(); configure(socket); // 將 channel 註冊到 SelectorManager(見下文) _manager.register(channel); } }
經過 server.accept 獲取到客戶端 SocketChannel,並將它註冊到 _manager(SelectorManager),這個 _manager 是啥?server
The Selector Manager manages and number of SelectSets to allow NIO Scheduling to scale to large numbers of connections
注意 SelectChannelConnector 在構造方法裏將 _manager 做爲 managed bean 添加到 bean registry 裏,這樣在 SelectChannelConnector 啓動(start 方法被調用)的時候 SelectManager 也會跟着啓動(參考上文)接口
public class SelectChannelConnector extends AbstractNIOConnector { private final SelectorManager _manager = new ConnectorSelectorManager(); public SelectChannelConnector() { _manager.setMaxIdleTime(getMaxIdleTime()); addBean(_manager, true); ... } }
SelectManager doStart 方法,這裏只保留方法主要邏輯
@Override protected void doStart() throws Exception { _selectSet = new SelectSet[_selectSets]; for (int i = 0; i < _select.length; i++) { _selectSet[i] = new SelectSet(i); } super.doStart(); // start a thread to select for (int i = 0; i < getSelectSets(); i++) { final int id = i; // 提交 select runnable 到線程池 boolean selecting = dispatch(new Runnalbe() { public void run() { ... LOG.debug("Starting {} on {}", Thread.currentThread(), this); while (isRunning()) { try { // 對註冊的 channel 進行多路選擇(select) set.doSelect(); } catch (...) { ... } } } }); } }
doStart 方法啓動 _selectSet 個線程監聽 channel select 事件,咱們回過頭來看 SelectManager 的 register 方法
public void register(SocketChannel acceptChannel) { int s=_set++; if (s<0) { s=-s; } s=s%_selectSets; SelectSet set=_selectSet[s]; set.addChange(acceptChannel); set.wakeup(); }
acceptChannel 被均勻分配(addChange)給 SelectSet
到目前爲止咱們總結一下: