Tomcat源碼解析系列(十三)Acceptor

前言
上篇文章介紹了 NioEndpoint,其中講到了在 NioEndpoint#startInternal 方法裏建立並啓動了 Acceptor 和 Poller,線程。本篇文章先看 Acceptor,下篇文章再看 Poller。
1. Acceptor
Acceptor 的構造方法聲明爲:java

private final AbstractEndpoint<?,U> endpoint;

public Acceptor(AbstractEndpoint<?,U> endpoint) {
    this.endpoint = endpoint;
}

其中 endpoint 參數是在 NioEndpoint#startAcceptorThreads 方法裏 new Acceptor 時傳入的 NioEndpoint 對象。算法

Acceptor 實現了 Runnable 方法,所以它的 run 方法是 Acceptor 的關鍵。編程

@Override
public void run() {

    int errorDelay = 0;

    // Loop until we receive a shutdown command
    while (endpoint.isRunning()) {

        // Loop if endpoint is paused
        while (endpoint.isPaused() && endpoint.isRunning()) {
            state = AcceptorState.PAUSED;
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                // Ignore
            }
        }

        if (!endpoint.isRunning()) {
            break;
        }
        state = AcceptorState.RUNNING;

        try {
            //if we have reached max connections, wait
            endpoint.countUpOrAwaitConnection();

            // Endpoint might have been paused while waiting for latch
            // If that is the case, don't accept new connections
            if (endpoint.isPaused()) {
                continue;
            }

            U socket = null;
            try {
                // Accept the next incoming connection from the server
                // socket
                socket = endpoint.serverSocketAccept();
            } catch (Exception ioe) {
                // We didn't get a socket
                endpoint.countDownConnection();
                if (endpoint.isRunning()) {
                    // Introduce delay if necessary
                    errorDelay = handleExceptionWithDelay(errorDelay);
                    // re-throw
                    throw ioe;
                } else {
                    break;
                }
            }
            // Successful accept, reset the error delay
            errorDelay = 0;

            // Configure the socket
            if (endpoint.isRunning() && !endpoint.isPaused()) {
                // setSocketOptions() will hand the socket off to
                // an appropriate processor if successful
                if (!endpoint.setSocketOptions(socket)) {
                    endpoint.closeSocket(socket);
                }
            } else {
                endpoint.destroySocket(socket);
            }
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            String msg = sm.getString("endpoint.accept.fail");
            // APR specific.
            // Could push this down but not sure it is worth the trouble.
            if (t instanceof Error) {
                Error e = (Error) t;
                if (e.getError() == 233) {
                    // Not an error on HP-UX so log as a warning
                    // so it can be filtered out on that platform
                    // See bug 50273
                    log.warn(msg, t);
                } else {
                    log.error(msg, t);
                }
            } else {
                    log.error(msg, t);
            }
        }
    }
    state = AcceptorState.ENDED;
}

run 方法的代碼被包裹在一個 while 循環裏,while 循環的判斷條件是 endpoint.isRunning(),也就是 NioEndpoint 的父類 AbstractEndpoint 裏的 running 字段。segmentfault

/**
 * Running state of the endpoint.
 */
protected volatile boolean running = false;

這個 running 字段在 NioEndpoint#startInternal 方法裏被置爲 true。在 NioEndpoint#stopInternal 方法裏 running 置爲 false,數組

在最外層的 while 循環裏,就是 run 方法的核心了。緩存

// Loop if endpoint is paused
while (endpoint.isPaused() && endpoint.isRunning()) {
    state = AcceptorState.PAUSED;
    try {
        Thread.sleep(50);
    } catch (InterruptedException e) {
        // Ignore
    }
}

首先判斷 endpoint.isPaused() 是否是爲 true,若是是就讓線程 sleep 50毫秒,並把 Acceptor 的狀態設置爲 AcceptorState.PAUSED。
這個 endpoint.isPaused() 跟 isRunning 方法相似,也就是判斷
AbstractEndpoint 裏的一個 paused 屬性,起聲明以下app

/**
 * Will be set to true whenever the endpoint is paused.
 */
protected volatile boolean paused = false;

這個 pause 的是在 AbstractEndpoint#pause 裏置爲 true 的。socket

而後把 Acceptor 的狀態改成 AcceptorState.RUNNING。ide

接着進入 try 語句塊。首先調用 endpoint.countUpOrAwaitConnection()oop

//if we have reached max connections, wait
endpoint.countUpOrAwaitConnection();
protected void countUpOrAwaitConnection() throws InterruptedException {
    if (maxConnections==-1) return;
    LimitLatch latch = connectionLimitLatch;
    if (latch!=null) latch.countUpOrAwait();
}

能夠看出,countUpOrAwaitConnection 這個方法是判斷是否已超過 maxConnections,若是是就調用 latch.countUpOrAwait() 等待。

而後調用 endpoint.serverSocketAccept() 方法,返回一個泛型對象,這個泛型對象的具體類型在 NioEndpoint 對象中就確立了。

public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
public abstract class AbstractJsseEndpoint<S,U> extends AbstractEndpoint<S,U>

從 NioEndpoint 和 AbstractJsseEndpoint 的聲明中能夠看出,泛型 U 的具體類型是 SocketChannel。即 java.nio.channels.SocketChannel。
也就是說 endpoint.serverSocketAccept() 獲取的是一個 SocketChannel 對象。

@Override
protected SocketChannel serverSocketAccept() throws Exception {
    return serverSock.accept();
}

serverSocketAccept 就是簡單調用 serverSock.accept() 方法獲取一個 SocketChannel 對象。在 nio 編程裏,能夠認爲一個 SocketChannel 對象表明一個服務端與客戶端的鏈接。
這個 serverSock 就是在 NioEndpoint#initServerSocket() 裏調用 ServerSocketChannel.open() 初始化的。

拿到這個 SocketChannel 對象以後就配置這個對象

// Configure the socket
if (endpoint.isRunning() && !endpoint.isPaused()) {
    // setSocketOptions() will hand the socket off to
    // an appropriate processor if successful
    if (!endpoint.setSocketOptions(socket)) {
        endpoint.closeSocket(socket);
    }
} else {
    endpoint.destroySocket(socket);
}

上面代碼的邏輯很簡單,就是調用 endpoint.setSocketOptions(socket) 方法,若是不成功就調用 endpoint.closeSocket(socket) 方法。destroySocket(socket) 方法內部也是調用 closeSocket 方法。

protected void destroySocket(U socket) {
    closeSocket(socket);
}
@Override
protected void closeSocket(SocketChannel socket) {
    countDownConnection();
    try {
        socket.socket().close();
    } catch (IOException ioe)  {
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("endpoint.err.close"), ioe);
        }
    }
    try {
        socket.close();
    } catch (IOException ioe) {
        if (log.isDebugEnabled()) {
            log.debug(sm.getString("endpoint.err.close"), ioe);
        }
    }
}

closeSocket 在 NioEndpoint 裏,而 destroySocket 在 AbstractEndpoint 裏。closeSocket 方法邏輯很簡單就是調用 SocketChannel.socket().close() 和 SocketChannel.close() 方法。

關鍵地方在於 endpoint.setSocketOptions(socket) 方法。

1.1 NioEndpoint#setSocketOptions

/**
 * Process the specified connection.
 * @param socket The socket channel
 * @return <code>true</code> if the socket was correctly configured
 *  and processing may continue, <code>false</code> if the socket needs to be
 *  close immediately
 */
@Override
protected boolean setSocketOptions(SocketChannel socket) {
    // Process the connection
    try {
        //disable blocking, APR style, we are gonna be polling it
        socket.configureBlocking(false);
        Socket sock = socket.socket();
        socketProperties.setProperties(sock);

        NioChannel channel = nioChannels.pop();
        if (channel == null) {
            SocketBufferHandler bufhandler = new SocketBufferHandler(
                    socketProperties.getAppReadBufSize(),
                    socketProperties.getAppWriteBufSize(),
                    socketProperties.getDirectBuffer());
            if (isSSLEnabled()) {
                channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
            } else {
                channel = new NioChannel(socket, bufhandler);
            }
        } else {
            channel.setIOChannel(socket);
            channel.reset();
        }
        getPoller0().register(channel);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        try {
            log.error(sm.getString("endpoint.socketOptionsError"), t);
        } catch (Throwable tt) {
            ExceptionUtils.handleThrowable(tt);
        }
        // Tell to close the socket
        return false;
    }
    return true;
}

setSocketOptions 方法裏,首先用 socketProperties 給這個 SocketChannel 對象的 Socket 設置了一些屬性。
而後,從 nioChannels 這個 SynchronizedStack<NioChannel> 緩存池裏獲取一個 NioChannel 對象,若是獲取不到就建立一個,建立的 NioChannel 對象的時候也建立了一個 SocketBufferHandler 對象。

public SocketBufferHandler(int readBufferSize, int writeBufferSize,
        boolean direct) {
    this.direct = direct;
    if (direct) {
        readBuffer = ByteBuffer.allocateDirect(readBufferSize);
        writeBuffer = ByteBuffer.allocateDirect(writeBufferSize);
    } else {
        readBuffer = ByteBuffer.allocate(readBufferSize);
        writeBuffer = ByteBuffer.allocate(writeBufferSize);
    }
}

SocketBufferHandler 對象裏包含了兩個 ByteBuffer 對象,一個讀一個寫。

protected SocketChannel sc = null;
protected final SocketBufferHandler bufHandler;

public NioChannel(SocketChannel channel, SocketBufferHandler bufHandler) {
    this.sc = channel;
    this.bufHandler = bufHandler;
}

NioChannel 封裝了對 SocketChannel 對象的讀寫操做。

最後 setSocketOptions 裏調用了 getPoller0().register(channel)。

private Poller[] pollers = null;
private AtomicInteger pollerRotater = new AtomicInteger(0);

public Poller getPoller0() {
    int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
    return pollers[idx];
}

getPoller0() 方法就是從 pollers 數組裏選一個 Poller 對象,選取的算法是輪詢選取。
選出 Poller 對象後,調用其 register(channel) 方法。

1.2 NioEndpoint#Poller#register

/**
 * Registers a newly created socket with the poller.
 *
 * @param socket    The newly created socket
 */
public void register(final NioChannel socket) {
    socket.setPoller(this);
    NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
    socket.setSocketWrapper(ka);
    ka.setPoller(this);
    ka.setReadTimeout(getConnectionTimeout());
    ka.setWriteTimeout(getConnectionTimeout());
    ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
    ka.setSecure(isSSLEnabled());
    PollerEvent r = eventCache.pop();
    ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
    if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
    else r.reset(socket,ka,OP_REGISTER);
    addEvent(r);
}

register 方法傳入的參數是 NioChannel 而不是 SocketChannel 了,SocketChannel 已經與 NioChannel 關聯了。
register 第一行就調用 NioChannel#setPoller 方法,把當前 Poller 對象複製給 NioChannel 的屬性,將 NioChannel 對象與 Poller 對象關聯起來。

接着 建立了一個 NioSocketWrapper 對象並設置了相關屬性,其中最重要的是 ka.interestOps(SelectionKey.OP_READ) 這一行設置了 NioSocketWrapper 所感興趣的操做。
而後把 NioChannel 對象與 NioSocketWrapper 對象關聯起來。

public NioSocketWrapper(NioChannel channel, NioEndpoint endpoint) {
    super(channel, endpoint);
    pool = endpoint.getSelectorPool();
    socketBufferHandler = channel.getBufHandler();
}

NioSocketWrapper 的聲明爲

public static class NioSocketWrapper extends SocketWrapperBase<NioChannel> {

SocketWrapperBase 的構造方法爲

public SocketWrapperBase(E socket, AbstractEndpoint<E,?> endpoint) {
    this.socket = socket;
    this.endpoint = endpoint;
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    this.blockingStatusReadLock = lock.readLock();
    this.blockingStatusWriteLock = lock.writeLock();
}
public static class NioSocketWrapper extends SocketWrapperBase<NioChannel>
public abstract class SocketWrapperBase<E>

SocketWrapperBase 聲明裏有一個泛型 E,而 NioSocketWrapper 的聲明裏,泛型 E 的具體類型則是 NioChannel。

register 方法的最後從 eventCache 緩存池裏獲取一個 PollerEvent 對象,若是獲取不到就建立一個 PollerEvent 對象。

private NioChannel socket;
private int interestOps;
private NioSocketWrapper socketWrapper;

public PollerEvent(NioChannel ch, NioSocketWrapper w, int intOps) {
    reset(ch, w, intOps);
}

public void reset(NioChannel ch, NioSocketWrapper w, int intOps) {
    socket = ch;
    interestOps = intOps;
    socketWrapper = w;
}

建立 PollerEvent 對象時傳入的參數分別是前面的 NioChannel 、NioSocketWrapper 對象,以及一個 int 類型的常量 OP_REGISTER,值爲 0x100,分別賦值給 PollerEvent 的屬性,另外 PollerEvent 也實現了 Runnable 接口,這幾個屬性在 PollerEvent#run 方法裏都有對應的做用。

拿到 PollerEvent 對象後,調用 addEvent(r) 方法把這個對象加入的隊列中等待後續 Poller 線程的處理。

private final SynchronizedQueue<PollerEvent> events =
        new SynchronizedQueue<>();
private void addEvent(PollerEvent event) {
    events.offer(event);
    if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
}

events 緩存的 PollerEvent 對象,會在 Poller#run 方法裏被處理。


小結本文分析了 Acceptor 的 run 方法,也就是 Acceptor 線程作的事情。能夠看出 Acceptor 線程在一個循環裏一直接受客戶端鏈接,生成 SocketChannel 對象,並把這個 SocketChannel 對象封裝成 NioChannel 和 NioSocketWrapper 對象,並把這兩個對象放在一個 PollerEvent 對象裏,並把這個 PollerEvent 對象加入的緩存池裏等待 Poller 線程的處理。

相關文章
相關標籤/搜索