Jetty - Connector源碼分析

1. 描述

基於Jetty-9.4.8.v20171121。java

Connector接受遠程機器的鏈接和數據,容許應用向遠程機器發送數據。api

1.2 類圖

從類圖看出AbstractConnector繼承ContainerLifeCycle,因此具備Container和LifeCycle特性。數組

此外有一個ServerConnector,這個是整個Jetty中很重要的鏈接器,目前該鏈接器負責HTTP和HTTPS協議等鏈接。緩存

ConnectionFactory負責爲鏈接器建立鏈接對象,不一樣的鏈接(HTTP)建立不一樣的鏈接對象。服務器

1.3 API能力

主要都是一些getter方法,獲取該鏈接器相關的信息。異步

@ManagedObject("Connector Interface")
public interface Connector extends LifeCycle, Container, Graceful
{
    // 與這個鏈接器關聯的服務器
    public Server getServer();

    // 返回執行任務的執行器
    public Executor getExecutor();

    // 返回調度任務的調度器
    public Scheduler getScheduler();

    // 數據緩衝區
    public ByteBufferPool getByteBufferPool();

    // 返回與協議名稱對應的ConnectionFactory對象
    public ConnectionFactory getConnectionFactory(String nextProtocol);
    

    public <T> T getConnectionFactory(Class<T> factoryType);
    
    // 返回默認ConnectionFactory對象
    public ConnectionFactory getDefaultConnectionFactory();
    // 返回全部Connection工廠
    public Collection<ConnectionFactory> getConnectionFactories();
    
    public List<String> getProtocols();
    
    // 返回最大空閒鏈接時間
    @ManagedAttribute("maximum time a connection can be idle before being closed (in ms)")
    public long getIdleTimeout();

    // 返回這個對象底層的socket,channel,buffer等
    public Object getTransport();
    
    /**
     * @return immutable collection of connected endpoints
     */
    // 返回鏈接端的不可變集合
    public Collection<EndPoint> getConnectedEndPoints();

    public String getName();
}

2. AbstractConnector

2.1 描述

AbstractConnector利用ConnectionFactory工廠機制爲不一樣協議(HTTP,SSL等)建立Connection實例。  socket

AbstractConnector管理着鏈接器必須的幾個基本服務:ide

(1)Executor:Executor服務用於運行該鏈接器所需的全部活動任務,(例如接受鏈接,處理HTTP請求),默認使用Server.getThreadPool做爲Executor;函數

(2)Scheduler:調度器服務用於監視全部鏈接的空閒超時,而且也可用於監控鏈接時間,例如異步請求超時,默認使用ScheduledExecutorScheduler實例;this

(3)ByteBufferPool:ByteBufferPool服務提供給全部鏈接,用於從池中獲取和釋放ByteBuffer實例。

這些服務做爲bean被Container管理,能夠是託管或未託管。

鏈接器有一個ConnectionFactory集合,每一個ConnectionFactory有對應的協議名稱。協議名稱能夠是現實的協議好比https/1.1或http2,甚至能夠是私有協議名稱。

好比SSL-http/1.1標示SslConnectionFactory,它是由HttpConnectionFactory實例化而且做爲HttpConnectionFactory下一個協議。

ConnectionFactory集合能夠經過構造函數注入,經過addConnectionFactory,removeConnectionFactory和setConnectionFactories修改。每一個協議名稱只能對應一個ConnectionFactory實例,若是兩個ConnectionFactory對應一個協議名稱,那麼第二個將替換第一個。

最新ConnectionFactory經過setDefaultProtocol方法設置,或第一次配置的協議工廠。

每一個ConnectionFactory類型負責它所接受的協議配置。爲了配置HTTP協議,你須要傳遞HttpConfiguration實例到HttpConnectionFactory(或者其餘支持HTTP的ConnectionFactory);類似地,SslConnectionFactory須要SslContextFactory對象和下一個協議名稱。

(1)ConnectionFactory能夠簡單建立Connection對象去支持特定協議。好比HttpConnectionFactory能建立HttpConnection處理http/1.1,http/1.0和http/0.9;

(2)ConnectionFactory也能夠經過其餘ConnectionFactory建立一系列Connection對象。好比SslConnectionFactory配置了下一個協議名稱,一旦接受了請求建立了SslConnection對象。而後能夠經過鏈接器的getConnectionFactory獲取下一個ConnectionFactory,這個ConnectionFactory產生的Connection能夠處理從SslConnection獲取的未加密的數據;

(3)ConnectionFactory也能夠建立一個臨時Connection,用於在鏈接上交換數據,以肯定下一個使用的協議。例如,ALPN(Application Layer Protocol Negotiation)協議是SSL的擴展,容許在SSL握手期間指定協議,ALPN用於HTTP2在客戶端與服務器之間通訊協商協議。接受一個HTTP2鏈接,鏈接器會配置SSL-ALPN, h2,http/1.1。一個新接受的鏈接使用「SSL-ALPN」,它指定一個帶有「ALPN」的SSLConnectionFactory做爲下一個協議。所以,一個SSL鏈接實例被連接到一個ALPN鏈接實例。ALPN而後與客戶端協商下一個協議,多是http2,或則http/1.1。一旦決定了下一個協議,ALPN鏈接調用getConnectionFactory建立鏈接實例,而後替換ALPN鏈接。

Connector在運行中會重複調用accept(int)方法,acceptor任務運行在一個循環中。

accept方法實現必須知足以下幾點:

(1)阻塞等待新鏈接;

(2)接受鏈接(好比socket accept);

(3)配置鏈接;

(4)調用getDefaultConnectionFactory->ConnectionFactory.newConnection去建立一個新Connection。

acceptor默認的數量是1,數量能夠是CPU的個數除以8。更多的acceptor能夠減小服務器延遲並且能夠得到一個高速的鏈接(好比http/1.0沒有keepalive)。對於現代持久鏈接協議(http/1.1,http/2)默認值是足夠的。

 

2.2 類圖

 

 

(1)實現Connector接口,實現基本的鏈接器能力;

(2)繼承ContainerLifeCycle類,具有容器化和生命週期能力;

(3)AbstractConnector有一個內部類Acceptor;

(4)ConnectionFactory工廠接口,實現類有HttpConnectionFactory,SslConnectionFactory,針對不一樣的鏈接類型如HTTP,HTTPS產生不一樣的工廠建立不一樣的產品(Connection),這是典型的工廠方法模式;

2.3 AbstractConnector源碼解讀

AbstractConnector實現Connector接口,而Connector接口提供的都是getter方法,都比較簡單,通常都是直接返回某個對象;

AbstractConnector繼承ContainerLifeCycle類,因此具備LifeCycle特性,有啓動中止動做;

2.3.1 字段描述

    // 鎖對象,設置Accepting,獲取ConnectionFactory等操做
    // Locker類封裝了ReentrantLock類
    private final Locker _locker = new Locker();
    private final Condition _setAccepting = _locker.newCondition();
    // ConnectionFactory緩存,key爲協議名稱,value爲ConnectionFactory實例
    private final Map<String, ConnectionFactory> _factories = new LinkedHashMap<>(); 
    // 與鏈接器對應的server
    private final Server _server;
    // 下面executor,scheduler,byteBufferPool是Connector必須的組件,Connector接口的幾個getter方法就是返回這些對象。
    private final Executor _executor;
    private final Scheduler _scheduler;
    private final ByteBufferPool _byteBufferPool;
    // 接受鏈接的線程數組
    private final Thread[] _acceptors;
    // 已經創建的對端鏈接對象
    private final Set<EndPoint> _endpoints = Collections.newSetFromMap(new ConcurrentHashMap<>());
    private final Set<EndPoint> _immutableEndPoints = Collections.unmodifiableSet(_endpoints);
    // Connector中止時,須要把Thread[] _acceptors中止
    // _stopping = new CountDownLatch(_acceptors.length)
    private CountDownLatch _stopping;
// 默認空閒鏈接時間 private long _idleTimeout = 30000; // 默認協議名稱,對應ConnectionFactory private String _defaultProtocol; // 默認ConnectionFactory工廠 private ConnectionFactory _defaultConnectionFactory; private String _name; private int _acceptorPriorityDelta=-2; private boolean _accepting = true; private ThreadPoolBudget.Lease _lease;

2.3.2 構造函數和addConnectionFactory

 從構造函數來看,已經初始化了以下字段:

(1)_factories

(2)_defaultProtocol

(3)_defaultConnectionFactory

(4)_server

(5)_executor

(6)_scheduler

(7)_byteBufferPool

(8)_acceptors 

// 惟一的構造函數 
public AbstractConnector(
            Server server,
            Executor executor,
            Scheduler scheduler,
            ByteBufferPool pool,
            int acceptors,
            ConnectionFactory... factories)
    {
        _server=server;
        // 若是Executor爲null,則獲取Server的
        _executor=executor!=null?executor:_server.getThreadPool();
        if (scheduler==null)
            scheduler=_server.getBean(Scheduler.class);
        _scheduler=scheduler!=null?scheduler:new ScheduledExecutorScheduler();
        if (pool==null)
            pool=_server.getBean(ByteBufferPool.class);
        _byteBufferPool = pool!=null?pool:new ArrayByteBufferPool();

        addBean(_server,false);
        addBean(_executor);
        if (executor==null)
            unmanage(_executor); // inherited from server
        addBean(_scheduler);
        addBean(_byteBufferPool);
        // 緩存ConnectionFactory,若是沒有設置,則爲HttpConnectionFactory
        for (ConnectionFactory factory:factories) 
            addConnectionFactory(factory);

        int cores = Runtime.getRuntime().availableProcessors();
        if (acceptors < 0)
            acceptors=Math.max(1, Math.min(4,cores/8));
        if (acceptors > cores)
            LOG.warn("Acceptors should be <= availableProcessors: " + this);
       // 設置acceptor進程數組
        _acceptors = new Thread[acceptors];
    }

調用addConnectionFactory方法能夠緩存全部的ConnectionFactory:

 public void addConnectionFactory(ConnectionFactory factory)
    {
        if (isRunning())
            throw new IllegalStateException(getState());

        // 須要移除的ConnectionFactory
        Set<ConnectionFactory> to_remove = new HashSet<>();
        for (String key:factory.getProtocols())
        {
            key=StringUtil.asciiToLowerCase(key);
            ConnectionFactory old=_factories.remove(key); // 先移除協議名稱對應的ConnectionFactory對象
            if (old!=null)
            {
                if (old.getProtocol().equals(_defaultProtocol))
                    _defaultProtocol=null;
                to_remove.add(old); // 保存待移除
            }
            _factories.put(key, factory); // 增長新的ConnectionFactory
        }

         // keep factories still referenced
         // 避免一種場景:若是_factories裏面已經緩存了HttpConnectionFactory,對應的協議名稱爲http/1.1
         // 而後增長的factory也是HttpConnectionFactory對應的協議名稱爲http/1.1通過上面的操做,to_remove裏面有HttpConnectionFactory這樣下面removeBean的時候會誤刪
        for (ConnectionFactory f : _factories.values())
            to_remove.remove(f);

        // remove old factories
        for (ConnectionFactory old: to_remove)
        {
            removeBean(old);
            if (LOG.isDebugEnabled())
                LOG.debug("{} removed {}", this, old);
        }

        // add new Bean
        addBean(factory);
        if (_defaultProtocol==null)
            _defaultProtocol=factory.getProtocol();
        if (LOG.isDebugEnabled())
            LOG.debug("{} added {}", this, factory);
    }

   

2.3.3 doStart

在Jetty中,服務器對象Server在啓動的時候會啓動該服務器管理的全部Connector,從全部Connector的繼承關係中能夠看出AbstractConnector對象是全部具體Connector對象的父類。 

        // start connectors last
        for (Connector connector : _connectors)
        {
            try
            {  
                connector.start(); // 啓動Connector對象
            }
            catch(Throwable e)
            {
                mex.add(e);
            }
        }  

下面具體看一下AbstractConnector.doStart的具體實現。

 @Override
    protected void doStart() throws Exception
    {   // 一個Connector至少有一個ConnectionFactory對象
        if(_defaultProtocol==null)
            throw new IllegalStateException("No default protocol for "+this);
        _defaultConnectionFactory = getConnectionFactory(_defaultProtocol);
        if(_defaultConnectionFactory==null)
            throw new IllegalStateException("No protocol factory for default protocol '"+_defaultProtocol+"' in "+this);
        // 若是是SslConnectionFactory,則必需要有下一個協議ConnectionFactory
        SslConnectionFactory ssl = getConnectionFactory(SslConnectionFactory.class);
        if (ssl != null)
        {
            String next = ssl.getNextProtocol();
            ConnectionFactory cf = getConnectionFactory(next);
            if (cf == null)
                throw new IllegalStateException("No protocol factory for SSL next protocol: '" + next + "' in " + this);
        }

        _lease = ThreadPoolBudget.leaseFrom(getExecutor(),this,_acceptors.length);
        super.doStart();
        // 設置全部acceptor線程中止的同步器
        _stopping=new CountDownLatch(_acceptors.length);
        for (int i = 0; i < _acceptors.length; i++)
        {
            Acceptor a = new Acceptor(i);
            addBean(a);
            getExecutor().execute(a); // 啓動接受器
        }

        LOG.info("Started {}", this);
    }  

 

2.3.4 Acceptor

Acceptor接收器負責接受鏈接且自己是一個線程。

@Override
        public void run()
        {  // (1)設置線程名稱與優先級
            final Thread thread = Thread.currentThread();
            String name=thread.getName();
            _name=String.format("%s-acceptor-%d@%x-%s",name,_id,hashCode(),AbstractConnector.this.toString());
            thread.setName(_name);

            int priority=thread.getPriority();
            if (_acceptorPriorityDelta!=0)
                thread.setPriority(Math.max(Thread.MIN_PRIORITY,Math.min(Thread.MAX_PRIORITY,priority+_acceptorPriorityDelta)));

            _acceptors[_id] = thread;

            try
            {
                while (isRunning()) // (2)循環接受請求
                {
                    try (Locker.Lock lock = _locker.lock())
                    {
                        if (!_accepting && isRunning())
                        {
                            _setAccepting.await(); // 若是_accepting==false則阻塞,等待通知
                            continue;
                        }
                    }
                    catch (InterruptedException e) 
                    {
                        continue;
                    }
                    
                    try
                    {
                        accept(_id); // (3)接受請求,這個是核心方法
                    }
                    catch (Throwable x)
                    {
                        if (!handleAcceptFailure(x))
                            break;
                    }
                }
            }
            finally
            {
                thread.setName(name);
                if (_acceptorPriorityDelta!=0)
                    thread.setPriority(priority);

                synchronized (AbstractConnector.this) // Why?
                {
                    _acceptors[_id] = null;
                }
                CountDownLatch stopping=_stopping; 
                if (stopping!=null)
                    stopping.countDown(); // 若是線程異常,則設置線程同步器減一
            }
        }  

在Acceptor的run方法裏面有個accept(int acceptorID)是AbstractConnector新增的抽象方法,負責處理鏈接請求。

到此AbstractConnector類的主要方法基本已經分析完畢,下面緊接着分析accept方法的實現,重點關注ServerConnector類。

3. ServerConnector源碼解讀

ServerConnector主要用於TCP/IP鏈接,使用不一樣的ConnectionFactory實例,它能夠直接或經過SSL接受HTTP、HTTP/2和WebSocket的鏈接。

ServerConnector是一個基於NIO的徹底異步實現。鏈接器必需的服務(Executor,Scheduler等)默認使用傳入的Server實例;也能夠經過構造函數注入;

各類重載的構造函數用於ConnectionFactory的配置。若是沒有設置ConnectionFactory,構造函數將默認使用HttpConnectionFactory。SslContextFactory能夠實例化SslConnectionFactory。

鏈接器會使用Executor執行許多Selector任務,這些任務使用NIO的Selector異步調度accepted鏈接。selector線程將調用經過EndPoint.fillInterested(Callback)或EndPoint.write(Callback,ByteBuffer)傳入的Callback的方法。

這些回調能夠執行一些非阻塞的IO工做,但老是會向Executor服務發送任何阻塞、長時間運行或應用程序任務。Selector默認的數量是JVM可用核數的一半。

3.1 字段描述

3.2 承接上面2.3.4小結介紹ServerConnector.accept(int accpetorID)

     // 參數acceptorID其實沒有使用 
    @Override
    public void accept(int acceptorID) throws IOException
    {   // 獲取與該鏈接器對應的ServerSocketChannel,該類是java.nio
        ServerSocketChannel serverChannel = _acceptChannel;
        if (serverChannel != null && serverChannel.isOpen())
        {
            SocketChannel channel = serverChannel.accept(); // java api
            accepted(channel); // 接受具體的SocketChannel
        }
    }  

在調用accepted方法以後具體的處理將交給SelectorManager類,ServerConnector使用的是ServerConnectorManager類。

4. SelectorManager類

4.1 類圖

(1)SelectorManager經過管理ManagedSelector對象簡化JVM原始提供的java.nio非阻塞操做;SelectorManager子類實現方法返回協議指定的EndPoint和Connection對象。

(2) ManagedSelector包裝Selector簡化在channel上面的非阻塞操做;ManagedSelector運行在select循環中而且阻塞在Selector.select()方法直到註冊channel事件發生,當有事件發生,它負責通知與當前channel相關的EndPoint。

4.2 accept方法

    public void accept(SelectableChannel channel)
    {
        accept(channel, null);
    }

     // 註冊Channel執行非阻塞讀寫操做
    public void accept(SelectableChannel channel, Object attachment)
    {
        final ManagedSelector selector = chooseSelector(channel); // 選擇ManagedSelector對象
        // Accept是一個Runnable子類,submit提交線程池運行
        selector.submit(selector.new Accept(channel, attachment));
    }
     
    private ManagedSelector chooseSelector(SelectableChannel channel)
    {  
        return _selectors[_selectorIndex.updateAndGet(_selectorIndexUpdate)];
    }
    

4.3 ManagedSelectopr.Accept類

// 線程可執行類
class Accept extends Invocable.NonBlocking implements Closeable
    {
        private final SelectableChannel channel;
        private final Object attachment;

        Accept(SelectableChannel channel, Object attachment)
        {
            this.channel = channel;
            this.attachment = attachment;
        }

        @Override
        public void close()
        {
            LOG.debug("closed accept of {}", channel);
            closeNoExceptions(channel);
        }

        @Override
        public void run()
        {
            try
            {
                final SelectionKey key = channel.register(_selector, 0, attachment);
                // 建立EndPoint對象,而後提交線程池執行
                submit(new CreateEndPoint(channel, key));
            }
            catch (Throwable x)
            {
                closeNoExceptions(channel);
                LOG.debug(x);
            }
        }
    }

submit方法就是把線程任務提交到Queue<Runnable> _actions = new ArrayDeque<>();隊列中。

而後每一個ManagedSelector在doStart裏面啓動線程池不斷從_actions隊列中獲取任務執行,主要涉及的類有EatWhatYouKill,ManagedSelector.SelectorProducer等。

Jetty是基於Handler處理各類請求,下面重點分析如何調用最終的Handler處理器。

5. EndPoint和Connection

5.1 類圖

                  

 

緊接着4.3中的CreateEndPoint類,調用createEndPoint方法,

 

private void createEndPoint(SelectableChannel channel, SelectionKey selectionKey) throws IOException
    {
        // endPoint的實際類型是SocketChannelEndPoint對象
        EndPoint endPoint = _selectorManager.newEndPoint(channel, this, selectionKey);
         // 若是未設置ConnectionFactory,則默認是HttpConnectionFactory,鏈接對象類型是HttpConnection,即connection類型是HttpConnection
        Connection connection = _selectorManager.newConnection(channel, endPoint, selectionKey.attachment());
        endPoint.setConnection(connection);
        selectionKey.attach(endPoint);
        // 當EndPoint打開,回調該方法
        endPoint.onOpen();
        // 當EndPoint打開,回調該方法,同時將endPoint保存在AbstractConnector的Set<EndPoint> _endpoints = Collections.newSetFromMap(new ConcurrentHashMap<>());裏面
        _selectorManager.endPointOpened(endPoint);
        // 內部調用HttpConnection.onOpen()
        _selectorManager.connectionOpened(connection);
        if (LOG.isDebugEnabled())
            LOG.debug("Created {}", endPoint);
    }  

下面看看HttpConnection.onOpen()方法實現:

     // HttpConnection
    @Override
    public void onOpen()
    {
        super.onOpen(); // 通知Listeners
        fillInterested();
    }

    // AbstractConnection
    public void fillInterested()
    {
        if (LOG.isDebugEnabled())
            LOG.debug("fillInterested {}",this);
         // SocketChannelEndPoint類型
        getEndPoint().fillInterested(_readCallback);
    }

    // AbstractEndPoint
    @Override
    public void fillInterested(Callback callback)
    {
        notIdle();
        _fillInterest.register(callback); // 最終調用SocketChannelEndPoint父類ChannelEndPoint.needsFillInterest
    }  

最後補充一個業務調用棧:

 

相關文章
相關標籤/搜索