Adding a thread pool to Netty for asynchronous processing

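  Tomcat's asynchronous thread model can be roughly understood as follows: the acceptor accepts new connections, initializes them, and hands them to the poller for IO; the poller then hands each request to the exec thread pool, which processes the business logic asynchronously.

  By default, Netty runs a channel's handlers on the same IO thread that reads and writes the socket. If a handler performs something slow, such as a database query or a call to another service, it blocks that IO thread and seriously hurts Netty's overall performance. In that case the slow operation should be processed asynchronously.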
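The effect of a slow handler on a single IO thread can be sketched without Netty at all. The following is a minimal JDK-only model, not Netty code: `ioLoop` stands in for one IO thread, `businessPool` is a hypothetical worker pool, and the class and method names are invented for illustration. It records the order in which a slow handler and a cheap handler finish.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BlockingLoopSketch {

    /** Runs a slow handler followed by a cheap one and returns the order in which they finished. */
    public static List<String> run(boolean offloadSlowWork) {
        ExecutorService ioLoop = Executors.newSingleThreadExecutor();   // models one IO thread
        ExecutorService businessPool = Executors.newFixedThreadPool(2); // hypothetical worker pool
        List<String> finished = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);

        Runnable slowWork = () -> {
            sleep(200); // pretend this is a database query
            finished.add("slow");
            done.countDown();
        };

        // Event 1: a handler that contains slow work.
        ioLoop.execute(() -> {
            if (offloadSlowWork) {
                businessPool.execute(slowWork); // the IO thread returns immediately
            } else {
                slowWork.run();                 // the IO thread is stuck for 200 ms
            }
        });
        // Event 2: a cheap handler queued behind event 1 on the same IO thread.
        ioLoop.execute(() -> {
            finished.add("fast");
            done.countDown();
        });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            ioLoop.shutdown();
            businessPool.shutdown();
        }
        return new ArrayList<>(finished);
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("inline:    " + run(false));
        System.out.println("offloaded: " + run(true));
    }
}
```

With the slow work run inline, the cheap event has to wait behind it; once the slow work is offloaded, the cheap event should finish first because the IO thread is free again.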

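There are two ways to add a thread pool to Netty:

The first is to add the thread pool inside the handler.

The second is to add the thread pool to the Context.

1. Adding a thread pool in the handler

The core code is as follows:

1. Server-side code

EchoServer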

package cn.xm.netty.example.echo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;

public final class EchoServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            if (sslCtx != null) {
                                p.addLast(sslCtx.newHandler(ch.alloc()));
                            }
                            p.addLast(new EchoServerHandler2());
                            p.addLast(serverHandler);
                        }
                    });

            ChannelFuture f = b.bind(PORT).sync();

            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

EchoServerHandler

package cn.xm.netty.example.echo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.util.CharsetUtil;

// EchoServer adds one instance to every channel's pipeline, so the handler must be marked @Sharable.
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    // Business thread pool, separate from the IO threads.
    private static final DefaultEventLoopGroup eventExecutors = new DefaultEventLoopGroup(16);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("cn.xm.netty.example.echo.EchoServerHandler.channelRead thread: " + Thread.currentThread().getName());
        // Cast to Netty's ByteBuf (Netty's counterpart of NIO's ByteBuffer)
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("客戶端發送的消息是:" + byteBuf.toString(CharsetUtil.UTF_8));
        System.out.println("客戶端地址:" + ctx.channel().remoteAddress());
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端!0!", CharsetUtil.UTF_8));

        // Alternative that queues the task on the IO thread itself:
//        ctx.channel().eventLoop().execute(new Runnable() {
        eventExecutors.execute(new Runnable() {
            @Override
            public void run() {
                // A time-consuming task is made asynchronous here by submitting it to the business thread pool,
                // so it does not occupy the NioEventLoop
                System.out.println("java.lang.Runnable.run thread: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(10 * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端!1!", CharsetUtil.UTF_8));
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

EchoServerHandler2

package cn.xm.netty.example.echo;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultEventLoopGroup;

public class EchoServerHandler2 extends ChannelOutboundHandlerAdapter {

    private static final DefaultEventLoopGroup eventExecutors = new DefaultEventLoopGroup(16);

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        super.write(ctx, msg, promise);
        System.out.println("cn.xm.netty.example.echo.EchoServerHandler2.write called, threadName: " + Thread.currentThread().getName());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

2. Client code

EchoClient

package cn.xm.netty.example.echo;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;

public final class EchoClient {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        final SslContext sslCtx;
        if (SSL) {
            sslCtx = SslContextBuilder.forClient()
                    .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }

        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            if (sslCtx != null) {
                                p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                            }
                            p.addLast(new EchoClientHandler());
                        }
                    });

            // Start the client.
            ChannelFuture f = b.connect(HOST, PORT).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down the event loop to terminate all threads.
            group.shutdownGracefully();
        }
    }
}

EchoClientHandler

package cn.xm.netty.example.echo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;


public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("ClientHandler ctx: " + ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 服務器!", CharsetUtil.UTF_8));
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Cast to Netty's ByteBuf (Netty's counterpart of NIO's ByteBuffer)
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("服務器會送的消息是:" + byteBuf.toString(CharsetUtil.UTF_8));
        System.out.println("服務器地址:" + ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

3. Test

Start the server first, then the client, and check the server console:

cn.xm.netty.example.echo.EchoServerHandler.channelRead thread: nioEventLoopGroup-3-1
客戶端發送的消息是:hello, 服務器!
客戶端地址:/127.0.0.1:54247
cn.xm.netty.example.echo.EchoServerHandler2.write called, threadName: nioEventLoopGroup-3-1
java.lang.Runnable.run thread: defaultEventLoopGroup-4-1
cn.xm.netty.example.echo.EchoServerHandler2.write called, threadName: nioEventLoopGroup-3-1

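4. Analysis

The output shows the following flow:

(1) When the IO thread polls a socket event, it starts processing it. When processing reaches the time-consuming operation in EchoServerHandler, the slow task is handed to the thread pool.

(2) When the slow task finishes and calls ctx.writeAndFlush, the write is handed back to the IO thread (that is, every write is ultimately performed by the IO thread). The process is as follows:

1》io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, boolean, io.netty.channel.ChannelPromise)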

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }

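The else branch is taken here, because the current thread belongs to the custom thread pool and is not the IO thread returned by next.executor(). The else branch creates a write task and then calls io.netty.channel.AbstractChannelHandlerContext#safeExecute: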

    private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
        try {
            executor.execute(runnable);
        } catch (Throwable cause) {
            try {
                promise.setFailure(cause);
            } finally {
                if (msg != null) {
                    ReferenceCountUtil.release(msg);
                }
            }
        }
    }
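The catch block in safeExecute matters when the executor refuses the task, for example because it is already shutting down: the promise is failed instead of being left pending. A minimal JDK-only sketch of that pattern follows. It is a model, not Netty's code: `CompletableFuture` stands in for `ChannelPromise`, the class and method names are invented, and the release of the message buffer is not modeled.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SafeExecuteSketch {

    /** Submits a "write" task and reports how the promise ended up. */
    public static String submit(boolean executorAlreadyShutDown) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        if (executorAlreadyShutDown) {
            executor.shutdown(); // later execute() calls are rejected
        }
        CompletableFuture<String> promise = new CompletableFuture<>();
        try {
            executor.execute(() -> promise.complete("written"));
        } catch (RejectedExecutionException cause) {
            // Counterpart of promise.setFailure(cause): the caller learns the write never ran.
            promise.completeExceptionally(cause);
        }
        try {
            return promise.get(5, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submit(false));
        System.out.println(submit(true));
    }
}
```

Without the catch block, a rejected task would leave the promise incomplete forever and any listener waiting on it would never fire.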

  safeExecute calls executor.execute, which adds the task to the executor's own task queue: io.netty.util.concurrent.SingleThreadEventExecutor#execute

    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }
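The "run inline if already on the loop thread, otherwise enqueue" decision made by write and execute can be modeled in a few lines of plain JDK code. This is a toy sketch under stated assumptions, not Netty's implementation: `ToyLoop`, `dispatch`, and the thread name `toy-io-loop` are all hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class InEventLoopSketch {

    /** A toy single-threaded loop; not Netty's SingleThreadEventExecutor. */
    static final class ToyLoop {
        private volatile Thread loopThread;
        private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "toy-io-loop");
            loopThread = t; // remember which thread is "the loop"
            return t;
        });

        boolean inEventLoop() {
            return Thread.currentThread() == loopThread;
        }

        void execute(Runnable task) {
            executor.execute(task);
        }

        void shutdown() {
            executor.shutdown();
        }

        /** Same shape as write(): inline on the loop thread, queued from any other thread. */
        CompletableFuture<String> dispatch() {
            CompletableFuture<String> promise = new CompletableFuture<>();
            if (inEventLoop()) {
                promise.complete("inline@" + Thread.currentThread().getName());
            } else {
                execute(() -> promise.complete("queued@" + Thread.currentThread().getName()));
            }
            return promise;
        }
    }

    /** Calls dispatch() from the caller's thread, which is not the loop thread. */
    public static String fromOutside() {
        ToyLoop loop = new ToyLoop();
        try {
            return loop.dispatch().get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
    }

    /** Calls dispatch() from a task that is already running on the loop thread. */
    public static String fromLoop() {
        ToyLoop loop = new ToyLoop();
        CompletableFuture<String> result = new CompletableFuture<>();
        try {
            loop.execute(() -> loop.dispatch().thenAccept(result::complete));
            return result.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
    }
}
```

Either way the work ends up on the loop thread; the only difference is whether it runs immediately or passes through the queue first, which is why a write issued from the business pool shows up on the IO thread in the test output.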

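Supplement: another way to go asynchronous inside a handler is to create a task and add it to the channel's own EventLoop task queue. This still occupies the IO thread: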

package cn.xm.netty.example.echo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

// EchoServer adds one instance to every channel's pipeline, so the handler must be marked @Sharable.
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("cn.xm.netty.example.echo.EchoServerHandler.channelRead thread: " + Thread.currentThread().getName());
        // Cast to Netty's ByteBuf (Netty's counterpart of NIO's ByteBuffer)
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("客戶端發送的消息是:" + byteBuf.toString(CharsetUtil.UTF_8));
        System.out.println("客戶端地址:" + ctx.channel().remoteAddress());
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端!0!", CharsetUtil.UTF_8));

        ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                // The time-consuming task is submitted to the taskQueue of the channel's own NioEventLoop,
                // so it is deferred but still runs on the IO thread
                System.out.println("java.lang.Runnable.run thread: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(10 * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端!1!", CharsetUtil.UTF_8));
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

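Test: the output shows that the asynchronous task also runs on the current IO thread, so the 10-second sleep still blocks it.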

cn.xm.netty.example.echo.EchoServerHandler.channelRead thread: nioEventLoopGroup-3-1
客戶端發送的消息是:hello, 服務器!
客戶端地址:/127.0.0.1:53721
cn.xm.netty.example.echo.EchoServerHandler2.write called, threadName: nioEventLoopGroup-3-1
java.lang.Runnable.run thread: nioEventLoopGroup-3-1
cn.xm.netty.example.echo.EchoServerHandler2.write called, threadName: nioEventLoopGroup-3-1

2. Adding an asynchronous thread pool to the Context

1. Code changes

Changes to EchoServer

package cn.xm.netty.example.echo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;

public final class EchoServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        DefaultEventLoopGroup group = new DefaultEventLoopGroup(16);
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            if (sslCtx != null) {
                                p.addLast(sslCtx.newHandler(ch.alloc()));
                            }
                            p.addLast(group, new EchoServerHandler2());
                            p.addLast(group, serverHandler);
                        }
                    });

            ChannelFuture f = b.bind(PORT).sync();

            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            // Also release the threads of the custom handler group.
            group.shutdownGracefully();
        }
    }
}

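  The thread group is specified when calling p.addLast. If none is specified, the handler runs on the IO thread group by default; if one is specified, the handler runs on that group. This resembles the Tomcat8 thread model: accepting connections, IO, and business processing each run on different threads.

Changes to EchoServerHandler: it processes the request directly, with no need to start an asynchronous task itself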

package cn.xm.netty.example.echo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

// EchoServer adds one instance to every channel's pipeline, so the handler must be marked @Sharable.
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("cn.xm.netty.example.echo.EchoServerHandler.channelRead thread: " + Thread.currentThread().getName());
        // Cast to Netty's ByteBuf (Netty's counterpart of NIO's ByteBuffer)
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("客戶端發送的消息是:" + byteBuf.toString(CharsetUtil.UTF_8));
        System.out.println("客戶端地址:" + ctx.channel().remoteAddress());
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端!0!", CharsetUtil.UTF_8));

        // The time-consuming work runs inline here: the whole handler already runs on the
        // thread group passed to addLast, so nothing is submitted to the NioEventLoop
        System.out.println("java.lang.Runnable.run thread: " + Thread.currentThread().getName());
        try {
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端!1!", CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

2. Test results:

cn.xm.netty.example.echo.EchoServerHandler.channelRead thread: defaultEventLoopGroup-4-1
客戶端發送的消息是:hello, 服務器!
客戶端地址:/127.0.0.1:52966
cn.xm.netty.example.echo.EchoServerHandler2.write called, threadName: defaultEventLoopGroup-4-1
java.lang.Runnable.run thread: defaultEventLoopGroup-4-1
cn.xm.netty.example.echo.EchoServerHandler2.write called, threadName: defaultEventLoopGroup-4-1

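  As shown, all the work is done in the custom thread group.

3. Source code walkthrough

(1) As seen in the source code reviewed earlier, the context wraps the handler, the pipeline, the executor, and related information. In p.addLast we specified our own thread group; the source is as follows: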

io.netty.channel.DefaultChannelPipeline#addLast(io.netty.util.concurrent.EventExecutorGroup, io.netty.channel.ChannelHandler...)

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        }

        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }

        return this;
    }

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);

            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

io.netty.channel.DefaultChannelPipeline#newContext

    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }

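  As shown, the custom thread group is used: the executor obtained from it via childExecutor(group) is stored in a field of DefaultChannelHandlerContext.

(2) If no thread group is specified, null is passed by default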

io.netty.channel.DefaultChannelPipeline#addLast(io.netty.channel.ChannelHandler...)

    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }

(3) io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(io.netty.channel.AbstractChannelHandlerContext, java.lang.Object)

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

Let us look at how next resolves its executor:

1》 io.netty.channel.AbstractChannelHandlerContext#executor, the method that obtains the executor, is as follows:

    @Override
    public EventExecutor executor() {
        if (executor == null) {
            return channel().eventLoop();
        } else {
            return executor;
        }
    }

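  As shown, the specified executor is returned if there is one; otherwise the channel's EventLoop, that is, the IO thread, is returned.

2》Next, executor.inEventLoop() is false, because the calling thread is the IO thread while the executor belongs to the custom group, so the else branch runs and the call is dispatched asynchronously.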
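The fallback in executor() and the resulting thread hop can be illustrated with a small JDK-only model. It is a sketch under stated assumptions rather than Netty code: `ToyContext` and the thread names `toy-io` and `toy-biz` are hypothetical, and unlike Netty the toy always enqueues instead of running inline when the executor is the IO thread itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ContextExecutorSketch {

    /** Toy counterpart of a handler context: an optional own executor plus the channel's loop. */
    static final class ToyContext {
        private final ExecutorService own;         // null when addLast was called without a group
        private final ExecutorService channelLoop; // the channel's IO thread

        ToyContext(ExecutorService own, ExecutorService channelLoop) {
            this.own = own;
            this.channelLoop = channelLoop;
        }

        ExecutorService executor() {
            return own != null ? own : channelLoop;
        }
    }

    private static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    /** Returns the name of the thread on which the "handler" ended up running. */
    public static String handlerThread(boolean withOwnGroup) {
        ExecutorService io = named("toy-io");
        ExecutorService biz = named("toy-biz");
        try {
            ToyContext ctx = new ToyContext(withOwnGroup ? biz : null, io);
            CompletableFuture<String> ranOn = new CompletableFuture<>();
            // The read event starts on the IO thread and is handed to the context's executor.
            io.execute(() -> ctx.executor().execute(
                    () -> ranOn.complete(Thread.currentThread().getName())));
            return ranOn.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
            biz.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("no group:  " + handlerThread(false));
        System.out.println("own group: " + handlerThread(true));
    }
}
```

The handler body is identical in both cases; only the executor recorded in the context decides which thread runs it, which matches the thread names seen in the test output of this section.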

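Summary:

The first approach, adding asynchrony inside the handler, is more flexible: only the time-consuming block needs to be made asynchronous. Asynchrony also adds to the response time, because the task has to be queued first.

The second approach is Netty's standard way and makes the whole handler asynchronous: every call is queued and processed asynchronously, whether it is slow or not. This is clear and easy to understand, but may be less flexible.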
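To make the first approach concrete, the "offload only the slow part, then come back to the IO thread for the write" flow can be expressed with `CompletableFuture` from the JDK. This is an illustrative sketch, not the article's Netty code: the executors `toy-io` and `toy-biz` and the class name are hypothetical stand-ins for the IO thread and the business pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OffloadAndReturnSketch {

    private static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    /** Runs the slow part on the business pool, then the "write" back on the IO thread. */
    public static String run() {
        ExecutorService io = named("toy-io");
        ExecutorService biz = named("toy-biz");
        try {
            return CompletableFuture
                    // Only the time-consuming block leaves the IO thread.
                    .supplyAsync(() -> "slow part on " + Thread.currentThread().getName(), biz)
                    // The follow-up step is explicitly scheduled on the IO thread.
                    .thenApplyAsync(r -> r + ", write on " + Thread.currentThread().getName(), io)
                    .get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
            biz.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In Netty itself the hop back is implicit, since ctx.writeAndFlush called from a non-IO thread is queued on the IO thread; the sketch only makes the two thread boundaries visible.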
