ExecutorService executor = Excutors.newFixedThreadPollExecutor(100);//線程池 ServerSocket serverSocket = new ServerSocket(); serverSocket.bind(8088); while(!Thread.currentThread.isInturrupted()){//主線程死循環等待新鏈接到來 Socket socket = serverSocket.accept();//blocking executor.submit(new ConnectIOnHandler(socket));//爲新的鏈接建立新的線程 } class ConnectIOnHandler extends Thread{ private Socket socket; public ConnectIOnHandler(Socket socket){ this.socket = socket; } public void run(){ while(!Thread.currentThread.isInturrupted()&&!socket.isClosed()){死循環處理讀寫事件 String someThing = socket.read()....//讀取數據(blocking) if(someThing!=null){ ......//處理數據 socket.write()....//寫數據 } } } }
不足:java
一、線程的建立和銷燬成本很高 二、線程的切換成本是很高 三、線程數量過多,使系統負載壓力過大。 四、沒有充分利用多核CPU
while (true) { //無事件到底阻塞 selector.select(); Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator(); while (keys.hasNext()) { SelectionKey key = keys.next(); keys.remove(); handler(key); } } /** * 處理不一樣事件的請求 * @param key */ private void handler(SelectionKey key) throws IOException { if (key.isAcceptable()) { handleAccept(key); } else if (key.isReadable()) { handleRead(key); } } //處理鏈接請求 private void handleAccept(SelectionKey key){ ... } //處理讀操做 private void handleRead(SelectionKey key){ ... }
NIO和 BIO的對比bootstrap
private void startServer() throws InterruptedException { //建立boss接收進來的鏈接 EventLoopGroup boss = new NioEventLoopGroup(); //建立worker處理已經接收的鏈接 EventLoopGroup worker = new NioEventLoopGroup(); try { //建立nio輔助啓動類 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss, worker).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { channel.pipeline().addLast(new EchoServerHandler()); } }); //綁定端口準備接收進來的鏈接 ChannelFuture future = bootstrap.bind(port).sync(); //等待服務器socket關閉 future.channel().closeFuture().sync(); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } }
Bootstrapping 有兩種類型,一種是用於客戶端的Bootstrap,一種是用於服務端的ServerBootstrap 服務器
Bootstrap如何引導客戶端:網絡
1.當 bind() 調用時,Bootstrap 將建立一個新的管道, 當 connect() 調用在 Channel 來創建鏈接 2.Bootstrap 將建立一個新的管道, 當 connect() 調用時 3.新的 Channel
ServerBootstrap如何引導服務端:多線程
1.當調用 bind() 後 ServerBootstrap 將建立一個新的管道,這個管道將會在綁定成功後接收子管道 2.接收新鏈接給每一個子管道 3.接收鏈接的 Channel
Netty 中EventLoopGroup是 Reactor 模型的一個實現併發
什麼是Reactor呢?能夠這樣理解,Reactor就是一個執行while (true) { selector.select(); ...}循環的線程,會源源不斷的產生新的事件,稱做反應堆很貼切。 事件又分爲鏈接事件、IO讀和IO寫事件,通常把鏈接事件單獨放一線程裏處理,即主Reactor(MainReactor),IO讀和IO寫事件放到另外的一組線程裏處理,即從Reactor(SubReactor),從Reactor線程數量通常爲2*(CPUs - 1)。 因此在運行時,MainReactor只處理Accept事件,鏈接到來,立刻按照策略轉發給從Reactor之一,只處理鏈接,故開銷很是小;每一個SubReactor管理多個鏈接,負責這些鏈接的讀和寫,屬於IO密集型線程,讀到完整的消息就丟給業務線程池處理業務,處理完比後,響應消息通常放到隊列裏,SubReactor會去處理隊列,而後將消息寫回。
Reactor單線程模型:app
所謂單線程, 即 acceptor 處理和 handler 處理都在一個線程中處理. 這個模型的壞處顯而易見: 當其中某個 handler 阻塞時, 會致使其餘全部的 client 的 handler 都得不到執行, 而且更嚴重的是, handler 的阻塞也會致使整個服務不能接收新的 client 請求(由於 acceptor 也被阻塞了). 由於有這麼多的缺陷, 所以單線程Reactor 模型用的比較少.
Reactor多線程模型:框架
Reactor主從多線程模型dom
注意:異步
服務器端的ServerSocketChannel 只綁定到了bossGroup 中的一個線程, 所以在調用Java NIO 的 Selector.select 處理客戶端的鏈接請求時, 其實是在一個線程中的, 因此對只有一個服務的應用來講, bossGroup設置多個線程是沒有什麼做用的, 反而還會形成資源浪費.
NioEventLoopGroup 與 Reactor 線程模型的對應
//單線程模型 EventLoopGroup bossGroup = new NioEventLoopGroup(1); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup) .channel(NioServerSocketChannel.class) ... //多線程模型 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) ... //主從多線程模型 EventLoopGroup bossGroup = new NioEventLoopGroup(4); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) ...
NioEventLoop主要幹兩件事:
IO 事件的處理
//爲了保證定時任務的執行不會由於過分擠佔IO事件的處理,Netty提供了IO執行比例供用戶設置,用戶能夠設置分 //配給IO的執行比例,防止由於海量定時任務的執行致使IO處理超時或者積壓。默認是1:1 final int ioRatio = this.ioRatio;//默認爲50 if (ioRatio == 100) { try { processSelectedKeys(); } finally { // Ensure we always run tasks. runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } }
hannelHandlerContext ctx = context;
ChannelPipeline pipeline = ctx.pipeline(); //1 pipeline.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
ChannelHandlerContext ctx = context; ctx.write(Unpooled.copiedBuffer("Netty in Action",CharsetUtil.UTF_8));
Netty的IO線程NioEventLoop因爲聚合了多路複用器Selector,能夠同時併發處理成百上千個客戶端Channel,因爲讀寫操做都是非阻塞的,這就能夠充分提高IO線程的運行效率,避免因爲頻繁IO阻塞致使的線程掛起。
Netty的接收和發送ByteBuffer採用DIRECT BUFFERS,直接使用堆外直接內存進行Socket讀寫。
ByteBuf header = ... ByteBuf body = ... CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer(); compositeByteBuf.addComponents(true, header, body);
雖然看起來 CompositeByteBuf 是由兩個 ByteBuf 組合而成的, 不過在 CompositeByteBuf內部, 這兩個 ByteBuf 都是單獨存在的, CompositeByteBuf 只是邏輯上是一個總體. 以下:
ByteBuf byteBuf = ... ByteBuf header = byteBuf.slice(0, 5); ByteBuf body = byteBuf.slice(5, 10);
byte[] bytes = ... ByteBuf byteBuf = Unpooled.buffer(); byteBuf.writeBytes(bytes); //Netty byte[] bytes = ... ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
//傳統io byte[] temp = new byte[1024]; FileInputStream in = new FileInputStream(srcFile); FileOutputStream out = new FileOutputStream(destFile); int length; while ((length = in.read(temp)) != -1) { out.write(temp, 0, length); } //netty RandomAccessFile srcFile = new RandomAccessFile(srcFileName, "r"); FileChannel srcFileChannel = srcFile.getChannel(); RandomAccessFile destFile = new RandomAccessFile(destFileName, "rw"); FileChannel destFileChannel = destFile.getChannel(); long position = 0; long count = srcFileChannel.size(); srcFileChannel.transferTo(position, count, destFileChannel);
PoolChunkList<T> qInit:存儲內存利用率0-25%的chunk PoolChunkList<T> q000:存儲內存利用率1-50%的chunk PoolChunkList<T> q025:存儲內存利用率25-75%的chunk PoolChunkList<T> q050:存儲內存利用率50-100%的chunk PoolChunkList<T> q075:存儲內存利用率75-100%的chunk PoolChunkList<T> q100:存儲內存利用率100%的chunk
PoolArena中申請內存:
爲了儘量提高性能,Netty採用了串行無鎖化設計,在IO線程內部進行串行操做,避免多線程競爭致使的性能降低。表面上看,串行化設計彷佛CPU利用率不高,併發程度不夠。可是,經過調整NIO線程池的線程參數,能夠同時啓動多個串行化的線程並行運行,這種局部無鎖化的串行線程設計相比一個隊列-多個工做線程模型性能更優
Netty的NioEventLoop讀取到消息以後,直接調用ChannelPipeline的fireChannelRead(Object msg),只要用戶不主動切換線程,一直會由NioEventLoop調用到用戶的Handler,期間不進行線程切換,這種串行化處理方式避免了多線程操做致使的鎖的競爭,從性能角度看是最優的。
Netty默認提供了對Google Protobuf的支持(Protobuf性能比較好),經過擴展Netty的編解碼接口,用戶能夠實現其它的高性能序列化框架