前言bootstrap
最近趁着跟老東家提離職以後、到新公司報道以前的這段空閒時期,着力研究了一番netty框架,對其有了一些淺薄的認識,後續的幾篇文章會以netty爲主,將近期所學記錄一二,也爭取能幫未對netty有過了解的園友對netty創建一個完整的認識。netty做爲一個優秀的網絡框架,值得爲其花費一番時間。數組
netty的內容細究一下也有很多(雖然與Spring這種龐大的框架相比代碼量少不少),本文做爲netty系列的第一篇,決定先攀登一個高峯:講一下netty的串行無鎖化。這是netty的一個招牌特性,能夠說理解了它,就掌握了netty的命門。開始正文以前,須要額外提醒一下,本文雖然是netty系列的第一篇,可是面向的對象是對netty有過必定了解的園友,若是是新人建議從第二篇【EventLoopGroup的初始化】開始看。promise
1、Talk is cheap,show me your code安全
先把netty的示例demo奉上,此處只要服務端構建的代碼就能夠了。網絡
1 public class NettyDemo1 { 2 // netty服務端的通常性寫法 3 public static void main(String[] args) { 4 EventLoopGroup boss = new NioEventLoopGroup(1); 5 EventLoopGroup worker = new NioEventLoopGroup(); 6 try { 7 ServerBootstrap bootstrap = new ServerBootstrap(); 8 bootstrap.group(boss, worker).channel(NioServerSocketChannel.class) 9 .option(ChannelOption.SO_BACKLOG, 100) 10 .childHandler(new ChannelInitializer<SocketChannel>() { 11 @Override 12 protected void initChannel(SocketChannel socketChannel) throws Exception { 13 ChannelPipeline pipeline = socketChannel.pipeline(); 14 pipeline.addLast(new StringDecoder()); 15 pipeline.addLast(new StringEncoder()); 16 pipeline.addLast(new NettyServerHandler()); 17 } 18 }); 19 ChannelFuture channelFuture = bootstrap.bind(90).sync(); 20 channelFuture.channel().closeFuture().sync(); 21 } catch (Exception e) { 22 e.printStackTrace(); 23 } finally { 24 boss.shutdownGracefully(); 25 worker.shutdownGracefully(); 26 } 27 } 28 }
從第4行到第18行,都是在進行初始化的屬性賦值,第19行bind方法觸發真正的串行無鎖化處理邏輯。串行無鎖化如何理解呢?望文生義便可,經過串行(即順序執行),來達到即便沒有鎖也能夠線程安全的效果。具體如何作到呢?且往下追蹤bind方法。框架
2、源碼追蹤socket
一、initAndRegister方法ide
在AbstractBootstrap類中的doBind方法調用了下面的initAndRegister方法(該方法在netty中很重要,如今先記住混個眼熟),前兩步雖然也很重要,但跟本文的主題關係不大,下面主要看第3步:oop
1 final ChannelFuture initAndRegister() { 2 Channel channel = null; 3 try { 4 channel = channelFactory.newChannel();// 一、實例化NioServerSocketChannel 5 init(channel);// 二、初始化該channel 6 } catch (Throwable t) { 7 // ...省略異常處理 8 } 9 ChannelFuture regFuture = config().group().register(channel);// 三、註冊channel 10 if (regFuture.cause() != null) { 11 if (channel.isRegistered()) { 12 channel.close(); 13 } else { 14 channel.unsafe().closeForcibly(); 15 } 16 } 17 return regFuture; 18 }
第3步中,group()方法返回的是AbstractBootstrap中的group屬性,該屬性就是上面服務端demo中的boss變量。this
二、boss.register(channel)方法
追蹤進入MultithreadEventLoopGroup的register方法:
1 public ChannelFuture register(Channel channel) { 2 return next().register(channel); 3 }
next方法即從EventLoopGroup的EventExecutor數組中輪詢取一個EventExecutor實例,即一個NioEventLoop對象,而後再調用NioEventLoop的register方法。
三、NioEventLoop.register(channel)方法
跟蹤到SingleThreadEventLoop的register方法,以下,此處的promise.channel()返回值即以前的NioServerSocketChannel,它的unsafe()方法返回NioMessageUnsafe對象,因此此處最終調用的是NioMessageUnsafe的register方法。
1 public ChannelFuture register(final ChannelPromise promise) { 2 ObjectUtil.checkNotNull(promise, "promise"); 3 promise.channel().unsafe().register(this, promise); 4 return promise; 5 }
四、NioMessageUnsafe.register方法
該方法位於AbstractChannel的內部類AbstractUnsafe中(AbstractUnsafe是NioMessageUnsafe的父類):
1 public final void register(EventLoop eventLoop, final ChannelPromise promise) { 2 //... 省略異常校驗 3 AbstractChannel.this.eventLoop = eventLoop; 4 5 if (eventLoop.inEventLoop()) {// 串行無鎖化的第一個判斷 6 register0(promise); 7 } else { 8 try { 9 eventLoop.execute(new Runnable() { 10 @Override 11 public void run() { 12 register0(promise); 13 } 14 }); 15 } catch (Throwable t) { 16 //... 省略異常處理 17 } 18 } 19 }
在eventLoop的父類SingleThreadEventExecutor中有一個成員變量thread,第5行代碼用來判斷這個thread是否是Thread.currentThread(),若是不是,則進入esle邏輯,執行eventLoop.execute方法。
五、eventLoop.execute方法
該方法位於SingleThreadEventExecutor中,主要作的事情有三步:1)、inEventLoop()方法判斷當前線程是否是eventLoop中記錄的線程;2)、addTask將任務放入隊列中;3)、startThread()判斷是否啓動新線程。每步對應的代碼在下面已經標出,比較簡單。
執行到execute方法時,有三種狀況:1)、eventLoop中記錄的線程爲null,即剛完成實例化;2)、eventLoop中記錄的線程不爲null,但不是當前線程;3)、eventLoop中記錄的線程不爲null,是當前線程(正常不會出現這種狀況)。若是是第一種thread=null,則往隊列中添加完任務後會進入startThread方法,在startThread方法中判斷state屬性是否是未啓動,若是是則建立一個新的線程並經過cas將state置爲已啓動;若是是第二種狀況thread!=null,往隊列中添加任務後也會進入startThread方法,但因爲state屬性已是已啓動了,因此不會建立新的線程。至此,task中都添加了一個任務,且thread也有值了。
task中的任務是合適被執行的?且往下看。
1 public void execute(Runnable task) { 2 if (task == null) { 3 throw new NullPointerException("task"); 4 } 5 6 boolean inEventLoop = inEventLoop(); 7 addTask(task); // 將task任務放入隊列中 8 if (!inEventLoop) { // 串行無鎖化的第二次判斷 9 startThread(); // 若是須要,會啓動一個線程 重要*** 10 if (isShutdown()) { 11 boolean reject = false; 12 try { 13 if (removeTask(task)) { 14 reject = true; 15 } 16 } catch (UnsupportedOperationException e) { 17 } 18 if (reject) { 19 reject(); 20 } 21 } 22 } 23 24 if (!addTaskWakesUp && wakesUpForTask(task)) { 25 wakeup(inEventLoop); 26 } 27 }
六、doStartThread()方法
此方法位於SingleThreadEventExecutor類,executor即封裝的線程池了,在run方法中完成了對thread的賦值,而後執行了當前類的run方法。
七、SingleThreadEventExecutor.run()方法
該方法的實如今NioEventLoop中,以下,多出對 runAllTasks()方法進行調用,就是在這個方法中完成的對隊列中任務的執行,直接調用的task.run方法,即單線程串行消費隊列。
1 protected void run() { 2 for (;;) { 3 try { 4 try { 5 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { 6 case SelectStrategy.CONTINUE: 7 continue; 8 case SelectStrategy.BUSY_WAIT: 9 case SelectStrategy.SELECT: 10 select(wakenUp.getAndSet(false)); 11 if (wakenUp.get()) { 12 selector.wakeup(); 13 } 14 // fall through 15 default: 16 } 17 } catch (IOException e) { 18 // ...異常處理省略 19 continue; 20 } 21 22 cancelledKeys = 0; 23 needsToSelectAgain = false; 24 final int ioRatio = this.ioRatio; 25 if (ioRatio == 100) { 26 try { 27 processSelectedKeys(); 28 } finally { 29 // Ensure we always run tasks. 30 runAllTasks(); 31 } 32 } else { 33 final long ioStartTime = System.nanoTime(); 34 try { 35 processSelectedKeys(); 36 } finally { 37 // Ensure we always run tasks. 38 final long ioTime = System.nanoTime() - ioStartTime; 39 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); 40 } 41 } 42 } catch (Throwable t) { 43 handleLoopException(t); 44 } 45 // ...省略無關代碼 46 } 47 }
3、小結
上面是過了一遍netty串行無鎖化的流程,下面對整個流程作一下總結。
netty串行無鎖化的實現,是藉助了SingleThreadEventExecutor中的Thread thread、int state和Queue<Runnable> taskQueue這三個成員變量。state變量用於判斷當前EventExecutor是否啓動(未啓動則經過線程池建立一個啓動線程並賦值給thread),thread變量用來判斷當前線程是否是啓動線程(經過inEventLoop方法實現),taskQueue用於存放待啓動線程串行執行的任務。在NioEventLoop的run方法中,每一次循環都遍歷一遍taskQueue執行裏面的任務。
能夠知道,一個NioEventLoop對應一個串行執行的啓動線程,Reactor主線程對應的是boss中的一個NioEventLoop,負責串行執行客戶端鏈接事件,Reactor子線程是有客戶端接入事件後由主線程啓用的,對應worker中的一個NioEventLoop,負責串行執行客戶端讀寫時間。
至此,串行無鎖化的內容就結束了,後面將從EventLoopGroup的初始化開始,一步步從零開始拆解netty的做用原理,敬請期待!
原創不易,如有問題,還請批評指正,感謝!