一、Netty是由JBOSS提供的一個java開源框架。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。也就是說,Netty 是一個基於NIO的客戶、服務器端編程框架,使用Netty 能夠確保你快速和簡單的開發出一個網絡應用,例如實現了某種協議的客戶,服務端應用。Netty至關簡化和流線化了網絡應用的編程開發過程,例如,TCP和UDP的socket服務開發。java
二、目前netty有3個版本netty三、netty四、netty5。3個版本的內容有所不一樣。neety3是核心的代碼介紹。相對於netty四、和netty5的複雜性來講。netty3的源碼是值得學習的。我這裏解析了netty3的一些源碼,僅供你們理解,也是爲了方便你們理解作了不少簡化。不表明做者的開發思路。編程
三、咱們先來看一張圖(這張圖是我在學習源碼的時候扣的,哈哈)數組
1、傳統NIO流安全
1)一個線程裏面,存在一個selector,固然這個selector也承擔起看大門和服務客人的工做。服務器
2)這裏無論多少客戶端進來,都是這個selector來處理。這樣就就加大了這個服務員的工做量網絡
3)爲了加入線程池,讓多個selector同時工做,當時目的性都是同樣的。併發
4)雖然看大門的和服務客人的都是服務員,可是仍是存在差異的。爲了更好的處理多個線程的問題。因此這裏netty就誕生了。app
2、netty框架框架
理解:異步
1)netty3的框架也是基於nio流作出來的。因此這裏會詳細介紹netty3框架的思路
2)將看門的服務員和服務客人的服務員分開。造成兩塊(也就是2個線程池,也就是後面的boss和worker)
3)當一個客人來的時候,首先boss,進行接待。而後boss分配工做給worker,這個,在兩個線程池的工做下,有條不亂。
4)原理:就是將看大門的selector和服務客人的selector分開。而後經過boss線程池,下發任務給對應的worker
四、netty3源碼分析
1)加入對應的jar包。我這裏爲了瞭解源碼用的是netty3的包。
<dependency> <groupId>io.netty</groupId> <artifactId>netty</artifactId> <version>3.10.6.Final</version> </dependency>
2)目錄結構
說明:
a、NettyBoss、NettyWork是針對於selector作區分。雖然他們不少共性,我這裏爲了好理解,並無作抽象類(忽略開發思路)。
b、ThreadHandle是用來初始化線程池和對應的接口。
c、Start爲啓動類
3)NettyBoss(看大門的服務員,第一種線程selector)
package com.troy.application.netty; import java.io.IOException; import java.nio.channels.*; import java.util.Iterator; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; public class NettyBoss { //線程池 public final Executor executor; //boss選擇器 protected Selector selector; //原子變量,主要是用來保護線程安全。當本線程執行的時候,排除其餘線程的執行 protected final AtomicBoolean wakenUp = new AtomicBoolean(); //隊列,線程安全隊列。 public final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>(); //線程處理,這裏主要是拿到work的線程池 protected ThreadHandle threadHandle; //初始化 public NettyBoss(Executor executor,ThreadHandle threadHandle) { //賦值 this.executor = executor; this.threadHandle = threadHandle; try { //每個線程選擇器 this.selector = Selector.open(); } catch (IOException e) { e.printStackTrace(); } //從線程中獲取一個線程執行如下內容 executor.execute(() -> { while (true) { try { //這裏的目前就是排除其餘線程同事執行,false由於這裏處於阻塞狀態,不用開啓 wakenUp.set(false); //選擇器阻塞 selector.select(); //運行隊列中的任務 while (true) { final Runnable task = taskQueue.poll(); if (task == null) { break; } //若是任務存在開始運行 task.run(); } //對進來的進行處理 this.process(selector); } catch (Exception e) { e.printStackTrace(); } } }); } public void process(Selector selector) throws IOException { Set<SelectionKey> selectedKeys = selector.selectedKeys(); if (selectedKeys.isEmpty()) { return; } for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) { SelectionKey key = i.next(); i.remove(); ServerSocketChannel server = (ServerSocketChannel) key.channel(); // 新客戶端 SocketChannel channel = server.accept(); // 設置爲非阻塞 channel.configureBlocking(false); // 獲取一個worker NettyWork nextworker = threadHandle.workeres[Math.abs(threadHandle.workerIndex.getAndIncrement() % threadHandle.workeres.length)]; // 註冊新客戶端接入任務 Runnable runnable = () -> { try { //將客戶端註冊到selector中 channel.register(nextworker.selector, SelectionKey.OP_READ); } catch (ClosedChannelException e) { e.printStackTrace(); } }; //添加到work的隊列中 nextworker.taskQueue.add(runnable); if (nextworker.selector != null) { //這裏的目前就是開啓執行過程 if (nextworker.wakenUp.compareAndSet(false, true)) { //放開本次阻塞,進行下一步執行 nextworker.selector.wakeup(); } } else { //任務完成移除線程 taskQueue.remove(runnable); } System.out.println("新客戶端連接"); } } }
解釋:
a、初始化的時候,賦值線程池,和線程處理類(線程處理類目的是獲取worker的工做線程)
b、executor爲線程池的執行過程。
c、selector.select()爲造成阻塞,wakenUp爲了線程安全考覈。在接入客戶端的時候用selector.wakeup()來放開本次阻塞(很重要)。
d、而後在worker安全隊列中執行對應工做。(taskQueue的目前在boss和worker中的做用都是爲了考慮線程安全,這裏採用線程安全隊列的目的是爲了避免直接操做其餘線程)
e、wakenUp.compareAndSet(false, true),這裏是考慮併發問題。在本線程運行的時候,其餘線程處於等待狀態。這裏也是爲了線程安全考慮。
4)NettyWork(服務客人的服務員,第二種selector)
package com.troy.application.netty; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; public class NettyWork { //線程池 public final Executor executor; //boss選擇器 protected Selector selector; //原子變量,主要是用來保護線程安全。當本線程執行的時候,排除其餘線程的執行 protected final AtomicBoolean wakenUp = new AtomicBoolean(); //隊列,線程安全隊列。 public final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>(); //初始化 public NettyWork(Executor executor) { this.executor = executor; try { //每個work也須要一個選擇器用來管理通道 this.selector = Selector.open(); } catch (IOException e) { e.printStackTrace(); } //從線程池中獲取一個線程開始執行 executor.execute(() -> { while (true) { try { //阻塞狀態排除問題 wakenUp.set(false); //阻塞 selector.select(); //處理work任務 while (true) { final Runnable task = taskQueue.poll(); if (task == null) { break; } //存在work任務開始執行 task.run(); } //處理任務 this.process(selector); } catch (Exception e) { e.printStackTrace(); } } }); } public void process(Selector selector) throws IOException { Set<SelectionKey> selectedKeys = selector.selectedKeys(); if (selectedKeys.isEmpty()) { return; } Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator(); while (ite.hasNext()) { SelectionKey key = (SelectionKey) ite.next(); // 移除,防止重複處理 ite.remove(); // 獲得事件發生的Socket通道 SocketChannel channel = (SocketChannel) key.channel(); // 數據總長度 int ret = 0; boolean failure = true; ByteBuffer buffer = ByteBuffer.allocate(1024); //讀取數據 try { ret = channel.read(buffer); failure = false; } catch (Exception e) { // ignore } //判斷是否鏈接已斷開 if (ret <= 0 || failure) { key.cancel(); System.out.println("客戶端斷開鏈接"); }else{ System.out.println("收到數據:" + new String(buffer.array())); //回寫數據 ByteBuffer outBuffer = ByteBuffer.wrap("收到\n".getBytes()); channel.write(outBuffer);// 將消息回送給客戶端 } } } }
解釋:
a、worker的執行方式基本上面和boss的方式是同樣的,只不夠是處理方式不同
b、這裏須要注意的是,都是考慮線程隊列執行。
3)ThreadHandle(線程處理,這裏主要是啓動須要的東西)
package com.troy.application.netty; import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; public class ThreadHandle { public final AtomicInteger bossIndex = new AtomicInteger(); public static NettyBoss[] bosses; public final AtomicInteger workerIndex = new AtomicInteger(); public static NettyWork[] workeres; public ThreadHandle(ExecutorService boss,ExecutorService work) { this.bosses = new NettyBoss[1]; //初始化boss線程池 for (int i = 0; i < bosses.length; i++) { bosses[i] = new NettyBoss(boss,this); } this.workeres = new NettyWork[Runtime.getRuntime().availableProcessors() * 2]; //初始化work線程池 for (int i = 0; i < workeres.length; i++) { workeres[i] = new NettyWork(work); } } public void bind(InetSocketAddress inetSocketAddress) { try { // 得到一個ServerSocket通道 ServerSocketChannel serverChannel = ServerSocketChannel.open(); // 設置通道爲非阻塞 serverChannel.configureBlocking(false); // 將該通道對應的ServerSocket綁定到port端口 serverChannel.socket().bind(inetSocketAddress); //獲取一個boss線程 NettyBoss nextBoss = bosses[Math.abs(bossIndex.getAndIncrement() % workeres.length)]; //向boss註冊一個ServerSocket通道 Runnable runnable = () -> { try { //註冊serverChannel到selector serverChannel.register(nextBoss.selector, SelectionKey.OP_ACCEPT); } catch (ClosedChannelException e) { e.printStackTrace(); } }; //加入任務隊列 nextBoss.taskQueue.add(runnable); if (nextBoss.selector != null) { //排除其餘任務處理 if (nextBoss.wakenUp.compareAndSet(false, true)) { //放開阻塞 nextBoss.selector.wakeup(); } } else { //移除任務 nextBoss.taskQueue.remove(runnable); } } catch (Exception e) { e.printStackTrace(); } } }
解釋:
a、這裏採用數組的形式,主要目的是考慮多個看門的,和多個服務客人的線程。爲了好控制,好選擇,哪個來執行。
b、端口的註冊,在NettyBoss裏面進行初始化的的原理都是同樣的。
4)start
package com.troy.application.netty; import java.net.InetSocketAddress; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Start { public static void main(String[] args) { //聲明線程池 ExecutorService boss = Executors.newCachedThreadPool(); ExecutorService work = Executors.newCachedThreadPool(); //初始化線程池 ThreadHandle threadHandle = new ThreadHandle(boss,work); //聲明端口 threadHandle.bind(new InetSocketAddress(9000)); System.out.println("start"); } }
說明一下流程
a、初始化boss和work。讓boss線程池加入設定第一種boss的selector,而且處於阻塞狀態。work的初始化也基本上是同樣的,只不過換成了第二種selector線程池,處於阻塞狀態。
b、當線程處理類初始化監聽端口的時候。就是選擇boss中其中一個selector。聲明一個線程先監聽,加入boss的線程安全隊列中。而後放開boss阻塞,向下執行。線程執行會監聽對應端口並阻塞。
c、當一個客戶端接入的時候,boss中的selector會監聽到對應端口。而後選擇work線程中的一個selector給work分派任務。
d、最後work中的selector來處理事務。
四、源碼下載:https://pan.baidu.com/s/1pKIxuMf
五、本代碼只是用於理解netty的實現過程,不表明開發思路。其中我爲了簡化代碼,作了不少調整。目的就是壓縮代碼,方便理解。