Netty的線程模型主要是基於React,由於考慮到應用場景的不一樣因此演化出多種版本。html
即接收服務請求以及執行IO操做都由一個線程來完成,因爲採用的是IO多路複用這類無阻塞IO操做,因此在請求量不大的狀況下單線程模式也是能夠解決一部分場景問題的。java
當請求量增大後,原有的一個線程處理全部IO操做變得愈來愈沒法支撐相應的性能指標,因此提到了一個工做線程池的概念,此時接收服務請求仍是一個線程,接收請求的線程收到請求後會委託給後面的工做線程池,從線程池中取得一個線程去執行用戶請求。git
當請求量進一步增大後,單一的接收服務請求的線程沒法處理全部客戶端的鏈接,因此將接收服務請求的也擴展成線程池,由多個線程同時負責接收客戶端的鏈接。github
上面提到的都是Netty自身的線程模型,伴隨着請求量的增加而不斷髮展出來的優化策略。而RPC請求對應用系統來說最主要仍是業務邏輯的處理,而這類業務有多是計算密集型的也有能夠是IO密集型,像大多數應用都伴隨着數據庫操做,redis或者是鏈接其它的網絡服務等。若是業務請求中有這類耗時的IO操做,推薦將處理業務請求的任務分配給獨立的線程池,不然可能會阻塞netty自身的線程。web
接收請求線程與工做線程分工redis
目前我實現的RPC是採用多接收多工做線程模式,在服務端是這樣綁定端口的:數據庫
public void bind(ServiceConfig serviceConfig) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(this.rpcServerInitializer)
.childOption(ChannelOption.SO_KEEPALIVE,true)
;
try {
ChannelFuture channelFuture = bootstrap.bind(serviceConfig.getHost(),serviceConfig.getPort()).sync();
//...
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
throw new RpcException(e);
}
}
finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
boosGroup就是一組用來接收服務請求的
workerGroup就是一組具體負責IO操做的bootstrap
增長業務線程只須要將handle的操做進一步委派給線程池便可,這裏爲了擴展因此須要定義接口:canvas
public interface RpcThreadPool {
Executor getExecutor(int threadSize,int queues);
}
參考了dubbo線程池ruby
@Qualifier("fixedRpcThreadPool")
@Component
public class FixedRpcThreadPool implements RpcThreadPool {
private Executor executor;
@Override
public Executor getExecutor(int threadSize,int queues) {
if(null==executor) {
synchronized (this) {
if(null==executor) {
executor= new ThreadPoolExecutor(threadSize, threadSize, 0L, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//...
}
});
}
}
}
return executor;
}
}
小插曲:
記的有一次一朋友忽然問java 線程池中的那個coreSize是什麼意思?我頓時短路了,因平時也不怎麼寫多線程,想到平時用的比較多的數據庫線程池,裏面的參數卻是印象比較深,但就是想不起來有個coreSize。後來才又仔細看了下線程池的一些參數。如今借這個機會又能夠多多再看看,以避免再次短路。
當有多個線程池實現時,經過線程池名稱來動態選擇線程池。
@Component
public class RpcThreadPoolFactory {
@Autowired
private Map<String,RpcThreadPool> rpcThreadPoolMap;
public RpcThreadPool getThreadPool(String threadPoolName){
return this.rpcThreadPoolMap.get(threadPoolName);
}
}
將方法體包裝成Task交給線程池去執行。
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) {
this.executor.execute(new Runnable() {
@Override
public void run() {
RpcInvoker rpcInvoker=RpcServerInvoker.this.buildInvokerChain(RpcServerInvoker.this);
RpcResponse response=(RpcResponse) rpcInvoker.invoke(RpcServerInvoker.this.buildRpcInvocation(rpcRequest));
channelHandlerContext.writeAndFlush(response);
}
});
}
目前缺少壓測,因此暫時沒有明確的數據對比。