一個http請求在play框架中的前世此生(下)

上一篇提到了play底層的網絡通訊基於netty實現,因而粗略地研究了一下netty,總結以下。(netty版本是3.2.5,不一樣版本的實現可能差別較大)react

1、netty的組件編程

channelBuffer:網絡

傳輸Buffer和抽象後的邏輯Buffer的結合,將NIO底層的多個buffer合併成了一個能夠表明完整消息內容的buffer,能夠理解爲一個messagesocket


channel:tcp

對於Java的old IO和NIO的輸入|輸出通道的一個封裝ide


channelPipeline:oop

Netty的ChannelPipeline包含兩條線路:Upstream和Downstream。Upstream對應上行,接收到的消息、被動的狀態改變,都屬於Upstream。Downstream則對應下行,發送的消息、主動的狀態改變,都屬於Downstream。ChannelPipeline接口包含了兩個重要的方法:sendUpstream(ChannelEvent e)和sendDownstream(ChannelEvent e),就分別對應了Upstream和Downstream。this


handler:.net

 ChannelPipeline裏包含的ChannelHandler也包含兩類:ChannelUpstreamHandler和ChannelDownstreamHandler。每條線路的Handler是互相獨立的。它們都很簡單的只包含一個方法:ChannelUpstreamHandler.handleUpstream和ChannelDownstreamHandler.handleDownstream。線程

handler要實現messageReceive方法,在這裏面作特定協議的實現

 @Override

    public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent messageEvent) throws Exception {

          //do something

}


2、編程模型

1.實現本身的handler

2.實現本身的pipeline factory,將handler註冊到pipeline中

3.在netty啓動類中指定本身的pipeline

3、線程模型(Reactor模式)

netty的線程模型採用了reactor模式,以下圖,boss線程(reactor)使用selector接收請求,而後委派給acceptor,worker線程(圖中的線程池表示)從acceptor領取任務並執行。


----------------------------------------------初始化過程-----------------------------------------------------------------

經過閱讀Netty中的NioServerSocketChannelFactory類的代碼,能夠詳細瞭解reactor模式對應的組件的初始化過程。

NioServerSocketChannelFactory負責構造一個NioServerSocket,包含一個ChannelSink的引用,ChannelSink中管理着worker線程池:

NioServerSocketPipelineSink(Executor workerExecutor, int workerCount) {

        workers = new NioWorker[workerCount];

        for (int i = 0; i < workers.length; i ++) {

            workers[i] = new NioWorker(id , i + 1, workerExecutor);

        }

    }

NioServerSocketPipelineSink實現ChannelSink接口,包含eventSunk方法,負責severSocker綁定到tcp端口時的初始化工做和創建新的客戶端socket鏈接的處理。以下述代碼所示:

public void eventSunk(

            ChannelPipeline pipeline, ChannelEvent e) throws Exception {

        Channel channel = e.getChannel();

        if (channel instanceof NioServerSocketChannel) {//serverSocket綁定端口和建立channel

            handleServerSocket(e);

        } else if (channel instanceof NioSocketChannel) {//client socket的鏈接創建

            handleAcceptedSocket(e);

        }

handlerServerSocket會調用以下的bind方法,初始化扮演mainReactor角色的Boss線程。

private void bind(

            NioServerSocketChannel channel, ChannelFuture future,

            SocketAddress localAddress) {


        boolean bound = false ;

        boolean bossStarted = false;

        try {

            channel. socket.socket().bind(localAddress, channel.getConfig().getBacklog());

            bound = true;


            future.setSuccess();

            fireChannelBound(channel, channel.getLocalAddress());


            Executor bossExecutor =

                ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor ;

            DeadLockProofWorker. start(

                    bossExecutor,

                    new ThreadRenamingRunnable(

                            new Boss(channel),

                            "New I/O server boss #" + id + " (" + channel + ')'));

            bossStarted = true;

        } catch (Throwable t) {

            future.setFailure(t);

            fireExceptionCaught(channel, t);

        } finally {

            if (!bossStarted && bound) {

                close(channel, future);

            }

        }

    }

----------------------------------------------初始化過程-----------------------------------------------------------------

到此爲止Boss線程使用NIO的selector監聽咱們制定的socket端口是否有客戶端鏈接的IO事件發生,它的主循環loop以下:

 public void run() {

            final Thread currentThread = Thread.currentThread();


            channel. shutdownLock.lock();

                for (;;) {

                        if (selector .select(1000) > 0) {

                            selector.selectedKeys().clear();

                        }


                        SocketChannel acceptedSocket = channel. socket.accept();

                        if (acceptedSocket != null) {

                            registerAcceptedChannel(acceptedSocket, currentThread);

                        }

               }

        }

只幹了一件事情,就是講接收到的客戶端sokcet鏈接註冊到Acceptor中。爲了提升boss線程的處理效率?註冊工做會被抽象成一個runnable對象,放入註冊任務隊列中,交給worker線程池來執行。worker的主循環以下:

    thread = Thread.currentThread();


        boolean shutdown = false;

        Selector selector = this.selector ;

        for (;;) {

            wakenUp.set( false);


            try {

                SelectorUtil. select(selector);


                if (wakenUp .get()) {

                    selector.wakeup();

                }


                cancelledKeys = 0;

                processRegisterTaskQueue();

                processWriteTaskQueue();

                processSelectedKeys(selector.selectedKeys());

            } catch (Throwable t) {

                logger.warn(

                        "Unexpected exception in the selector loop." , t);

            }

        }

    }

worker中採用了流水線模式,在每一次循環中,執行如下三個任務:

1.從acceptor中拿到mainReactor中接收的客戶端鏈接,註冊到本身的selector中(每一個worker一個selector)

2.若是寫任務隊裏中有數據須要執行,取出一個寫任務並執行

3.對於已經註冊到selector的channel,處理它的read或write IO事件。

4、play線程與netty線程的協做

play線程和netty線程的對於請求的協做處理過程以下:

請求->netty master->netty worker->本身啓動一個新的線程->netty response->netty worker。

下面是協做過程的詳細分析:

1.netty worker轉交請求處理工做給play

PlayHandler裏的messageRecived:


public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)

            throws Exception{

....

Invoker.invoke(new NettyInvokation(request, response, ctx, nettyRequest, e));

}

經過在messageReceived中向線程池提交任務的方式來完成request的業務邏輯處理部分,messageReceived結束後,worker線程脫離這個request,這個request再也不會佔用worker了。這樣不但能夠保持長鏈接不關閉,並且不會佔用netty的worker線程。

2.play將寫回數據響應交回給netty worker

playHandler用來返回響應數據的copyResponse方法,底層會委託給下面這個方法:

public static ChannelFuture write(Channel channel, Object message, SocketAddress remoteAddress) {

        ChannelFuture future = future(channel);

        channel.getPipeline().sendDownstream(

                new DownstreamMessageEvent(channel, future, message, remoteAddress));

        return future;

    }

可見返回的數據以message爲載體又回到了channel中註冊的pipeline中,netty的每個pipeline最後都有一個channelSink負責收尾工做,對於Play應用,這個sink是上面提到的NioServerSocketPipelineSink

private void handleAcceptedSocket (ChannelEvent e) {

        if (e instanceof ChannelStateEvent) {

            //此處省略

        } else if (e instanceof MessageEvent) {

            MessageEvent event = (MessageEvent) e;

            NioSocketChannel channel = (NioSocketChannel) event.getChannel();

            boolean offered = channel.writeBuffer .offer(event);

            assert offered;

            channel. worker.writeFromUserCode(channel);

        }

    }

worker的writeFromUserCode方法會將寫任務放入worker的寫任務隊列中,而後等待worker的主循環來處理

void writeFromUserCode(final NioSocketChannel channel) {

        if (!channel.isConnected()) {

            cleanUpWriteBuffer(channel);

            return;

        }


        if (scheduleWriteIfNecessary(channel)) {//放入worker的寫任務隊列

            return;

        }


        // From here, we are sure Thread.currentThread() == workerThread.


        if (channel.writeSuspended ) {

            return;

        }


        if (channel.inWriteNowLoop ) {

            return;

        }


        write0(channel);

    }


若是在一次循環中write0方法沒有寫完數據,write0方法還會主動設置channel的OP_WRITE狀態,讓主循環的processSelectedKeys方法來處理,一直到數據寫完,才清除這個OP_WRITE狀態。

相關文章
相關標籤/搜索