Tomcat源碼解析系列(十四)Poller 與 PollerEvent

前言
這篇文章裏講到了 NioEndpint 的啓動過程當中建立了多個 Poller 對象,並啓動了 Poller 線程。在上篇文章中介紹了 Acceptor 的 run 方法,其中講到了 Acceptor 的工做就是接受客戶端的鏈接並轉交給 Poller 線程處理,本文將分析 Poller 和 PollerEvent。Poller 和 PollerEvent 都是 NioEndpoint 的內部類。編程


1. PollerEvent#run
Acceptor 線程將接受的鏈接封裝成 PollerEvent 對象,並加入到一個隊列裏等待 Poller 線程的執行。PollerEvent 實現了 Runnable 接口,所以 run 方法是其關鍵方法。segmentfault

private NioChannel socket;
private NioSocketWrapper socketWrapper;

@Override
public void run() {
    if (interestOps == OP_REGISTER) {
        try {
            socket.getIOChannel().register(
                    socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
        } catch (Exception x) {
            log.error(sm.getString("endpoint.nio.registerFail"), x);
        }
    } else {
        final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
        try {
            if (key == null) {
                // The key was cancelled (e.g. due to socket closure)
                // and removed from the selector while it was being
                // processed. Count down the connections at this point
                // since it won't have been counted down when the socket
                // closed.
                socket.socketWrapper.getEndpoint().countDownConnection();
                ((NioSocketWrapper) socket.socketWrapper).closed = true;
            } else {
                final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
                if (socketWrapper != null) {
                    //we are registering the key to start with, reset the fairness counter.
                    int ops = key.interestOps() | interestOps;
                    socketWrapper.interestOps(ops);
                    key.interestOps(ops);
                } else {
                    socket.getPoller().cancelledKey(key);
                }
            }
        } catch (CancelledKeyException ckx) {
            try {
                socket.getPoller().cancelledKey(key);
            } catch (Exception ignore) {}
        }
    }
}

interestOps 是在構造方法裏傳入的。PollerEvent 的構造方法在兩處用到,一處是 Poller#register 方法裏,也就是上篇文章裏提到的,另外一處是在 Poller#add 方法裏,這個 add 方法的調用點有多處,傳入的 interestOps 的值是 SelectionKey.OP_WRITE 或者 SelectionKey.OP_READ。
if 語句塊裏,socket 是在構造方法裏傳入的 NioChannel 對象,緩存

protected SocketChannel sc = null;
public SocketChannel getIOChannel() {
    return sc;
}

NioChannel#getIOChannel 返回的是 SocketChannel 對象,這個對象是在建立 NioChannel 對象是傳入的,是 Acceptor 線程裏調用 endpoint.serverSocketAccept() 獲取到的對象。
socket.getPoller().getSelector() 是獲取 Poller 的 Selector 類型的對象。app

private Selector selector;

public Poller() throws IOException {
    this.selector = Selector.open();
}

public Selector getSelector() { return selector;}

能夠看出,這個 selector 是在 Poller 構造方法裏初始化的,一個 Poller 裏有一個 Selector 對象。
if 語句塊裏,是將 SocketChannel 對象註冊到 Poller 內部的 Selector 對象,並附加了一個 NioSocketWrapper 對象。註冊的感興趣的事件是 SelectionKey.OP_READ,也就是說,這個 Selector 對象會監聽這個 SocketChannel 的讀事件。socket

else 語句塊的邏輯也不復雜,就是將傳入的 interestOps 操做(SelectionKey.OP_WRITE 或者 SelectionKey.OP_READ)附加到 SocketChannel 關聯的 SelectionKey 裏,或者取消掉關聯的 SelectionKey。ide

2. Poller#run
Poller 實現了 Runnable,它的 run 方法是關鍵。oop

/**
 * The background thread that adds sockets to the Poller, checks the
 * poller for triggered events and hands the associated socket off to an
 * appropriate processor as events occur.
 */
@Override
public void run() {
    // Loop until destroy() is called
    while (true) {

        boolean hasEvents = false;

        try {
            if (!close) {
                hasEvents = events();
                if (wakeupCounter.getAndSet(-1) > 0) {
                    //if we are here, means we have other stuff to do
                    //do a non blocking select
                    keyCount = selector.selectNow();
                } else {
                    keyCount = selector.select(selectorTimeout);
                }
                wakeupCounter.set(0);
            }
            if (close) {
                events();
                timeout(0, false);
                try {
                    selector.close();
                } catch (IOException ioe) {
                    log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                }
                break;
            }
        } catch (Throwable x) {
            ExceptionUtils.handleThrowable(x);
            log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
            continue;
        }
        //either we timed out or we woke up, process events first
        if ( keyCount == 0 ) hasEvents = (hasEvents | events());

        Iterator<SelectionKey> iterator =
            keyCount > 0 ? selector.selectedKeys().iterator() : null;
        // Walk through the collection of ready keys and dispatch
        // any active event.
        while (iterator != null && iterator.hasNext()) {
            SelectionKey sk = iterator.next();
            NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
            // Attachment may be null if another thread has called
            // cancelledKey()
            if (attachment == null) {
                iterator.remove();
            } else {
                iterator.remove();
                processKey(sk, attachment);
            }
        }//while

        //process timeouts
        timeout(keyCount,hasEvents);
    }//while

    getStopLatch().countDown();
}

run 方法裏先執行 if (!close) 語句塊。先調用了 events 方法,ui

/**
 * Processes events in the event queue of the Poller.
 *
 * @return <code>true</code> if some events were processed,
 *   <code>false</code> if queue was empty
 */
public boolean events() {
    boolean result = false;

    PollerEvent pe = null;
    for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
        result = true;
        try {
            pe.run();
            pe.reset();
            if (running && !paused) {
                eventCache.push(pe);
            }
        } catch ( Throwable x ) {
            log.error(sm.getString("endpoint.nio.pollerEventError"), x);
        }
    }

    return result;
}

events() 方法就是執行了 events 這個隊列裏的 PollerEvent 的 run 方法,而後把 PollerEvent 對象放在 eventCache 裏以方便複用。PollerEvent#run方法在上面講過了。
以後根據 wakeupCounter 的值判斷是用 selector.selectNow() 仍是 selector.select(selectorTimeout)。wakeupCounter 值在 Poller#addEvent 裏自增1的。
而後就進入 if (close) 語句塊,也是調用 events() 方法,而後調用 timeout(0, false) 和 selector.close() 方法。this

後面就是調用 Selector.selectedKeys() 獲取監聽到的 SelectionKey 集合並逐個調用 processKey(sk, attachment)處理,這是 nio 編程裏的常規操做。
SelectionKey 的 attachment 是 NioSocketWrapper 對象,這個對象是在構造 PollerEvent 傳入的,在 Poller#register 方法裏。spa

2.1. Poller#processKey
processKey 方法就是處理 SelectionKey 的關鍵了。

protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
    try {
        if ( close ) {
            cancelledKey(sk);
        } else if ( sk.isValid() && attachment != null ) {
            if (sk.isReadable() || sk.isWritable() ) {
                if ( attachment.getSendfileData() != null ) {
                    processSendfile(sk,attachment, false);
                } else {
                    unreg(sk, attachment, sk.readyOps());
                    boolean closeSocket = false;
                    // Read goes before write
                    if (sk.isReadable()) {
                        if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                            closeSocket = true;
                        }
                    }
                    if (!closeSocket && sk.isWritable()) {
                        if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                            closeSocket = true;
                        }
                    }
                    if (closeSocket) {
                        cancelledKey(sk);
                    }
                }
            }
        } else {
            //invalid key
            cancelledKey(sk);
        }
    } catch ( CancelledKeyException ckx ) {
        cancelledKey(sk);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        log.error(sm.getString("endpoint.nio.keyProcessingError"), t);
    }
}

能夠看出,attachment.getSendfileData() 不爲 null 的話就調用 processSendfile 方法處理。不然調用 processKey 方法處理。

processSendfile 就是調用 FileChannel#transferTo 方法來發送數據的。這個方法不是重點,這裏就不詳細解析了。

processKey 方法是調用 processSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) 方法分別處理 OP_READ 和 OP_WRITE 事件,傳入的第二個參數分別是 SocketEvent.OPEN_READ 和 SocketEvent.OPEN_WRITE,第三個參數是 true。dispatch 的 true 表示是用另外的線程處理,false 是在 Poller 線程處理。
這個 processSocket 是 AbstractEndpoint 裏的方法。

2.2. AbstractEndpoint#processSocket

/**
 * External Executor based thread pool.
 */
private Executor executor = null;
public Executor getExecutor() { return executor; }

/**
 * Process the given SocketWrapper with the given status. Used to trigger
 * processing as if the Poller (for those endpoints that have one)
 * selected the socket.
 *
 * @param socketWrapper The socket wrapper to process
 * @param event         The socket event to be processed
 * @param dispatch      Should the processing be performed on a new
 *                          container thread
 *
 * @return if processing was triggered successfully
 */
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
        SocketEvent event, boolean dispatch) {
    try {
        if (socketWrapper == null) {
            return false;
        }
        SocketProcessorBase<S> sc = processorCache.pop();
        if (sc == null) {
            sc = createSocketProcessor(socketWrapper, event);
        } else {
            sc.reset(socketWrapper, event);
        }
        Executor executor = getExecutor();
        if (dispatch && executor != null) {
            executor.execute(sc);
        } else {
            sc.run();
        }
    } catch (RejectedExecutionException ree) {
        getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        // This means we got an OOM or similar creating a thread, or that
        // the pool and its queue are full
        getLog().error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
    return true;
}

processSocket 方法先從 processorCache 的緩存池裏獲取一個 SocketProcessorBase 對象,processorCache 是在 NioEndpoint#startInternal 裏初始化的。若是獲取不到就調用 createSocketProcessor 方法建立一個。
建立SocketProcessorBase 對象時傳入了 SocketWrapperBase(也就是 NioSocketWrapper 對象) 和 SocketEvent 對象。
createSocketProcessor 方法是一個abstract 的,其實如今 NioEndpoint 裏。

@Override
protected SocketProcessorBase<NioChannel> createSocketProcessor(
        SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
    return new SocketProcessor(socketWrapper, event);
}
/**
 * This class is the equivalent of the Worker, but will simply use in an
 * external Executor thread pool.
 */
protected class SocketProcessor extends SocketProcessorBase<NioChannel>

NioEndpoint#createSocketProcessor 方法就是簡單建立一個 SocketProcessor 對象。SocketProcessor 是 NioEndpoint 的內部類。

拿到 SocketProcessorBase 對象後,因爲傳入的 dispatch 爲 true,因此會把這個 SocketProcessorBase 扔到 executor 裏處理。SocketProcessorBase 實現了 Runnable。
executor 是在 AbstractEndpoint#createExecutor 方法裏初始化的,createExecutor 在這篇文章裏介紹過了,這裏就不贅述了。

SocketProcessorBase 的內容以下。

public abstract class SocketProcessorBase<S> implements Runnable {

    protected SocketWrapperBase<S> socketWrapper;
    protected SocketEvent event;

    public SocketProcessorBase(SocketWrapperBase<S> socketWrapper, SocketEvent event) {
        reset(socketWrapper, event);
    }


    public void reset(SocketWrapperBase<S> socketWrapper, SocketEvent event) {
        Objects.requireNonNull(event);
        this.socketWrapper = socketWrapper;
        this.event = event;
    }


    @Override
    public final void run() {
        synchronized (socketWrapper) {
            // It is possible that processing may be triggered for read and
            // write at the same time. The sync above makes sure that processing
            // does not occur in parallel. The test below ensures that if the
            // first event to be processed results in the socket being closed,
            // the subsequent events are not processed.
            if (socketWrapper.isClosed()) {
                return;
            }
            doRun();
        }
    }


    protected abstract void doRun();
}

SocketProcessorBase#run 方法很簡單,就是調用抽象方法 doRun()。因此關鍵在於 SocketProcessor#doRun 方法。

2.3. SocketProcessor#doRun

@Override
protected void doRun() {
    NioChannel socket = socketWrapper.getSocket();
    SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());

    try {
        int handshake = -1;

        try {
            if (key != null) {
                if (socket.isHandshakeComplete()) {
                    // No TLS handshaking required. Let the handler
                    // process this socket / event combination.
                    handshake = 0;
                } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
                        event == SocketEvent.ERROR) {
                    // Unable to complete the TLS handshake. Treat it as
                    // if the handshake failed.
                    handshake = -1;
                } else {
                    handshake = socket.handshake(key.isReadable(), key.isWritable());
                    // The handshake process reads/writes from/to the
                    // socket. status may therefore be OPEN_WRITE once
                    // the handshake completes. However, the handshake
                    // happens when the socket is opened so the status
                    // must always be OPEN_READ after it completes. It
                    // is OK to always set this as it is only used if
                    // the handshake completes.
                    event = SocketEvent.OPEN_READ;
                }
            }
        } catch (IOException x) {
            handshake = -1;
            if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
        } catch (CancelledKeyException ckx) {
            handshake = -1;
        }
        if (handshake == 0) {
            SocketState state = SocketState.OPEN;
            // Process the request from this socket
            if (event == null) {
                state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
            } else {
                state = getHandler().process(socketWrapper, event);
            }
            if (state == SocketState.CLOSED) {
                close(socket, key);
            }
        } else if (handshake == -1 ) {
            close(socket, key);
        } else if (handshake == SelectionKey.OP_READ){
            socketWrapper.registerReadInterest();
        } else if (handshake == SelectionKey.OP_WRITE){
            socketWrapper.registerWriteInterest();
        }
    } catch (CancelledKeyException cx) {
        socket.getPoller().cancelledKey(key);
    } catch (VirtualMachineError vme) {
        ExceptionUtils.handleThrowable(vme);
    } catch (Throwable t) {
        log.error(sm.getString("endpoint.processing.fail"), t);
        socket.getPoller().cancelledKey(key);
    } finally {
        socketWrapper = null;
        event = null;
        //return to cache
        if (running && !paused) {
            processorCache.push(this);
        }
    }
}

doRun 方法裏在開始的 if-else 語句塊裏決定 handshake 變量的值。
先調用 socket.isHandshakeComplete() 也就是 NioChannel#isHandshakeComplete

public boolean isHandshakeComplete() {
    return true;
}

直接返回 true。理論上 else 的語句都不會執行了。其實 handshake 是 HTTPS 裏的內容,NioChannel 不處理 handshake,可是在 NioChannel 的子類 SecureNioChannel 裏會處理。SecureNioChannel 不是本文重點,這裏就很少作介紹了。
因此在第一個 if-else 語句塊了,handshake 的值就已經爲 0 了。

接着是第二個 if-else 語句塊,根據 handshake 的值作不一樣的處理,若是 handshake 的值是 SelectionKey.OP_READ 或者 SelectionKey.OP_WRITE 的話,就調用 socketWrapper.registerReadInterest() 或者 socketWrapper.registerWriteInterest() 從新註冊感興趣事件。

@Override
public void registerReadInterest() {
    getPoller().add(getSocket(), SelectionKey.OP_READ);
}

@Override
public void registerWriteInterest() {
    getPoller().add(getSocket(), SelectionKey.OP_WRITE);
}

這兩個方法其實也就是調用 Poller#add 方法,

/**
 * Add specified socket and associated pool to the poller. The socket will
 * be added to a temporary array, and polled first after a maximum amount
 * of time equal to pollTime (in most cases, latency will be much lower,
 * however).
 *
 * @param socket to add to the poller
 * @param interestOps Operations for which to register this socket with
 *                    the Poller
 */
public void add(final NioChannel socket, final int interestOps) {
    PollerEvent r = eventCache.pop();
    if ( r==null) r = new PollerEvent(socket,null,interestOps);
    else r.reset(socket,null,interestOps);
    addEvent(r);
    if (close) {
        NioEndpoint.NioSocketWrapper ka = (NioEndpoint.NioSocketWrapper)socket.getAttachment();
        processSocket(ka, SocketEvent.STOP, false);
    }
}

Poller#add 就是建立一個 PollerEvent 對象,並將這個對象加入的緩存隊列裏等待 Poller 線程的處理,PollerEvent#run 前面已經講過了。
在 SecureNioChannel 裏,handshake 可能會爲根據 SecureNioChannel#handshake 的處理返回 SelectionKey.OP_READ 或者 SelectionKey.OP_WRITE。可是在 NioChannel 裏 handshake 只會爲 0。

第二個 if-else 語句塊的 if 塊裏就是調用 getHandler().process(socketWrapper, event) 裏處理。
而後獲得一個 SocketState 對象 state,若是 state 的值爲SocketState.CLOSED,則執行 close(socket, key) 方法。

getHandler() 是 AbstractEndpoint 裏的方法

private Handler<S> handler = null;
public Handler<S> getHandler() { return handler; }

Handler 帶一個泛型 S,這個泛型就是 AbstractEndpoint<S,U> 裏的 S。Handler 也是 AbstractEndpoint 的內部接口。
在 NioEndpoint 及其父類 AbstractJsseEndpoint 的聲明裏能夠知道這個泛型 S 的具體類型就是 NioChannel。

這個 Handler 就是在 AbstractHttp11Protocol 的構造方法裏 初始化的 ConnectionHandler 對象。這個在這篇文章裏就講到了,這裏不在贅述了。
ConnectionHandler 會在下篇文章裏介紹,這裏就先很少講了。


小結本文分析了 PollerEvent 和 Poller 的 run 方法,其中 PollerEvent#run 方法就是將 SocketChannel 的讀或者寫事件註冊的 Poller 的 selector 裏。Poller#run 方法就是先處理緩存隊列裏的 PollerEvent,而後處理 selector.selectKeys() 返回的 SelectionKey,也就是 SocketChannel 的讀寫事件。

相關文章
相關標籤/搜索