簡單RPC框架-業務線程池

Netty 線程模型

Netty的線程模型主要是基於React,由於考慮到應用場景的不一樣因此演化出多種版本。html

單線程模式

即接收服務請求以及執行IO操做都由一個線程來完成,因爲採用的是IO多路複用這類無阻塞IO操做,因此在請求量不大的狀況下單線程模式也是能夠解決一部分場景問題的。java

單接收多工做線程模式

當請求量增大後,原有的一個線程處理全部IO操做變得愈來愈沒法支撐相應的性能指標,因此提到了一個工做線程池的概念,此時接收服務請求仍是一個線程,接收請求的線程收到請求後會委託給後面的工做線程池,從線程池中取得一個線程去執行用戶請求。git

多接收多工做線程模式

當請求量進一步增大後,單一的接收服務請求的線程沒法處理全部客戶端的鏈接,因此將接收服務請求的也擴展成線程池,由多個線程同時負責接收客戶端的鏈接。github

RPC 業務線程

上面提到的都是Netty自身的線程模型,伴隨着請求量的增加而不斷髮展出來的優化策略。而RPC請求對應用系統來說最主要仍是業務邏輯的處理,而這類業務有多是計算密集型的也有能夠是IO密集型,像大多數應用都伴隨着數據庫操做,redis或者是鏈接其它的網絡服務等。若是業務請求中有這類耗時的IO操做,推薦將處理業務請求的任務分配給獨立的線程池,不然可能會阻塞netty自身的線程。web

接收請求線程與工做線程分工redis

  • 接收請求線程主要負責建立鏈路,而後將請求委派給工做線程
  • 工做線程負責編碼解碼讀取IO等操做

方案實現

目前我實現的RPC是採用多接收多工做線程模式,在服務端是這樣綁定端口的:數據庫

public void bind(ServiceConfig serviceConfig) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(this.rpcServerInitializer)
                    .childOption(ChannelOption.SO_KEEPALIVE,true)
            ;

            try {
                ChannelFuture channelFuture = bootstrap.bind(serviceConfig.getHost(),serviceConfig.getPort()).sync();
                //...
                channelFuture.channel().closeFuture().sync();


            } catch (InterruptedException e) {
                throw new RpcException(e);
            }
        }
        finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

boosGroup就是一組用來接收服務請求的
workerGroup就是一組具體負責IO操做的bootstrap

增長業務線程只須要將handle的操做進一步委派給線程池便可,這裏爲了擴展因此須要定義接口:canvas

定義線程池接口

public interface RpcThreadPool {
    Executor getExecutor(int threadSize,int queues);
}

實現固定大小線程池

參考了dubbo線程池ruby

@Qualifier("fixedRpcThreadPool")
@Component
public class FixedRpcThreadPool implements RpcThreadPool {

    private Executor executor;

    @Override
    public Executor getExecutor(int threadSize,int queues) {
        if(null==executor) {
            synchronized (this) {
                if(null==executor) {
                    executor= new ThreadPoolExecutor(threadSize, threadSize, 0L, TimeUnit.MILLISECONDS,
                            queues == 0 ? new SynchronousQueue<Runnable>() :
                                    (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                            : new LinkedBlockingQueue<Runnable>(queues)),
                            new RejectedExecutionHandler() {
                                @Override
                                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                                   //...
                                }
                            });
                }
            }
        }
        return executor;
    }
}

小插曲:
記的有一次一朋友忽然問java 線程池中的那個coreSize是什麼意思?我頓時短路了,因平時也不怎麼寫多線程,想到平時用的比較多的數據庫線程池,裏面的參數卻是印象比較深,但就是想不起來有個coreSize。後來才又仔細看了下線程池的一些參數。如今借這個機會又能夠多多再看看,以避免再次短路。

線程池工廠

當有多個線程池實現時,經過線程池名稱來動態選擇線程池。

@Component
public class RpcThreadPoolFactory {

    @Autowired
    private Map<String,RpcThreadPool> rpcThreadPoolMap;

    public RpcThreadPool getThreadPool(String threadPoolName){
        return this.rpcThreadPoolMap.get(threadPoolName);
    }
}

修改ChannelHandle的channelRead0方法

將方法體包裝成Task交給線程池去執行。

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) {

    this.executor.execute(new Runnable() {
        @Override
        public void run() {
            RpcInvoker rpcInvoker=RpcServerInvoker.this.buildInvokerChain(RpcServerInvoker.this);
            RpcResponse response=(RpcResponse) rpcInvoker.invoke(RpcServerInvoker.this.buildRpcInvocation(rpcRequest));
            channelHandlerContext.writeAndFlush(response);
        }
    });

}

問題

目前缺少壓測,因此暫時沒有明確的數據對比。

源碼地址

https://github.com/jiangmin168168/jim-framework

相關文章
相關標籤/搜索