Netty是一個基於異步與事件驅動的網絡應用程序框架,它支持快速與簡單地開發可維護的高性能的服務器與客戶端。html
所謂事件驅動就是由經過各類事件響應來決定程序的流程,在Netty中處處都充滿了異步與事件驅動,這種特色使得應用程序能夠以任意的順序響應在任意的時間點產生的事件,它帶來了很是高的可伸縮性,讓你的應用能夠在須要處理的工做不斷增加時,經過某種可行的方式或者擴大它的處理能力來適應這種增加。java
Netty提供了高性能與易用性,它具備如下特色:linux
擁有設計良好且統一的API,支持NIO與OIO(阻塞IO)等多種傳輸類型,支持真正的無鏈接UDP Socket。c++
簡單而強大的線程模型,可高度定製線程(池)。git
良好的模塊化與解耦,支持可擴展和靈活的事件模型,能夠很輕鬆地分離關注點以複用邏輯組件(可插拔的)。github
性能高效,擁有比Java核心API更高的吞吐量,經過zero-copy功能以實現最少的內存複製消耗。算法
內置了許多經常使用的協議編解碼器,如HTTP、SSL、WebScoket等常見協議能夠經過Netty作到開箱即用。用戶也能夠利用Netty簡單方便地實現本身的應用層協議。編程
大多數人使用Netty主要仍是爲了提升應用的性能,而高性能則離不開非阻塞IO。Netty的非阻塞IO是基於Java NIO的,而且對其進行了封裝(直接使用Java NIO API在高複雜度下的應用中是一項很是繁瑣且容易出錯的操做,而Netty幫你封裝了這些複雜操做)。api
NIO能夠稱爲New IO也能夠稱爲Non-blocking IO,它比Java舊的阻塞IO在性能上要高效許多(若是讓每個鏈接中的IO操做都單首創建一個線程,那麼阻塞IO並不會比NIO在性能上落後,但不可能建立無限多的線程,在鏈接數很是多的狀況下會很糟糕)。數組
ByteBuffer:NIO的數據傳輸是基於緩衝區的,ByteBuffer正是NIO數據傳輸中所使用的緩衝區抽象。ByteBuffer支持在堆外分配內存,而且嘗試避免在執行I/O操做中的多餘複製。通常的I/O操做都須要進行系統調用,這樣會先切換到內核態,內核態要先從文件讀取數據到它的緩衝區,只有等數據準備完畢後,纔會從內核態把數據寫到用戶態,所謂的阻塞IO其實就是說的在等待數據準備好的這段時間內進行阻塞。若是想要避免這個額外的內核操做,能夠經過使用mmap(虛擬內存映射)的方式來讓用戶態直接操做文件。
Channel:它相似於文件描述符,簡單地來講它表明了一個實體(如一個硬件設備、文件、Socket或者一個可以執行一個或多個不一樣的I/O操做的程序組件)。你能夠從一個Channel中讀取數據到緩衝區,也能夠將一個緩衝區中的數據寫入到Channel。
Selector:選擇器是NIO實現的關鍵,NIO採用的是I/O多路複用的方式來實現非阻塞,Selector經過在一個線程中監聽每一個Channel的IO事件來肯定有哪些已經準備好進行IO操做的Channel,所以能夠在任什麼時候間檢查任意的讀操做或寫操做的完成狀態。這種方式避免了等待IO操做準備數據時的阻塞,使用較少的線程即可以處理許多鏈接,減小了線程切換與維護的開銷。
瞭解了NIO的實現思想以後,我以爲還頗有必要了解一下Unix中的I/O模型,Unix中擁有如下5種I/O模型:
阻塞I/O(Blocking I/O)
非阻塞I/O(Non-blocking I/O)
I/O多路複用(I/O multiplexing (select and poll))
信號驅動I/O(signal driven I/O (SIGIO))
異步I/O(asynchronous I/O (the POSIX aio_functions))
阻塞I/O模型是最多見的I/O模型,一般咱們使用的InputStream/OutputStream都是基於阻塞I/O模型。在上圖中,咱們使用UDP做爲例子,recvfrom()函數是UDP協議用於接收數據的函數,它須要使用系統調用並一直阻塞到內核將數據準備好,以後再由內核緩衝區複製數據到用戶態(便是recvfrom()接收到數據),所謂阻塞就是在等待內核準備數據的這段時間內什麼也不幹。
舉個生活中的例子,阻塞I/O就像是你去餐廳吃飯,在等待飯作好的時間段中,你只能在餐廳中坐着乾等(若是你在玩手機那麼這就是非阻塞I/O了)。
在非阻塞I/O模型中,內核在數據還沒有準備好的狀況下回返回一個錯誤碼EWOULDBLOCK
,而recvfrom並無在失敗的狀況下選擇阻塞休眠,而是不斷地向內核詢問是否已經準備完畢,在上圖中,前三次內核都返回了EWOULDBLOCK
,直到第四次詢問時,內核數據準備完畢,而後開始將內核中緩存的數據複製到用戶態。這種不斷詢問內核以查看某種狀態是否完成的方式被稱爲polling(輪詢)
。
非阻塞I/O就像是你在點外賣,只不過你很是心急,每隔一段時間就要打電話問外賣小哥有沒有到。
I/O多路複用的思想跟非阻塞I/O是同樣的,只不過在非阻塞I/O中,是在recvfrom的用戶態(或一個線程)中去輪詢內核,這種方式會消耗大量的CPU時間。而I/O多路複用則是經過select()或poll()系統調用來負責進行輪詢,以實現監聽I/O讀寫事件的狀態。如上圖中,select監聽到一個datagram可讀時,就交由recvfrom去發送系統調用將內核中的數據複製到用戶態。
這種方式的優勢很明顯,經過I/O多路複用能夠監聽多個文件描述符,且在內核中完成監控的任務。但缺點是至少須要兩個系統調用(select()與recvfrom())。
I/O多路複用一樣適用於點外賣這個例子,只不過你在等外賣的期間徹底能夠作本身的事情,當外賣到的時候會經過外賣APP或者由外賣小哥打電話來通知你。
Unix中提供了兩種I/O多路複用函數,select()和poll()。select()的兼容性更好,但它在單個進程中所能監控的文件描述符是有限的,這個值與FD_SETSIZE
相關,32位系統中默認爲1024,64位系統中爲2048。select()還有一個缺點就是他輪詢的方式,它採起了線性掃描的輪詢方式,每次都要遍歷FD_SETSIZE個文件描述符,無論它們是否活不活躍的。poll()本質上與select()的實現沒有區別,不過在數據結構上區別很大,用戶必須分配一個pollfd結構數組,該數組維護在內核態中,正因如此,poll()並不像select()那樣擁有大小上限的限制,但缺點一樣也很明顯,大量的fd數組會在用戶態與內核態之間不斷複製,無論這樣的複製是否有意義。
還有一種比select()與poll()更加高效的實現叫作epoll(),它是由Linux內核2.6推出的可伸縮的I/O多路複用實現,目的是爲了替代select()與poll()。epoll()一樣沒有文件描述符上限的限制,它使用一個文件描述符來管理多個文件描述符,並使用一個紅黑樹來做爲存儲結構。同時它還支持邊緣觸發(edge-triggered)與水平觸發(level-triggered)兩種模式(poll()只支持水平觸發),在邊緣觸發模式下,epoll_wait
僅會在新的事件對象首次被加入到epoll時返回,而在水平觸發模式下,epoll_wait
會在事件狀態未變動前不斷地觸發。也就是說,邊緣觸發模式只會在文件描述符變爲就緒狀態時通知一次,水平觸發模式會不斷地通知該文件描述符直到被處理。
關於epoll_wait
請參考以下epoll API。
// 建立一個epoll對象並返回它的文件描述符。
// 參數flags容許修改epoll的行爲,它只有一個有效值EPOLL_CLOEXEC。
int epoll_create1(int flags);
// 配置對象,該對象負責描述監控哪些文件描述符和哪些事件。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// 等待與epoll_ctl註冊的任何事件,直至事件發生一次或超時。
// 返回在events中發生的事件,最多同時返回maxevents個。
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
複製代碼
epoll另外一亮點是採用了事件驅動的方式而不是輪詢,在epoll_ctl
中註冊的文件描述符在事件觸發的時候會經過一個回調機制來激活該文件描述符,epoll_wait
即可以收到通知。這樣效率就不會與文件描述符的數量成正比。epoll還採用了mmap來減小內核態與用戶態之間的數據傳輸開銷。
在Java NIO2(從JDK1.7開始引入)中,只要Linux內核版本在2.6以上,就會採用epoll,以下源碼所示(DefaultSelectorProvider.java)。
public static SelectorProvider create() {
String osname = AccessController.doPrivileged(
new GetPropertyAction("os.name"));
if ("SunOS".equals(osname)) {
return new sun.nio.ch.DevPollSelectorProvider();
}
// use EPollSelectorProvider for Linux kernels >= 2.6
if ("Linux".equals(osname)) {
String osversion = AccessController.doPrivileged(
new GetPropertyAction("os.version"));
String[] vers = osversion.split("\\.", 0);
if (vers.length >= 2) {
try {
int major = Integer.parseInt(vers[0]);
int minor = Integer.parseInt(vers[1]);
if (major > 2 || (major == 2 && minor >= 6)) {
return new sun.nio.ch.EPollSelectorProvider();
}
} catch (NumberFormatException x) {
// format not recognized
}
}
}
return new sun.nio.ch.PollSelectorProvider();
}
複製代碼
信號驅動I/O模型使用到了信號,內核在數據準備就緒時會經過信號來進行通知。咱們首先開啓了一個信號驅動I/O套接字,並使用sigaction系統調用來安裝信號處理程序,內核直接返回,不會阻塞用戶態。當datagram準備好時,內核會發送SIGIO信號,recvfrom接收到信號後會發送系統調用開始進行I/O操做。
這種模型的優勢是主進程(線程)不會被阻塞,當數據準備就緒時,經過信號處理程序來通知主進程(線程)準備進行I/O操做與對數據的處理。
咱們以前討論的各類I/O模型不管是阻塞仍是非阻塞,它們所說的阻塞都是指的數據準備階段。異步I/O模型一樣依賴於信號處理程序來進行通知,但與以上I/O模型都不相同的是,異步I/O模型通知的是I/O操做已經完成,而不是數據準備完成。
能夠說異步I/O模型纔是真正的非阻塞,主進程只管作本身的事情,而後在I/O操做完成時調用回調函數來完成一些對數據的處理操做便可。
閒扯了這麼多,想必你們已經對I/O模型有了一個深入的認識。以後,咱們將會結合部分源碼(Netty4.X)來探討Netty中的各大核心組件,以及如何使用Netty,你會發現實現一個Netty程序是多麼簡單(並且還伴隨了高性能與可維護性)。
本文做者爲SylvanasSun(sylvanas.sun@gmail.com),首發於SylvanasSun’s Blog。 原文連接:https://sylvanassun.github.io/2017/11/30/2017-11-30-netty_introduction/ (轉載請務必保留本段聲明,而且保留超連接。)
網絡傳輸的基本單位是字節,在Java NIO中提供了ByteBuffer做爲字節緩衝區容器,但該類的API使用起來不太方便,因此Netty實現了ByteBuf做爲其替代品,下面是使用ByteBuf的優勢:
相比ByteBuffer使用起來更加簡單。
經過內置的複合緩衝區類型實現了透明的zero-copy。
容量能夠按需增加。
讀和寫使用了不一樣的索引指針。
支持鏈式調用。
支持引用計數與池化。
能夠被用戶自定義的緩衝區類型擴展。
在討論ByteBuf以前,咱們先須要瞭解一下ByteBuffer的實現,這樣才能比較深入地明白它們之間的區別。
ByteBuffer繼承於abstract class Buffer
(因此還有LongBuffer、IntBuffer等其餘類型的實現),本質上它只是一個有限的線性的元素序列,包含了三個重要的屬性。
Capacity:緩衝區中元素的容量大小,你只能將capacity個數量的元素寫入緩衝區,一旦緩衝區已滿就須要清理緩衝區才能繼續寫數據。
Position:指向下一個寫入數據位置的索引指針,初始位置爲0,最大爲capacity-1。當寫模式轉換爲讀模式時,position須要被重置爲0。
Limit:在寫模式中,limit是能夠寫入緩衝區的最大索引,也就是說它在寫模式中等價於緩衝區的容量。在讀模式中,limit表示能夠讀取數據的最大索引。
因爲Buffer中只維護了position一個索引指針,因此它在讀寫模式之間的切換須要調用一個flip()方法來重置指針。使用Buffer的流程通常以下:
寫入數據到緩衝區。
調用flip()方法。
從緩衝區中讀取數據
調用buffer.clear()或者buffer.compact()清理緩衝區,以便下次寫入數據。
RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");
FileChannel inChannel = aFile.getChannel();
// 分配一個48字節大小的緩衝區
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buf); // 讀取數據到緩衝區
while (bytesRead != -1) {
buf.flip(); // 將position重置爲0
while(buf.hasRemaining()){
System.out.print((char) buf.get()); // 讀取數據並輸出到控制檯
}
buf.clear(); // 清理緩衝區
bytesRead = inChannel.read(buf);
}
aFile.close();
複製代碼
Buffer中核心方法的實現也很是簡單,主要就是在操做指針position。
/** * Sets this buffer's mark at its position. * * @return This buffer */
public final Buffer mark() {
mark = position; // mark屬性是用來標記當前索引位置的
return this;
}
// 將當前索引位置重置爲mark所標記的位置
public final Buffer reset() {
int m = mark;
if (m < 0)
throw new InvalidMarkException();
position = m;
return this;
}
// 翻轉這個Buffer,將limit設置爲當前索引位置,而後再把position重置爲0
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
// 清理緩衝區
// 說是清理,也只是把postion與limit進行重置,以後再寫入數據就會覆蓋以前的數據了
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
// 返回剩餘空間
public final int remaining() {
return limit - position;
}
複製代碼
Java NIO中的Buffer API操做的麻煩之處就在於讀寫轉換須要手動重置指針。而ByteBuf沒有這種繁瑣性,它維護了兩個不一樣的索引,一個用於讀取,一個用於寫入。當你從ByteBuf讀取數據時,它的readerIndex將會被遞增已經被讀取的字節數,一樣的,當你寫入數據時,writerIndex則會遞增。readerIndex的最大範圍在writerIndex的所在位置,若是試圖移動readerIndex超過該值則會觸發異常。
ByteBuf中名稱以read或write開頭的方法將會遞增它們其對應的索引,而名稱以get或set開頭的方法則不會。ByteBuf一樣能夠指定一個最大容量,試圖移動writerIndex超過該值則會觸發異常。
public byte readByte() {
this.checkReadableBytes0(1); // 檢查readerIndex是否已越界
int i = this.readerIndex;
byte b = this._getByte(i);
this.readerIndex = i + 1; // 遞增readerIndex
return b;
}
private void checkReadableBytes0(int minimumReadableBytes) {
this.ensureAccessible();
if(this.readerIndex > this.writerIndex - minimumReadableBytes) {
throw new IndexOutOfBoundsException(String.format("readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s", new Object[]{Integer.valueOf(this.readerIndex), Integer.valueOf(minimumReadableBytes), Integer.valueOf(this.writerIndex), this}));
}
}
public ByteBuf writeByte(int value) {
this.ensureAccessible();
this.ensureWritable0(1); // 檢查writerIndex是否會越過capacity
this._setByte(this.writerIndex++, value);
return this;
}
private void ensureWritable0(int minWritableBytes) {
if(minWritableBytes > this.writableBytes()) {
if(minWritableBytes > this.maxCapacity - this.writerIndex) {
throw new IndexOutOfBoundsException(String.format("writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s", new Object[]{Integer.valueOf(this.writerIndex), Integer.valueOf(minWritableBytes), Integer.valueOf(this.maxCapacity), this}));
} else {
int newCapacity = this.alloc().calculateNewCapacity(this.writerIndex + minWritableBytes, this.maxCapacity);
this.capacity(newCapacity);
}
}
}
// get與set只對傳入的索引進行了檢查,而後對其位置進行get或set
public byte getByte(int index) {
this.checkIndex(index);
return this._getByte(index);
}
public ByteBuf setByte(int index, int value) {
this.checkIndex(index);
this._setByte(index, value);
return this;
}
複製代碼
ByteBuf一樣支持在堆內和堆外進行分配。在堆內分配也被稱爲支撐數組模式,它能在沒有使用池化的狀況下提供快速的分配和釋放。
ByteBuf heapBuf = Unpooled.copiedBuffer(bytes);
if (heapBuf.hasArray()) { // 判斷是否有一個支撐數組
byte[] array = heapBuf.array();
// 計算第一個字節的偏移量
int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();
int length = heapBuf.readableBytes(); // 得到可讀字節
handleArray(array,offset,length); // 調用你的處理方法
}
複製代碼
另外一種模式爲堆外分配,Java NIO ByteBuffer類在JDK1.4時就已經容許JVM實現經過JNI調用來在堆外分配內存(調用malloc()函數在JVM堆外分配內存),這主要是爲了不額外的緩衝區複製操做。
ByteBuf directBuf = Unpooled.directBuffer(capacity);
if (!directBuf.hasArray()) {
int length = directBuf.readableBytes();
byte[] array = new byte[length];
// 將字節複製到數組中
directBuf.getBytes(directBuf.readerIndex(),array);
handleArray(array,0,length);
}
複製代碼
ByteBuf還支持第三種模式,它被稱爲複合緩衝區,爲多個ByteBuf提供了一個聚合視圖。在這個視圖中,你能夠根據須要添加或者刪除ByteBuf實例,ByteBuf的子類CompositeByteBuf實現了該模式。
一個適合使用複合緩衝區的場景是HTTP協議,經過HTTP協議傳輸的消息都會被分紅兩部分——頭部和主體,若是這兩部分由應用程序的不一樣模塊產生,將在消息發送時進行組裝,而且該應用程序還會爲多個消息複用相同的消息主體,這樣對於每一個消息都將會建立一個新的頭部,產生了不少沒必要要的內存操做。使用CompositeByteBuf是一個很好的選擇,它消除了這些額外的複製,以幫助你複用這些消息。
CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
ByteBuf headerBuf = ....;
ByteBuf bodyBuf = ....;
messageBuf.addComponents(headerBuf,bodyBuf);
for (ByteBuf buf : messageBuf) {
System.out.println(buf.toString());
}
複製代碼
CompositeByteBuf透明的實現了zero-copy,zero-copy其實就是避免數據在兩個內存區域中來回的複製。從操做系統層面上來說,zero-copy指的是避免在內核態與用戶態之間的數據緩衝區複製(經過mmap避免),而Netty中的zero-copy更偏向於在用戶態中的數據操做的優化,就像使用CompositeByteBuf來複用多個ByteBuf以免額外的複製,也可使用wrap()方法來將一個字節數組包裝成ByteBuf,又或者使用ByteBuf的slice()方法把它分割爲多個共享同一內存區域的ByteBuf,這些都是爲了優化內存的使用率。
那麼如何建立ByteBuf呢?在上面的代碼中使用到了Unpooled,它是Netty提供的一個用於建立與分配ByteBuf的工具類,建議都使用這個工具類來建立你的緩衝區,不要本身去調用構造函數。常用的是wrappedBuffer()與copiedBuffer(),它們一個是用於將一個字節數組或ByteBuffer包裝爲一個ByteBuf,一個是根據傳入的字節數組與ByteBuffer/ByteBuf來複製出一個新的ByteBuf。
// 經過array.clone()來複制一個數組進行包裝
public static ByteBuf copiedBuffer(byte[] array) {
return array.length == 0?EMPTY_BUFFER:wrappedBuffer((byte[])array.clone());
}
// 默認是堆內分配
public static ByteBuf wrappedBuffer(byte[] array) {
return (ByteBuf)(array.length == 0?EMPTY_BUFFER:new UnpooledHeapByteBuf(ALLOC, array, array.length));
}
// 也提供了堆外分配的方法
private static final ByteBufAllocator ALLOC;
public static ByteBuf directBuffer(int initialCapacity) {
return ALLOC.directBuffer(initialCapacity);
}
複製代碼
相對底層的分配方法是使用ByteBufAllocator,Netty實現了PooledByteBufAllocator和UnpooledByteBufAllocator,前者使用了jemalloc(一種malloc()的實現)來分配內存,而且實現了對ByteBuf的池化以提升性能。後者分配的是未池化的ByteBuf,其分配方式與以前講的一致。
Channel channel = ...;
ByteBufAllocator allocator = channel.alloc();
ByteBuf buffer = allocator.directBuffer();
do something.......
複製代碼
爲了優化內存使用率,Netty提供了一套手動的方式來追蹤不活躍對象,像UnpooledHeapByteBuf這種分配在堆內的對象得益於JVM的GC管理,無需額外操心,而UnpooledDirectByteBuf是在堆外分配的,它的內部基於DirectByteBuffer,DirectByteBuffer會先向Bits類申請一個額度(Bits還擁有一個全局變量totalCapacity,記錄了全部DirectByteBuffer總大小),每次申請前都會查看是否已經超過-XX:MaxDirectMemorySize所設置的上限,若是超限就會嘗試調用Sytem.gc(),以試圖回收一部份內存,而後休眠100毫秒,若是內存仍是不足,則只能拋出OOM異常。堆外內存的回收雖然有了這麼一層保障,但爲了提升性能與使用率,主動回收也是頗有必要的。因爲Netty還實現了ByteBuf的池化,像PooledHeapByteBuf和PooledDirectByteBuf就必須依賴於手動的方式來進行回收(放回池中)。
Netty使用了引用計數器的方式來追蹤那些不活躍的對象。引用計數的接口爲ReferenceCounted,它的思想很簡單,只要ByteBuf對象的引用計數大於0,就保證該對象不會被釋放回收,能夠經過手動調用release()與retain()方法來操做該對象的引用計數值遞減或遞增。用戶也能夠經過自定義一個ReferenceCounted的實現類,以知足自定義的規則。
package io.netty.buffer;
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
// 因爲ByteBuf的實例對象會很是多,因此這裏沒有將refCnt包裝爲AtomicInteger
// 而是使用一個全局的AtomicIntegerFieldUpdater來負責操做refCnt
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
// 每一個ByteBuf的初始引用值都爲1
private volatile int refCnt = 1;
public int refCnt() {
return this.refCnt;
}
protected final void setRefCnt(int refCnt) {
this.refCnt = refCnt;
}
public ByteBuf retain() {
return this.retain0(1);
}
// 引用計數值遞增increment,increment必須大於0
public ByteBuf retain(int increment) {
return this.retain0(ObjectUtil.checkPositive(increment, "increment"));
}
public static int checkPositive(int i, String name) {
if(i <= 0) {
throw new IllegalArgumentException(name + ": " + i + " (expected: > 0)");
} else {
return i;
}
}
// 使用CAS操做不斷嘗試更新值
private ByteBuf retain0(int increment) {
int refCnt;
int nextCnt;
do {
refCnt = this.refCnt;
nextCnt = refCnt + increment;
if(nextCnt <= increment) {
throw new IllegalReferenceCountException(refCnt, increment);
}
} while(!refCntUpdater.compareAndSet(this, refCnt, nextCnt));
return this;
}
public boolean release() {
return this.release0(1);
}
public boolean release(int decrement) {
return this.release0(ObjectUtil.checkPositive(decrement, "decrement"));
}
private boolean release0(int decrement) {
int refCnt;
do {
refCnt = this.refCnt;
if(refCnt < decrement) {
throw new IllegalReferenceCountException(refCnt, -decrement);
}
} while(!refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement));
if(refCnt == decrement) {
this.deallocate();
return true;
} else {
return false;
}
}
protected abstract void deallocate();
}
複製代碼
Netty中的Channel與Java NIO的概念同樣,都是對一個實體或鏈接的抽象,但Netty提供了一套更加通用的API。就以網絡套接字爲例,在Java中OIO與NIO是大相徑庭的兩套API,假設你以前使用的是OIO而又想更改成NIO實現,那麼幾乎須要重寫全部代碼。而在Netty中,只須要更改短短几行代碼(更改Channel與EventLoop的實現類,如把OioServerSocketChannel替換爲NioServerSocketChannel),就能夠完成OIO與NIO(或其餘)之間的轉換。
每一個Channel最終都會被分配一個ChannelPipeline和ChannelConfig,前者持有全部負責處理入站與出站數據以及事件的ChannelHandler,後者包含了該Channel的全部配置設置,而且支持熱更新,因爲不一樣的傳輸類型可能具備其特別的配置,因此該類可能會實現爲ChannelConfig的不一樣子類。
Channel是線程安全的(與以後要講的線程模型有關),所以你徹底能夠在多個線程中複用同一個Channel,就像以下代碼所示。
final Channel channel = ...
final ByteBuf buffer = Unpooled.copiedBuffer("Hello,World!", CharsetUtil.UTF_8).retain();
Runnable writer = new Runnable() {
@Override
public void run() {
channel.writeAndFlush(buffer.duplicate());
}
};
Executor executor = Executors.newCachedThreadPool();
executor.execute(writer);
executor.execute(writer);
.......
複製代碼
Netty除了支持常見的NIO與OIO,還內置了其餘的傳輸類型。
Nmae | Package | Description |
---|---|---|
NIO | io.netty.channel.socket.nio | 以Java NIO爲基礎實現 |
OIO | io.netty.channel.socket.oio | 以java.net爲基礎實現,使用阻塞I/O模型 |
Epoll | io.netty.channel.epoll | 由JNI驅動epoll()實現的更高性能的非阻塞I/O,它只能使用在Linux |
Local | io.netty.channel.local | 本地傳輸,在JVM內部經過管道進行通訊 |
Embedded | io.netty.channel.embedded | 容許在不須要真實網絡傳輸的環境下使用ChannelHandler,主要用於對ChannelHandler進行測試 |
NIO、OIO、Epoll咱們應該已經很熟悉了,下面主要說說Local與Embedded。
Local傳輸用於在同一個JVM中運行的客戶端和服務器程序之間的異步通訊,與服務器Channel相關聯的SocketAddress並無綁定真正的物理網絡地址,它會被存儲在註冊表中,並在Channel關閉時註銷。所以Local傳輸不會接受真正的網絡流量,也就是說它不能與其餘傳輸實現進行互操做。
Embedded傳輸主要用於對ChannelHandler進行單元測試,ChannelHandler是用於處理消息的邏輯組件,Netty經過將入站消息與出站消息都寫入到EmbeddedChannel中的方式(提供了write/readInbound()與write/readOutbound()來讀寫入站與出站消息)來實現對ChannelHandler的單元測試。
ChannelHandler充當了處理入站和出站數據的應用程序邏輯的容器,該類是基於事件驅動的,它會響應相關的事件而後去調用其關聯的回調函數,例如當一個新的鏈接被創建時,ChannelHandler的channelActive()方法將會被調用。
關於入站消息和出站消息的數據流向定義,若是以客戶端爲主視角來講的話,那麼從客戶端流向服務器的數據被稱爲出站,反之爲入站。
入站事件是可能被入站數據或者相關的狀態更改而觸發的事件,包括:鏈接已被激活、鏈接失活、讀取入站數據、用戶事件、發生異常等。
出站事件是將來將會觸發的某個動做的結果的事件,這些動做包括:打開或關閉遠程節點的鏈接、將數據寫(或沖刷)到套接字。
ChannelHandler的主要用途包括:
對入站與出站數據的業務邏輯處理
記錄日誌
將數據從一種格式轉換爲另外一種格式,實現編解碼器。以一次HTTP協議(或者其餘應用層協議)的流程爲例,數據在網絡傳輸時的單位爲字節,當客戶端發送請求到服務器時,服務器須要經過解碼器(處理入站消息)將字節解碼爲協議的消息內容,服務器在發送響應的時候(處理出站消息),還須要經過編碼器將消息內容編碼爲字節。
捕獲異常
提供Channel生命週期內的通知,如Channel活動時與非活動時
Netty中處處都充滿了異步與事件驅動,而回調函數正是用於響應事件以後的操做。因爲異步會直接返回一個結果,因此Netty提供了ChannelFuture(實現了java.util.concurrent.Future)來做爲異步調用返回的佔位符,真正的結果會在將來的某個時刻完成,到時候就能夠經過ChannelFuture對其進行訪問,每一個Netty的出站I/O操做都將會返回一個ChannelFuture。
Netty還提供了ChannelFutureListener接口來監聽ChannelFuture是否成功,並採起對應的操做。
Channel channel = ...
ChannelFuture future = channel.connect(new InetSocketAddress("192.168.0.1",6666));
// 註冊一個監聽器
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// do something....
} else {
// 輸出錯誤信息
Throwable cause = future.cause();
cause.printStackTrace();
// do something....
}
}
});
複製代碼
ChannelFutureListener接口中還提供了幾個簡單的默認實現,方便咱們使用。
package io.netty.channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.GenericFutureListener;
public interface ChannelFutureListener extends GenericFutureListener<ChannelFuture> {
// 在Future完成時關閉
ChannelFutureListener CLOSE = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
future.channel().close();
}
};
// 若是失敗則關閉
ChannelFutureListener CLOSE_ON_FAILURE = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
if(!future.isSuccess()) {
future.channel().close();
}
}
};
// 將異常信息傳遞給下一個ChannelHandler
ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
if(!future.isSuccess()) {
future.channel().pipeline().fireExceptionCaught(future.cause());
}
}
};
}
複製代碼
ChannelHandler接口定義了對它生命週期進行監聽的回調函數,在ChannelHandler被添加到ChannelPipeline或者被移除時都會調用這些函數。
package io.netty.channel;
public interface ChannelHandler {
void handlerAdded(ChannelHandlerContext var1) throws Exception;
void handlerRemoved(ChannelHandlerContext var1) throws Exception;
/** @deprecated */
@Deprecated
void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
// 該註解代表這個ChannelHandler可被其餘線程複用
@Inherited
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface Sharable {
}
}
複製代碼
入站消息與出站消息由其對應的接口ChannelInboundHandler與ChannelOutboundHandler負責,這兩個接口定義了監聽Channel的生命週期的狀態改變事件的回調函數。
package io.netty.channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
public interface ChannelInboundHandler extends ChannelHandler {
// 當channel被註冊到EventLoop時被調用
void channelRegistered(ChannelHandlerContext var1) throws Exception;
// 當channel已經被建立,但還未註冊到EventLoop(或者從EventLoop中註銷)被調用
void channelUnregistered(ChannelHandlerContext var1) throws Exception;
// 當channel處於活動狀態(鏈接到遠程節點)被調用
void channelActive(ChannelHandlerContext var1) throws Exception;
// 當channel處於非活動狀態(沒有鏈接到遠程節點)被調用
void channelInactive(ChannelHandlerContext var1) throws Exception;
// 當從channel讀取數據時被調用
void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;
// 當channel的上一個讀操做完成時被調用
void channelReadComplete(ChannelHandlerContext var1) throws Exception;
// 當ChannelInboundHandler.fireUserEventTriggered()方法被調用時被調用
void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;
// 當channel的可寫狀態發生改變時被調用
void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;
// 當處理過程當中發生異常時被調用
void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
}
package io.netty.channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.net.SocketAddress;
public interface ChannelOutboundHandler extends ChannelHandler {
// 當請求將Channel綁定到一個地址時被調用
// ChannelPromise是ChannelFuture的一個子接口,定義瞭如setSuccess(),setFailure()等方法
void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception;
// 當請求將Channel鏈接到遠程節點時被調用
void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception;
// 當請求將Channel從遠程節點斷開時被調用
void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
// 當請求關閉Channel時被調用
void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
// 當請求將Channel從它的EventLoop中註銷時被調用
void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
// 當請求從Channel讀取數據時被調用
void read(ChannelHandlerContext var1) throws Exception;
// 當請求經過Channel將數據寫到遠程節點時被調用
void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception;
// 當請求經過Channel將緩衝中的數據沖刷到遠程節點時被調用
void flush(ChannelHandlerContext var1) throws Exception;
}
複製代碼
經過實現ChannelInboundHandler或者ChannelOutboundHandler就能夠完成用戶自定義的應用邏輯處理程序,不過Netty已經幫你實現了一些基本操做,用戶只須要繼承並擴展ChannelInboundHandlerAdapter或ChannelOutboundHandlerAdapter來做爲自定義實現的起始點。
ChannelInboundHandlerAdapter與ChannelOutboundHandlerAdapter都繼承於ChannelHandlerAdapter,該抽象類簡單實現了ChannelHandler接口。
public abstract class ChannelHandlerAdapter implements ChannelHandler {
boolean added;
public ChannelHandlerAdapter() {
}
// 該方法不容許將此ChannelHandler共享複用
protected void ensureNotSharable() {
if(this.isSharable()) {
throw new IllegalStateException("ChannelHandler " + this.getClass().getName() + " is not allowed to be shared");
}
}
// 使用反射判斷實現類有沒有@Sharable註解,以確認該類是否爲可共享複用的
public boolean isSharable() {
Class clazz = this.getClass();
Map cache = InternalThreadLocalMap.get().handlerSharableCache();
Boolean sharable = (Boolean)cache.get(clazz);
if(sharable == null) {
sharable = Boolean.valueOf(clazz.isAnnotationPresent(Sharable.class));
cache.put(clazz, sharable);
}
return sharable.booleanValue();
}
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}
複製代碼
ChannelInboundHandlerAdapter與ChannelOutboundHandlerAdapter默認只是簡單地將請求傳遞給ChannelPipeline中的下一個ChannelHandler,源碼以下:
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
public ChannelInboundHandlerAdapter() {
}
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
public ChannelOutboundHandlerAdapter() {
}
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.bind(localAddress, promise);
}
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.connect(remoteAddress, localAddress, promise);
}
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.disconnect(promise);
}
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.close(promise);
}
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.deregister(promise);
}
public void read(ChannelHandlerContext ctx) throws Exception {
ctx.read();
}
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
複製代碼
對於處理入站消息,另一種選擇是繼承SimpleChannelInboundHandler,它是Netty的一個繼承於ChannelInboundHandlerAdapter的抽象類,並在其之上實現了自動釋放資源的功能。
咱們在瞭解ByteBuf時就已經知道了Netty使用了一套本身實現的引用計數算法來主動釋放資源,假設你的ChannelHandler繼承於ChannelInboundHandlerAdapter或ChannelOutboundHandlerAdapter,那麼你就有責任去管理你所分配的ByteBuf,通常來講,一個消息對象(ByteBuf)已經被消費(或丟棄)了,而且不會傳遞給ChannelHandler鏈中的下一個處理器(若是該消息到達了實際的傳輸層,那麼當它被寫入或Channel關閉時,都會被自動釋放),那麼你就須要去手動釋放它。經過一個簡單的工具類ReferenceCountUtil的release方法,就能夠作到這一點。
// 這個泛型爲消息對象的類型
public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {
private final TypeParameterMatcher matcher;
private final boolean autoRelease;
protected SimpleChannelInboundHandler() {
this(true);
}
protected SimpleChannelInboundHandler(boolean autoRelease) {
this.matcher = TypeParameterMatcher.find(this, SimpleChannelInboundHandler.class, "I");
this.autoRelease = autoRelease;
}
protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType) {
this(inboundMessageType, true);
}
protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType, boolean autoRelease) {
this.matcher = TypeParameterMatcher.get(inboundMessageType);
this.autoRelease = autoRelease;
}
public boolean acceptInboundMessage(Object msg) throws Exception {
return this.matcher.match(msg);
}
// SimpleChannelInboundHandler只是替你作了ReferenceCountUtil.release()
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
if(this.acceptInboundMessage(msg)) {
this.channelRead0(ctx, msg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if(this.autoRelease && release) {
ReferenceCountUtil.release(msg);
}
}
}
// 這個方法纔是咱們須要實現的方法
protected abstract void channelRead0(ChannelHandlerContext var1, I var2) throws Exception;
}
// ReferenceCountUtil中的源碼,release方法對消息對象的類型進行判斷而後調用它的release()方法
public static boolean release(Object msg) {
return msg instanceof ReferenceCounted?((ReferenceCounted)msg).release():false;
}
複製代碼
爲了模塊化與解耦合,不可能由一個ChannelHandler來完成全部應用邏輯,因此Netty採用了攔截器鏈的設計。ChannelPipeline就是用來管理ChannelHandler實例鏈的容器,它的職責就是保證明例鏈的流動。
每個新建立的Channel都將會被分配一個新的ChannelPipeline,這種關聯關係是永久性的,一個Channel一輩子只能對應一個ChannelPipeline。
一個入站事件被觸發時,它會先從ChannelPipeline的最左端(頭部)開始一直傳播到ChannelPipeline的最右端(尾部),而出站事件正好與入站事件順序相反(從最右端一直傳播到最左端)。這個順序是定死的,Netty老是將ChannelPipeline的入站口做爲頭部,而將出站口做爲尾部。在事件傳播的過程當中,ChannelPipeline會判斷下一個ChannelHandler的類型是否和事件的運動方向相匹配,若是不匹配,就跳過該ChannelHandler並繼續檢查下一個(保證入站事件只會被ChannelInboundHandler處理),一個ChannelHandler也能夠同時實現ChannelInboundHandler與ChannelOutboundHandler,它在入站事件與出站事件中都會被調用。
在閱讀ChannelHandler的源碼時,發現不少方法須要一個ChannelHandlerContext類型的參數,該接口是ChannelPipeline與ChannelHandler之間相關聯的關鍵。ChannelHandlerContext能夠通知ChannelPipeline中的當前ChannelHandler的下一個ChannelHandler,還能夠動態地改變當前ChannelHandler在ChannelPipeline中的位置(經過調用ChannelPipeline中的各類方法來修改)。
ChannelHandlerContext負責了在同一個ChannelPipeline中的ChannelHandler與其餘ChannelHandler之間的交互,每一個ChannelHandlerContext都對應了一個ChannelHandler。在DefaultChannelPipeline的源碼中,已經表現的很明顯了。
public class DefaultChannelPipeline implements ChannelPipeline {
.........
// 頭部節點和尾部節點的引用變量
// ChannelHandlerContext在ChannelPipeline中是以鏈表的形式組織的
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
.........
// 添加一個ChannelHandler到鏈表尾部
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
return this.addLast((EventExecutorGroup)null, name, handler);
}
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized(this) {
// 檢查ChannelHandler是否爲一個共享對象(@Sharable)
// 若是該ChannelHandler沒有@Sharable註解,而且是已被添加過的那麼就拋出異常
checkMultiplicity(handler);
// 返回一個DefaultChannelHandlerContext,注意該對象持有了傳入的ChannelHandler
newCtx = this.newContext(group, this.filterName(name, handler), handler);
this.addLast0(newCtx);
// 若是當前ChannelPipeline沒有被註冊,那麼就先加到未決鏈表中
if(!this.registered) {
newCtx.setAddPending();
this.callHandlerCallbackLater(newCtx, true);
return this;
}
// 不然就調用ChannelHandler中的handlerAdded()
EventExecutor executor = newCtx.executor();
if(!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
public void run() {
DefaultChannelPipeline.this.callHandlerAdded0(newCtx);
}
});
return this;
}
}
this.callHandlerAdded0(newCtx);
return this;
}
// 將新的ChannelHandlerContext插入到尾部與尾部以前的節點之間
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = this.tail.prev;
newCtx.prev = prev;
newCtx.next = this.tail;
prev.next = newCtx;
this.tail.prev = newCtx;
}
.....
}
複製代碼
ChannelHandlerContext還定義了許多與Channel和ChannelPipeline重合的方法(像read()、write()、connect()這些用於出站的方法或者如fireChannelXXXX()這樣用於入站的方法),不一樣之處在於調用Channel或者ChannelPipeline上的這些方法,它們將會從頭沿着整個ChannelHandler實例鏈進行傳播,而調用位於ChannelHandlerContext上的相同方法,則會從當前所關聯的ChannelHandler開始,且只會傳播給實例鏈中的下一個ChannelHandler。並且,事件之間的移動(從一個ChannelHandler到下一個ChannelHandler)也是經過ChannelHandlerContext中的方法調用完成的。
public class DefaultChannelPipeline implements ChannelPipeline {
public final ChannelPipeline fireChannelRead(Object msg) {
// 注意這裏將頭節點傳入了進去
AbstractChannelHandlerContext.invokeChannelRead(this.head, msg);
return this;
}
}
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if(executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if(this.invokeHandler()) {
try {
((ChannelInboundHandler)this.handler()).channelRead(this, msg);
} catch (Throwable var3) {
this.notifyHandlerException(var3);
}
} else {
// 尋找下一個ChannelHandler
this.fireChannelRead(msg);
}
}
public ChannelHandlerContext fireChannelRead(Object msg) {
invokeChannelRead(this.findContextInbound(), msg);
return this;
}
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while(!ctx.inbound); // 直到找到一個ChannelInboundHandler
return ctx;
}
}
複製代碼
爲了最大限度地提供高性能和可維護性,Netty設計了一套強大又易用的線程模型。在一個網絡框架中,最重要的能力是可以快速高效地處理在鏈接的生命週期內發生的各類事件,與之相匹配的程序構造被稱爲事件循環,Netty定義了接口EventLoop來負責這項工做。
若是是常常用Java進行多線程開發的童鞋想必常常會使用到線程池,也就是Executor這套API。Netty就是從Executor(java.util.concurrent)之上擴展了本身的EventExecutorGroup(io.netty.util.concurrent),同時爲了與Channel的事件進行交互,還擴展了EventLoopGroup接口(io.netty.channel)。在io.netty.util.concurrent包下的EventExecutorXXX負責實現線程併發相關的工做,而在io.netty.channel包下的EventLoopXXX負責實現網絡編程相關的工做(處理Channel中的事件)。
在Netty的線程模型中,一個EventLoop將由一個永遠不會改變的Thread驅動,而一個Channel一輩子只會使用一個EventLoop(可是一個EventLoop可能會被指派用於服務多個Channel),在Channel中的全部I/O操做和事件都由EventLoop中的線程處理,也就是說一個Channel的一輩子之中都只會使用到一個線程。不過在Netty3,只有入站事件會被EventLoop處理,全部出站事件都會由調用線程處理,這種設計致使了ChannelHandler的線程安全問題。Netty4簡化了線程模型,經過在同一個線程處理全部事件,既解決了這個問題,還提供了一個更加簡單的架構。
package io.netty.channel;
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16, SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", 2147483647));
private final Queue<Runnable> tailTasks;
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
}
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) {
this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
}
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
this.tailTasks = this.newTaskQueue(maxPendingTasks);
}
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
this.tailTasks = this.newTaskQueue(maxPendingTasks);
}
// 返回它所在的EventLoopGroup
public EventLoopGroup parent() {
return (EventLoopGroup)super.parent();
}
public EventLoop next() {
return (EventLoop)super.next();
}
// 註冊Channel,這裏ChannelPromise和Channel關聯到了一塊兒
public ChannelFuture register(Channel channel) {
return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this)));
}
public ChannelFuture register(ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
// 剩下這些函數都是用於調度任務
public final void executeAfterEventLoopIteration(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
if(this.isShutdown()) {
reject();
}
if(!this.tailTasks.offer(task)) {
this.reject(task);
}
if(this.wakesUpForTask(task)) {
this.wakeup(this.inEventLoop());
}
}
final boolean removeAfterEventLoopIterationTask(Runnable task) {
return this.tailTasks.remove(ObjectUtil.checkNotNull(task, "task"));
}
protected boolean wakesUpForTask(Runnable task) {
return !(task instanceof SingleThreadEventLoop.NonWakeupRunnable);
}
protected void afterRunningAllTasks() {
this.runAllTasksFrom(this.tailTasks);
}
protected boolean hasTasks() {
return super.hasTasks() || !this.tailTasks.isEmpty();
}
public int pendingTasks() {
return super.pendingTasks() + this.tailTasks.size();
}
interface NonWakeupRunnable extends Runnable {
}
}
複製代碼
爲了確保一個Channel的整個生命週期中的I/O事件會被一個EventLoop負責,Netty經過inEventLoop()方法來判斷當前執行的線程的身份,肯定它是不是分配給當前Channel以及它的EventLoop的那一個線程。若是當前(調用)線程正是EventLoop中的線程,那麼所提交的任務將會被直接執行,不然,EventLoop將調度該任務以便稍後執行,並將它放入內部的任務隊列(每一個EventLoop都有它本身的任務隊列,從SingleThreadEventLoop的源碼就能發現不少用於調度內部任務隊列的方法),在下次處理它的事件時,將會執行隊列中的那些任務。這種設計可讓任何線程與Channel直接交互,而無需在ChannelHandler中進行額外的同步。
從性能上來考慮,千萬不要將一個須要長時間來運行的任務放入到任務隊列中,它會影響到該隊列中的其餘任務的執行。解決方案是使用一個專門的EventExecutor來執行它(ChannelPipeline提供了帶有EventExecutorGroup參數的addXXX()方法,該方法能夠將傳入的ChannelHandler綁定到你傳入的EventExecutor之中),這樣它就會在另外一條線程中執行,與其餘任務隔離。
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
.....
public void execute(Runnable task) {
if(task == null) {
throw new NullPointerException("task");
} else {
boolean inEventLoop = this.inEventLoop();
if(inEventLoop) {
this.addTask(task);
} else {
this.startThread();
this.addTask(task);
if(this.isShutdown() && this.removeTask(task)) {
reject();
}
}
if(!this.addTaskWakesUp && this.wakesUpForTask(task)) {
this.wakeup(inEventLoop);
}
}
}
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
.....
}
複製代碼
EventLoopGroup負責管理和分配EventLoop(建立EventLoop和爲每一個新建立的Channel分配EventLoop),根據不一樣的傳輸類型,EventLoop的建立和分配方式也不一樣。例如,使用NIO傳輸類型,EventLoopGroup就會只使用較少的EventLoop(一個EventLoop服務於多個Channel),這是由於NIO基於I/O多路複用,一個線程能夠處理多個鏈接,而若是使用的是OIO,那麼新建立一個Channel(鏈接)就須要分配一個EventLoop(線程)。
在深刻了解地Netty的核心組件以後,發現它們的設計都很模塊化,若是想要實現你本身的應用程序,就須要將這些組件組裝到一塊兒。Netty經過Bootstrap類,以對一個Netty應用程序進行配置(組裝各個組件),並最終使它運行起來。對於客戶端程序和服務器程序所使用到的Bootstrap類是不一樣的,後者須要使用ServerBootstrap,這樣設計是由於,在如TCP這樣有鏈接的協議中,服務器程序每每須要一個以上的Channel,經過父Channel來接受來自客戶端的鏈接,而後建立子Channel用於它們之間的通訊,而像UDP這樣無鏈接的協議,它不須要每一個鏈接都建立子Channel,只須要一個Channel便可。
一個比較明顯的差別就是Bootstrap與ServerBootstrap的group()方法,後者提供了一個接收2個EventLoopGroup的版本。
// 該方法在Bootstrap的父類AbstractBootstrap中,泛型B爲它當前子類的類型(爲了鏈式調用)
public B group(EventLoopGroup group) {
if(group == null) {
throw new NullPointerException("group");
} else if(this.group != null) {
throw new IllegalStateException("group set already");
} else {
this.group = group;
return this;
}
}
// ServerBootstrap中的實現,它也支持只用一個EventLoopGroup
public ServerBootstrap group(EventLoopGroup group) {
return this.group(group, group);
}
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if(childGroup == null) {
throw new NullPointerException("childGroup");
} else if(this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
} else {
this.childGroup = childGroup;
return this;
}
}
複製代碼
Bootstrap其實沒有什麼能夠好說的,它就只是一個裝配工,將各個組件拼裝組合到一塊兒,而後進行一些配置,有關它的詳細API請參考Netty JavaDoc。下面咱們將經過一個經典的Echo客戶端與服務器的例子,來梳理一遍建立Netty應用的流程。
首先實現的是服務器,咱們先實現一個EchoServerInboundHandler,處理入站消息。
public class EchoServerInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
System.out.printf("Server received: %s \n", in.toString(CharsetUtil.UTF_8));
// 因爲讀事件不是一次性就能把完整消息發送過來的,這裏並無調用writeAndFlush
ctx.write(in); // 直接把消息寫回給客戶端(會被出站消息處理器處理,不過咱們的應用沒有實現任何出站消息處理器)
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 等讀事件已經完成時,沖刷以前寫數據的緩衝區
// 而後添加了一個監聽器,它會在Future完成時進行關閉該Channel.
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
// 處理異常,輸出異常信息,而後關閉Channel
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
複製代碼
服務器的應用邏輯只有這麼多,剩下就是用ServerBootstrap進行配置了。
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public void start() throws Exception {
final EchoServerInboundHandler serverHandler = new EchoServerInboundHandler();
EventLoopGroup group = new NioEventLoopGroup(); // 傳輸類型使用NIO
try {
ServerBootstrap b = new ServerBootstrap();
b.group(group) // 配置EventLoopGroup
.channel(NioServerSocketChannel.class) // 配置Channel的類型
.localAddress(new InetSocketAddress(port)) // 配置端口號
.childHandler(new ChannelInitializer<SocketChannel>() {
// 實現一個ChannelInitializer,它能夠方便地添加多個ChannelHandler
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(serverHandler);
}
});
// i綁定地址,同步等待它完成
ChannelFuture f = b.bind().sync();
// 關閉這個Future
f.channel().closeFuture().sync();
} finally {
// 關閉應用程序,通常來講Netty應用只須要調用這個方法就夠了
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.err.printf(
"Usage: %s <port> \n",
EchoServer.class.getSimpleName()
);
return;
}
int port = Integer.parseInt(args[0]);
new EchoServer(port).start();
}
}
複製代碼
接下來實現客戶端,一樣須要先實現一個入站消息處理器。
public class EchoClientInboundHandler extends SimpleChannelInboundHandler<ByteBuf> {
/** * 咱們在Channel鏈接到遠程節點直接發送一條消息給服務器 */
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Netty!", CharsetUtil.UTF_8));
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
// 輸出從服務器Echo的消息
System.out.printf("Client received: %s \n", byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
複製代碼
而後配置客戶端。
public class EchoClient {
private final String host;
private final int port;
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port)) // 服務器的地址
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new EchoClientInboundHandler());
}
});
ChannelFuture f = b.connect().sync(); // 鏈接到服務器
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.printf("Usage: %s <host> <port> \n", EchoClient.class.getSimpleName());
return;
}
String host = args[0];
int port = Integer.parseInt(args[1]);
new EchoClient(host, port).start();
}
}
複製代碼
實現一個Netty應用程序就是如此簡單,用戶大多數都是在編寫各類應用邏輯的ChannelHandler(或者使用Netty內置的各類實用ChannelHandler),而後只須要將它們所有添加到ChannelPipeline便可。