Jetty9 源碼初解(2)——IO之Connection

1、概述

查看Jetty-io包,清單如下:

接口類:
ByteBufferPool
ClientConnectionFactory
Connection
Connection.Listener
Connection.UpgradeFrom
Connection.UpgradeTo
EndPoint
ManagedSelector.SelectableEndPoint
NetworkTrafficListener
實體類:
AbstractConnection
AbstractEndPoint
ArrayByteBufferPool
ArrayByteBufferPool.Bucket
ByteArrayEndPoint
ByteBufferPool.Lease
ChannelEndPoint
ClientConnectionFactory.Helper
Connection.Listener.Adapter
FillInterest
IdleTimeout
LeakTrackingByteBufferPool
ManagedSelector
MappedByteBufferPool
MappedByteBufferPool.Tagged
NegotiatingClientConnection
NegotiatingClientConnectionFactory
NetworkTrafficListener.Adapter
NetworkTrafficSelectChannelEndPoint
SelectChannelEndPoint
SelectorManager
WriteFlusher
WriterOutputStream
異常類:
EofException
RuntimeIOException

從名字看,幾個主要的類可能爲:Connection、ByteBufferPool、SelectorManager、EndPoint,因爲其餘類應該是從中延伸出來的。

2、類分析

首先看Connection接口:

public interface Connection extends Closeable
{
    public void addListener(Listener listener);

    public void onOpen();

    /**
     * <p>Callback method invoked when this {@link Connection} is closed.</p>
     * <p>Creators of the connection implementation are responsible for calling this method.</p>
     */
    public void onClose();

    /**
     * @return the {@link EndPoint} associated with this {@link Connection}
     */
    public EndPoint getEndPoint();
    
    /**
     * <p>Performs a logical close of this connection.</p>
     * <p>For simple connections, this may just mean to delegate the close to the associated
     * {@link EndPoint} but, for example, SSL connections should write the SSL close message
     * before closing the associated {@link EndPoint}.</p>
     */
    @Override
    public void close();

    public int getMessagesIn();
    public int getMessagesOut();
    public long getBytesIn();
    public long getBytesOut();
    public long getCreatedTimeStamp();
    
    public interface UpgradeFrom extends Connection
    {
        /* ------------------------------------------------------------ */
        /** Take the input buffer from the connection on upgrade.
         * <p>This method is used to take any unconsumed input from
         * a connection during an upgrade.
         * @return A buffer of unconsumed input. The caller must return the buffer
         * to the bufferpool when consumed and this connection must not.
         */
        ByteBuffer onUpgradeFrom();
    }
    
    public interface UpgradeTo extends Connection
    {
        /**
         * <p>Callback method invoked when this {@link Connection} is upgraded.</p>
         * <p>This must be called before {@link #onOpen()}.</p>
         * @param prefilled An optional buffer that can contain prefilled data. Typically this
         * results from an upgrade of one protocol to the other where the old connection has buffered
         * data destined for the new connection.  The new connection must take ownership of the buffer
         * and is responsible for returning it to the buffer pool
         */
        void onUpgradeTo(ByteBuffer prefilled);
    }
    
    
    /* ------------------------------------------------------------ */
    /** 
     * <p>A Listener for connection events.</p>
     * <p>Listeners can be added to a {@link Connection} to get open and close events.
     * The AbstractConnectionFactory implements a pattern where objects implement
     * this interface that have been added via {@link Container#addBean(Object)} to
     * the Connector or ConnectionFactory are added as listeners to all new connections
     * </p>
     */
    public interface Listener
    {
        public void onOpened(Connection connection);

        public void onClosed(Connection connection);

        public static class Adapter implements Listener
        {
            @Override
            public void onOpened(Connection connection)
            {
            }

            @Override
            public void onClosed(Connection connection)
            {
            }
        }
    }
}

Connection接口主要用來添加監聽,並定義監聽接口Listener。

再看一個實現了Connection接口的抽象類AbstractConnection:

public abstract class AbstractConnection implements Connection
{
    private static final Logger LOG = Log.getLogger(AbstractConnection.class);

    private final List<Listener> listeners = new CopyOnWriteArrayList<>();
    private final long _created=System.currentTimeMillis();
    private final EndPoint _endPoint;
    private final Executor _executor;
    private final Callback _readCallback;
    private int _inputBufferSize=2048;

    protected AbstractConnection(EndPoint endp, Executor executor)
    {
        if (executor == null)
            throw new IllegalArgumentException("Executor must not be null!");
        _endPoint = endp;
        _executor = executor;
        _readCallback = new ReadCallback();
    }

    @Override
    public void addListener(Listener listener)
    {
        listeners.add(listener);
    }

    public int getInputBufferSize()
    {
        return _inputBufferSize;
    }

    public void setInputBufferSize(int inputBufferSize)
    {
        _inputBufferSize = inputBufferSize;
    }

    protected Executor getExecutor()
    {
        return _executor;
    }

    @Deprecated
    public boolean isDispatchIO()
    {
        return false;
    }

    protected void failedCallback(final Callback callback, final Throwable x)
    {
        if (callback.isNonBlocking())
        {
            try
            {
                callback.failed(x);
            }
            catch (Exception e)
            {
                LOG.warn(e);
            }
        }
        else
        {
            try
            {
                getExecutor().execute(new Runnable()
                {
                    @Override
                    public void run()
                    {
                        try
                        {
                            callback.failed(x);
                        }
                        catch (Exception e)
                        {
                            LOG.warn(e);
                        }
                    }
                });
            }
            catch(RejectedExecutionException e)
            {
                LOG.debug(e);
                callback.failed(x);
            }
        }
    }

    /**
     * <p>Utility method to be called to register read interest.</p>
     * <p>After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)}
     * will be called back as appropriate.</p>
     * @see #onFillable()
     */
    public void fillInterested()
    {
        if (LOG.isDebugEnabled())
            LOG.debug("fillInterested {}",this);
        getEndPoint().fillInterested(_readCallback);
    }

    public boolean isFillInterested()
    {
        return getEndPoint().isFillInterested();
    }

    /**
     * <p>Callback method invoked when the endpoint is ready to be read.</p>
     * @see #fillInterested()
     */
    public abstract void onFillable();

    /**
     * <p>Callback method invoked when the endpoint failed to be ready to be read.</p>
     * @param cause the exception that caused the failure
     */
    protected void onFillInterestedFailed(Throwable cause)
    {
        if (LOG.isDebugEnabled())
            LOG.debug("{} onFillInterestedFailed {}", this, cause);
        if (_endPoint.isOpen())
        {
            boolean close = true;
            if (cause instanceof TimeoutException)
                close = onReadTimeout();
            if (close)
            {
                if (_endPoint.isOutputShutdown())
                    _endPoint.close();
                else
                {
                    _endPoint.shutdownOutput();
                    fillInterested();
                }
            }
        }
    }

    /**
     * <p>Callback method invoked when the endpoint failed to be ready to be read after a timeout</p>
     * @return true to signal that the endpoint must be closed, false to keep the endpoint open
     */
    protected boolean onReadTimeout()
    {
        return true;
    }

    @Override
    public void onOpen()
    {
        if (LOG.isDebugEnabled())
            LOG.debug("onOpen {}", this);

        for (Listener listener : listeners)
            listener.onOpened(this);
    }

    @Override
    public void onClose()
    {
        if (LOG.isDebugEnabled())
            LOG.debug("onClose {}",this);

        for (Listener listener : listeners)
            listener.onClosed(this);
    }

    @Override
    public EndPoint getEndPoint()
    {
        return _endPoint;
    }

    @Override
    public void close()
    {
        getEndPoint().close();
    }

    @Override
    public int getMessagesIn()
    {
        return -1;
    }

    @Override
    public int getMessagesOut()
    {
        return -1;
    }

    @Override
    public long getBytesIn()
    {
        return -1;
    }

    @Override
    public long getBytesOut()
    {
        return -1;
    }

    @Override
    public long getCreatedTimeStamp()
    {
        return _created;
    }

    @Override
    public String toString()
    {
        return String.format("%s@%x[%s]",
                getClass().getSimpleName(),
                hashCode(),
                _endPoint);
    }

    private class ReadCallback implements Callback
    {
        @Override
        public void succeeded()
        {
            onFillable();
        }

        @Override
        public void failed(final Throwable x)
        {
            onFillInterestedFailed(x);
        }

        @Override
        public String toString()
        {
            return String.format("AC.ReadCB@%x{%s}", AbstractConnection.this.hashCode(),AbstractConnection.this);
        }
    }
}
相關文章
相關標籤/搜索