基於Jetty-9.4.8.v20171121。java
Connector接受遠程機器的鏈接和數據,容許應用向遠程機器發送數據。api
從類圖看出AbstractConnector繼承ContainerLifeCycle,因此具備Container和LifeCycle特性。數組
此外有一個ServerConnector,這個是整個Jetty中很重要的鏈接器,目前該鏈接器負責HTTP和HTTPS協議等鏈接。緩存
ConnectionFactory負責爲鏈接器建立鏈接對象,不一樣的鏈接(HTTP)建立不一樣的鏈接對象。服務器
主要都是一些getter方法,獲取該鏈接器相關的信息。異步
@ManagedObject("Connector Interface") public interface Connector extends LifeCycle, Container, Graceful { // 與這個鏈接器關聯的服務器 public Server getServer(); // 返回執行任務的執行器 public Executor getExecutor(); // 返回調度任務的調度器 public Scheduler getScheduler(); // 數據緩衝區 public ByteBufferPool getByteBufferPool(); // 返回與協議名稱對應的ConnectionFactory對象 public ConnectionFactory getConnectionFactory(String nextProtocol); public <T> T getConnectionFactory(Class<T> factoryType); // 返回默認ConnectionFactory對象 public ConnectionFactory getDefaultConnectionFactory(); // 返回全部Connection工廠 public Collection<ConnectionFactory> getConnectionFactories(); public List<String> getProtocols(); // 返回最大空閒鏈接時間 @ManagedAttribute("maximum time a connection can be idle before being closed (in ms)") public long getIdleTimeout(); // 返回這個對象底層的socket,channel,buffer等 public Object getTransport(); /** * @return immutable collection of connected endpoints */ // 返回鏈接端的不可變集合 public Collection<EndPoint> getConnectedEndPoints(); public String getName(); }
AbstractConnector利用ConnectionFactory工廠機制爲不一樣協議(HTTP,SSL等)建立Connection實例。 socket
AbstractConnector管理着鏈接器必須的幾個基本服務:ide
(1)Executor:Executor服務用於運行該鏈接器所需的全部活動任務,(例如接受鏈接,處理HTTP請求),默認使用Server.getThreadPool做爲Executor;函數
(2)Scheduler:調度器服務用於監視全部鏈接的空閒超時,而且也可用於監控鏈接時間,例如異步請求超時,默認使用ScheduledExecutorScheduler實例;this
(3)ByteBufferPool:ByteBufferPool服務提供給全部鏈接,用於從池中獲取和釋放ByteBuffer實例。
這些服務做爲bean被Container管理,能夠是託管或未託管。
鏈接器有一個ConnectionFactory集合,每一個ConnectionFactory有對應的協議名稱。協議名稱能夠是現實的協議好比https/1.1或http2,甚至能夠是私有協議名稱。
好比SSL-http/1.1標示SslConnectionFactory,它是由HttpConnectionFactory實例化而且做爲HttpConnectionFactory下一個協議。
ConnectionFactory集合能夠經過構造函數注入,經過addConnectionFactory,removeConnectionFactory和setConnectionFactories修改。每一個協議名稱只能對應一個ConnectionFactory實例,若是兩個ConnectionFactory對應一個協議名稱,那麼第二個將替換第一個。
最新ConnectionFactory經過setDefaultProtocol方法設置,或第一次配置的協議工廠。
每一個ConnectionFactory類型負責它所接受的協議配置。爲了配置HTTP協議,你須要傳遞HttpConfiguration實例到HttpConnectionFactory(或者其餘支持HTTP的ConnectionFactory);類似地,SslConnectionFactory須要SslContextFactory對象和下一個協議名稱。
(1)ConnectionFactory能夠簡單建立Connection對象去支持特定協議。好比HttpConnectionFactory能建立HttpConnection處理http/1.1,http/1.0和http/0.9;
(2)ConnectionFactory也能夠經過其餘ConnectionFactory建立一系列Connection對象。好比SslConnectionFactory配置了下一個協議名稱,一旦接受了請求建立了SslConnection對象。而後能夠經過鏈接器的getConnectionFactory獲取下一個ConnectionFactory,這個ConnectionFactory產生的Connection能夠處理從SslConnection獲取的未加密的數據;
(3)ConnectionFactory也能夠建立一個臨時Connection,用於在鏈接上交換數據,以肯定下一個使用的協議。例如,ALPN(Application Layer Protocol Negotiation)協議是SSL的擴展,容許在SSL握手期間指定協議,ALPN用於HTTP2在客戶端與服務器之間通訊協商協議。接受一個HTTP2鏈接,鏈接器會配置SSL-ALPN, h2,http/1.1。一個新接受的鏈接使用「SSL-ALPN」,它指定一個帶有「ALPN」的SSLConnectionFactory做爲下一個協議。所以,一個SSL鏈接實例被連接到一個ALPN鏈接實例。ALPN而後與客戶端協商下一個協議,多是http2,或則http/1.1。一旦決定了下一個協議,ALPN鏈接調用getConnectionFactory建立鏈接實例,而後替換ALPN鏈接。
Connector在運行中會重複調用accept(int)方法,acceptor任務運行在一個循環中。
accept方法實現必須知足以下幾點:
(1)阻塞等待新鏈接;
(2)接受鏈接(好比socket accept);
(3)配置鏈接;
(4)調用getDefaultConnectionFactory->ConnectionFactory.newConnection去建立一個新Connection。
acceptor默認的數量是1,數量能夠是CPU的個數除以8。更多的acceptor能夠減小服務器延遲並且能夠得到一個高速的鏈接(好比http/1.0沒有keepalive)。對於現代持久鏈接協議(http/1.1,http/2)默認值是足夠的。
(1)實現Connector接口,實現基本的鏈接器能力;
(2)繼承ContainerLifeCycle類,具有容器化和生命週期能力;
(3)AbstractConnector有一個內部類Acceptor;
(4)ConnectionFactory工廠接口,實現類有HttpConnectionFactory,SslConnectionFactory,針對不一樣的鏈接類型如HTTP,HTTPS產生不一樣的工廠建立不一樣的產品(Connection),這是典型的工廠方法模式;
AbstractConnector實現Connector接口,而Connector接口提供的都是getter方法,都比較簡單,通常都是直接返回某個對象;
AbstractConnector繼承ContainerLifeCycle類,因此具備LifeCycle特性,有啓動中止動做;
// 鎖對象,設置Accepting,獲取ConnectionFactory等操做 // Locker類封裝了ReentrantLock類 private final Locker _locker = new Locker(); private final Condition _setAccepting = _locker.newCondition(); // ConnectionFactory緩存,key爲協議名稱,value爲ConnectionFactory實例 private final Map<String, ConnectionFactory> _factories = new LinkedHashMap<>(); // 與鏈接器對應的server private final Server _server; // 下面executor,scheduler,byteBufferPool是Connector必須的組件,Connector接口的幾個getter方法就是返回這些對象。 private final Executor _executor; private final Scheduler _scheduler; private final ByteBufferPool _byteBufferPool; // 接受鏈接的線程數組 private final Thread[] _acceptors; // 已經創建的對端鏈接對象 private final Set<EndPoint> _endpoints = Collections.newSetFromMap(new ConcurrentHashMap<>()); private final Set<EndPoint> _immutableEndPoints = Collections.unmodifiableSet(_endpoints); // Connector中止時,須要把Thread[] _acceptors中止 // _stopping = new CountDownLatch(_acceptors.length) private CountDownLatch _stopping;
// 默認空閒鏈接時間 private long _idleTimeout = 30000; // 默認協議名稱,對應ConnectionFactory private String _defaultProtocol; // 默認ConnectionFactory工廠 private ConnectionFactory _defaultConnectionFactory; private String _name; private int _acceptorPriorityDelta=-2; private boolean _accepting = true; private ThreadPoolBudget.Lease _lease;
從構造函數來看,已經初始化了以下字段:
(1)_factories
(2)_defaultProtocol
(3)_defaultConnectionFactory
(4)_server
(5)_executor
(6)_scheduler
(7)_byteBufferPool
(8)_acceptors
// 惟一的構造函數 public AbstractConnector( Server server, Executor executor, Scheduler scheduler, ByteBufferPool pool, int acceptors, ConnectionFactory... factories) { _server=server; // 若是Executor爲null,則獲取Server的 _executor=executor!=null?executor:_server.getThreadPool(); if (scheduler==null) scheduler=_server.getBean(Scheduler.class); _scheduler=scheduler!=null?scheduler:new ScheduledExecutorScheduler(); if (pool==null) pool=_server.getBean(ByteBufferPool.class); _byteBufferPool = pool!=null?pool:new ArrayByteBufferPool(); addBean(_server,false); addBean(_executor); if (executor==null) unmanage(_executor); // inherited from server addBean(_scheduler); addBean(_byteBufferPool); // 緩存ConnectionFactory,若是沒有設置,則爲HttpConnectionFactory for (ConnectionFactory factory:factories) addConnectionFactory(factory); int cores = Runtime.getRuntime().availableProcessors(); if (acceptors < 0) acceptors=Math.max(1, Math.min(4,cores/8)); if (acceptors > cores) LOG.warn("Acceptors should be <= availableProcessors: " + this); // 設置acceptor進程數組 _acceptors = new Thread[acceptors]; }
調用addConnectionFactory方法能夠緩存全部的ConnectionFactory:
public void addConnectionFactory(ConnectionFactory factory) { if (isRunning()) throw new IllegalStateException(getState()); // 須要移除的ConnectionFactory Set<ConnectionFactory> to_remove = new HashSet<>(); for (String key:factory.getProtocols()) { key=StringUtil.asciiToLowerCase(key); ConnectionFactory old=_factories.remove(key); // 先移除協議名稱對應的ConnectionFactory對象 if (old!=null) { if (old.getProtocol().equals(_defaultProtocol)) _defaultProtocol=null; to_remove.add(old); // 保存待移除 } _factories.put(key, factory); // 增長新的ConnectionFactory } // keep factories still referenced // 避免一種場景:若是_factories裏面已經緩存了HttpConnectionFactory,對應的協議名稱爲http/1.1 // 而後增長的factory也是HttpConnectionFactory對應的協議名稱爲http/1.1通過上面的操做,to_remove裏面有HttpConnectionFactory這樣下面removeBean的時候會誤刪 for (ConnectionFactory f : _factories.values()) to_remove.remove(f); // remove old factories for (ConnectionFactory old: to_remove) { removeBean(old); if (LOG.isDebugEnabled()) LOG.debug("{} removed {}", this, old); } // add new Bean addBean(factory); if (_defaultProtocol==null) _defaultProtocol=factory.getProtocol(); if (LOG.isDebugEnabled()) LOG.debug("{} added {}", this, factory); }
在Jetty中,服務器對象Server在啓動的時候會啓動該服務器管理的全部Connector,從全部Connector的繼承關係中能夠看出AbstractConnector對象是全部具體Connector對象的父類。
// start connectors last for (Connector connector : _connectors) { try { connector.start(); // 啓動Connector對象 } catch(Throwable e) { mex.add(e); } }
下面具體看一下AbstractConnector.doStart的具體實現。
@Override protected void doStart() throws Exception { // 一個Connector至少有一個ConnectionFactory對象 if(_defaultProtocol==null) throw new IllegalStateException("No default protocol for "+this); _defaultConnectionFactory = getConnectionFactory(_defaultProtocol); if(_defaultConnectionFactory==null) throw new IllegalStateException("No protocol factory for default protocol '"+_defaultProtocol+"' in "+this); // 若是是SslConnectionFactory,則必需要有下一個協議ConnectionFactory SslConnectionFactory ssl = getConnectionFactory(SslConnectionFactory.class); if (ssl != null) { String next = ssl.getNextProtocol(); ConnectionFactory cf = getConnectionFactory(next); if (cf == null) throw new IllegalStateException("No protocol factory for SSL next protocol: '" + next + "' in " + this); } _lease = ThreadPoolBudget.leaseFrom(getExecutor(),this,_acceptors.length); super.doStart(); // 設置全部acceptor線程中止的同步器 _stopping=new CountDownLatch(_acceptors.length); for (int i = 0; i < _acceptors.length; i++) { Acceptor a = new Acceptor(i); addBean(a); getExecutor().execute(a); // 啓動接受器 } LOG.info("Started {}", this); }
Acceptor接收器負責接受鏈接且自己是一個線程。
@Override public void run() { // (1)設置線程名稱與優先級 final Thread thread = Thread.currentThread(); String name=thread.getName(); _name=String.format("%s-acceptor-%d@%x-%s",name,_id,hashCode(),AbstractConnector.this.toString()); thread.setName(_name); int priority=thread.getPriority(); if (_acceptorPriorityDelta!=0) thread.setPriority(Math.max(Thread.MIN_PRIORITY,Math.min(Thread.MAX_PRIORITY,priority+_acceptorPriorityDelta))); _acceptors[_id] = thread; try { while (isRunning()) // (2)循環接受請求 { try (Locker.Lock lock = _locker.lock()) { if (!_accepting && isRunning()) { _setAccepting.await(); // 若是_accepting==false則阻塞,等待通知 continue; } } catch (InterruptedException e) { continue; } try { accept(_id); // (3)接受請求,這個是核心方法 } catch (Throwable x) { if (!handleAcceptFailure(x)) break; } } } finally { thread.setName(name); if (_acceptorPriorityDelta!=0) thread.setPriority(priority); synchronized (AbstractConnector.this) // Why? { _acceptors[_id] = null; } CountDownLatch stopping=_stopping; if (stopping!=null) stopping.countDown(); // 若是線程異常,則設置線程同步器減一 } }
在Acceptor的run方法裏面有個accept(int acceptorID)是AbstractConnector新增的抽象方法,負責處理鏈接請求。
到此AbstractConnector類的主要方法基本已經分析完畢,下面緊接着分析accept方法的實現,重點關注ServerConnector類。
ServerConnector主要用於TCP/IP鏈接,使用不一樣的ConnectionFactory實例,它能夠直接或經過SSL接受HTTP、HTTP/2和WebSocket的鏈接。
ServerConnector是一個基於NIO的徹底異步實現。鏈接器必需的服務(Executor,Scheduler等)默認使用傳入的Server實例;也能夠經過構造函數注入;
各類重載的構造函數用於ConnectionFactory的配置。若是沒有設置ConnectionFactory,構造函數將默認使用HttpConnectionFactory。SslContextFactory能夠實例化SslConnectionFactory。
鏈接器會使用Executor執行許多Selector任務,這些任務使用NIO的Selector異步調度accepted鏈接。selector線程將調用經過EndPoint.fillInterested(Callback)或EndPoint.write(Callback,ByteBuffer)傳入的Callback的方法。
這些回調能夠執行一些非阻塞的IO工做,但老是會向Executor服務發送任何阻塞、長時間運行或應用程序任務。Selector默認的數量是JVM可用核數的一半。
// 參數acceptorID其實沒有使用 @Override public void accept(int acceptorID) throws IOException { // 獲取與該鏈接器對應的ServerSocketChannel,該類是java.nio ServerSocketChannel serverChannel = _acceptChannel; if (serverChannel != null && serverChannel.isOpen()) { SocketChannel channel = serverChannel.accept(); // java api accepted(channel); // 接受具體的SocketChannel } }
在調用accepted方法以後具體的處理將交給SelectorManager類,ServerConnector使用的是ServerConnectorManager類。
(1)SelectorManager經過管理ManagedSelector對象簡化JVM原始提供的java.nio非阻塞操做;SelectorManager子類實現方法返回協議指定的EndPoint和Connection對象。
(2) ManagedSelector包裝Selector簡化在channel上面的非阻塞操做;ManagedSelector運行在select循環中而且阻塞在Selector.select()方法直到註冊channel事件發生,當有事件發生,它負責通知與當前channel相關的EndPoint。
public void accept(SelectableChannel channel) { accept(channel, null); } // 註冊Channel執行非阻塞讀寫操做 public void accept(SelectableChannel channel, Object attachment) { final ManagedSelector selector = chooseSelector(channel); // 選擇ManagedSelector對象 // Accept是一個Runnable子類,submit提交線程池運行 selector.submit(selector.new Accept(channel, attachment)); } private ManagedSelector chooseSelector(SelectableChannel channel) { return _selectors[_selectorIndex.updateAndGet(_selectorIndexUpdate)]; }
// 線程可執行類 class Accept extends Invocable.NonBlocking implements Closeable { private final SelectableChannel channel; private final Object attachment; Accept(SelectableChannel channel, Object attachment) { this.channel = channel; this.attachment = attachment; } @Override public void close() { LOG.debug("closed accept of {}", channel); closeNoExceptions(channel); } @Override public void run() { try { final SelectionKey key = channel.register(_selector, 0, attachment); // 建立EndPoint對象,而後提交線程池執行 submit(new CreateEndPoint(channel, key)); } catch (Throwable x) { closeNoExceptions(channel); LOG.debug(x); } } }
submit方法就是把線程任務提交到Queue<Runnable> _actions = new ArrayDeque<>();隊列中。
而後每一個ManagedSelector在doStart裏面啓動線程池不斷從_actions隊列中獲取任務執行,主要涉及的類有EatWhatYouKill,ManagedSelector.SelectorProducer等。
Jetty是基於Handler處理各類請求,下面重點分析如何調用最終的Handler處理器。
緊接着4.3中的CreateEndPoint類,調用createEndPoint方法,
private void createEndPoint(SelectableChannel channel, SelectionKey selectionKey) throws IOException { // endPoint的實際類型是SocketChannelEndPoint對象 EndPoint endPoint = _selectorManager.newEndPoint(channel, this, selectionKey); // 若是未設置ConnectionFactory,則默認是HttpConnectionFactory,鏈接對象類型是HttpConnection,即connection類型是HttpConnection Connection connection = _selectorManager.newConnection(channel, endPoint, selectionKey.attachment()); endPoint.setConnection(connection); selectionKey.attach(endPoint); // 當EndPoint打開,回調該方法 endPoint.onOpen(); // 當EndPoint打開,回調該方法,同時將endPoint保存在AbstractConnector的Set<EndPoint> _endpoints = Collections.newSetFromMap(new ConcurrentHashMap<>());裏面 _selectorManager.endPointOpened(endPoint); // 內部調用HttpConnection.onOpen() _selectorManager.connectionOpened(connection); if (LOG.isDebugEnabled()) LOG.debug("Created {}", endPoint); }
下面看看HttpConnection.onOpen()方法實現:
// HttpConnection @Override public void onOpen() { super.onOpen(); // 通知Listeners fillInterested(); } // AbstractConnection public void fillInterested() { if (LOG.isDebugEnabled()) LOG.debug("fillInterested {}",this); // SocketChannelEndPoint類型 getEndPoint().fillInterested(_readCallback); } // AbstractEndPoint @Override public void fillInterested(Callback callback) { notIdle(); _fillInterest.register(callback); // 最終調用SocketChannelEndPoint父類ChannelEndPoint.needsFillInterest }
最後補充一個業務調用棧: