Netty學習與實戰(一)

準備

首先了解一下 Socket

網絡上的兩個程序經過一個雙向的通訊鏈接實現數據的交換,這個鏈接的一端稱爲一個socket。
創建網絡通訊鏈接至少要一對端口號(socket)。socket本質是編程接口(API),對TCP/IP的封裝,TCP/IP也要提供可供程序員作網絡開發所用的接口,這就是Socket編程接口;HTTP是轎車,提供了封裝或者顯示數據的具體形式;Socket是發動機,提供了網絡通訊的能力。
Socket的英文原義是「孔」或「插座」。做爲BSD UNIX的進程通訊機制,取後一種意思。一般也稱做套接字,用於描述IP地址和端口,是一個通訊鏈的句柄,能夠用來實現不一樣虛擬機或不一樣計算機之間的通訊。在Internet上的主機通常運行了多個服務軟件,同時提供幾種服務。每種服務都打開一個Socket,並綁定到一個端口上,不一樣的端口對應於不一樣的服務

而後回顧一下HTTP服務流程

  1. 服務端建立ServerSocket,監聽一個端口
  2. 客戶端請求服務端
  3. 服務端獲取一個請求的Socket對象
  4. 開啓新線程
  5. 讀取socket 字節流
  6. 解碼協議(HTTP協議),獲得http請求對象
  7. 處理業務,將結果封裝成一個HttpResponse對象
  8. 編碼協議(HTTP協議),將結果序列化字節流寫進socket
  9. 將字節流返回給客戶端
  10. 結束

示例代碼html

//Server 端首先建立了一個serverSocket來監聽 8000 端口,而後建立一個線程,線程裏面不斷調用阻塞方法 serversocket.accept();獲取新的鏈接,見(1),當獲取到新的鏈接以後,給每條鏈接建立一個新的線程,這個線程負責從該鏈接中讀取數據,見(2),而後讀取數據是以字節流的方式,見(3)。

public class IOServer {
    public static void main(String[] args) throws Exception {

        ServerSocket serverSocket = new ServerSocket(8000);

        // (1) 接收新鏈接線程
        new Thread(() -> {
            while (true) {
                try {
                    // (1) 阻塞方法獲取新的鏈接
                    Socket socket = serverSocket.accept();

                    // (2) 每個新的鏈接都建立一個線程,負責讀取數據
                    new Thread(() -> {
                        try {
                            int len;
                            byte[] data = new byte[1024];
                            InputStream inputStream = socket.getInputStream();
                            // (3) 按字節流方式讀取數據
                            while ((len = inputStream.read(data)) != -1) {
                                System.out.println(new String(data, 0, len));
                            }
                        } catch (IOException e) {
                        }
                    }).start();

                } catch (IOException e) {
                }

            }
        }).start();
    }
}
//鏈接上服務端 8000 端口以後,每隔 2 秒,咱們向服務端寫一個帶有時間戳的 "hello world"
public class IOClient {

    public static void main(String[] args) {
        new Thread(() -> {
            try {
                Socket socket = new Socket("127.0.0.1", 8000);
                while (true) {
                    try {
                        socket.getOutputStream().write((new Date() + ": hello world").getBytes());
                        Thread.sleep(2000);
                    } catch (Exception e) {
                    }
                }
            } catch (IOException e) {
            }
        }).start();
    }
}

IO NIO AIO BIO

同步&異步,阻塞&非阻塞

  • 同步與異步(synchronous/asynchronous):同步是一種可靠的有序運行機制,當咱們進行同步操做時,後續的任務是等待當前調用返回,纔會進行下一步;而異步則相反,其餘任務不須要等待當前調用返回,一般依靠事件、回調等機制來實現任務間次序關係
  • 阻塞與非阻塞:在進行阻塞操做時,當前線程會處於阻塞狀態,沒法從事其餘任務,只有當條件就緒才能繼續,好比ServerSocket新鏈接創建完畢,或者數據讀取、寫入操做完成;而非阻塞則是無論IO操做是否結束,直接返回,相應操做在後臺繼續處理
java.io包的好處是代碼比較簡單、直觀,缺點則是 IO 效率和擴展性存在侷限性,容易成爲應用性能的瓶頸。

不少時候,人們也把 java.net下面提供的部分網絡 API,好比 Socket、ServerSocket、HttpURLConnection 也歸類到同步阻塞 IO 類庫,由於網絡通訊一樣是 IO 行爲。java

在 Java 1.4 中引入了 NIO 框架(java.nio 包),提供了 Channel、Selector、Buffer 等新的抽象,能夠構建多路複用的、同步非阻塞 IO 程序,同時提供了更接近操做系統底層的高性能數據操做方式。react

在 Java 7 中,NIO 有了進一步的改進,也就是 NIO 2,引入了異步非阻塞 IO 方式,也有不少人叫它 AIO(Asynchronous IO)。異步 IO 操做基於事件和回調機制,能夠簡單理解爲,應用操做直接返回,而不會阻塞在那裏,當後臺處理完成,操做系統會通知相應線程進行後續工做git

IO(同步、阻塞)

IO流即input和output流,是同步 阻塞
javaIO.png程序員

NIO(同步、非阻塞)

NIO之因此是同步,是由於它的accept/read/write方法的內核I/O操做都會阻塞當前線程
NIO三個組成部分,Channel(通道)、Buffer(緩衝區)、Selector(選擇器)算法

  • Channel:Channel是一個對象,能夠經過它讀取和寫入數據。能夠把它看作是IO中的流,不一樣的是:編程

    Channel是雙向的,既能夠讀又能夠寫,而流是單向的
      Channel能夠進行異步的讀寫
      對Channel的讀寫必須經過buffer對象

    全部數據都經過Buffer對象處理,因此永遠不會將字節直接寫入到Channel中
    在Java NIO中的Channel主要有以下幾種類型:segmentfault

    FileChannel:從文件讀取數據的
      DatagramChannel:讀寫UDP網絡協議數據
      SocketChannel:讀寫TCP網絡協議數據
      ServerSocketChannel:能夠監聽TCP鏈接
  • Buffer:Buffer是一個對象,它包含一些要寫入或者讀到Stream對象的。應用程序不能直接對 Channel 進行讀寫操做,而必須經過 Buffer 來進行,即 Channel 是經過 Buffer 來讀寫數據的
    使用 Buffer 讀寫數據通常遵循如下四個步驟:數組

    1.寫入數據到 Buffer;
      2.調用 flip() 方法;
      3.從 Buffer 中讀取數據;
      4.調用 clear() 方法或者 compact() 方法。
//CopyFile執行三個基本的操做:建立一個Buffer,而後從源文件讀取數據到緩衝區,而後再將緩衝區寫入目標文件 
public static void copyFileUseNIO(String src,String dst) throws IOException{
//聲明源文件和目標文件
        FileInputStream fi=new FileInputStream(new File(src));
        FileOutputStream fo=new FileOutputStream(new File(dst));
        //得到傳輸通道channel
        FileChannel inChannel=fi.getChannel();
        FileChannel outChannel=fo.getChannel();
        //得到容器buffer
        ByteBuffer buffer=ByteBuffer.allocate(1024);
        while(true){
            //判斷是否讀完文件
            int eof =inChannel.read(buffer);
            if(eof==-1){
                break;  
            }
            //重設一下buffer的position=0,limit=position
            buffer.flip();
            //開始寫
            outChannel.write(buffer);
            //寫完要重置buffer,重設position=0,limit=capacity
            buffer.clear();
        }
        inChannel.close();
        outChannel.close();
        fi.close();
        fo.close();
}
  • Selector:Selector是一個對象,它能夠註冊到不少個Channel上,監聽各個Channel上發生的事件,而且可以根據事件狀況決定Channel讀寫。這樣,經過一個線程管理多個Channel,就能夠處理大量網絡鏈接了。
    一條鏈接來了以後,如今不建立一個 while 死循環去監聽是否有數據可讀了,而是直接把這條鏈接註冊到 selector 上,而後,經過檢查這個 selector,就能夠批量監測出有數據可讀的鏈接,進而讀取數據
    線程之間的切換對操做系統來講代價是很高的,而且每一個線程也會佔用必定的系統資源。因此,對系統來講使用的線程越少越好。

NIO多路複用

主要步驟和元素:緩存

  • 首先,經過 Selector.open() 建立一個 Selector,做爲相似調度員的角色。
  • 而後,建立一個 ServerSocketChannel,而且向 Selector 註冊,經過指定 SelectionKey.OP_ACCEPT,告訴調度員,它關注的是新的鏈接請求。
  • 注意,爲何咱們要明確配置非阻塞模式呢?這是由於阻塞模式下,註冊操做是不容許的,會拋出 IllegalBlockingModeException 異常。
  • Selector 阻塞在 select 操做,當有 Channel 發生接入請求,就會被喚醒。
  • 在 具體的 方法中,經過 SocketChannel 和 Buffer 進行數據操做

IO 都是同步阻塞模式,因此須要多線程以實現多任務處理。而 NIO 則是利用了單線程輪詢事件的機制,經過高效地定位就緒的 Channel,來決定作什麼,僅僅 select 階段是阻塞的,能夠有效避免大量客戶端鏈接時,頻繁線程切換帶來的問題,應用的擴展能力有了很是大的提升

NIO2(異步、非阻塞)

AIO是異步IO的縮寫

對於NIO來講,咱們的業務線程是在IO操做準備好時,獲得通知,接着就由這個線程自行進行IO操做,IO操做自己是同步的
可是對AIO來講,則更加進了一步,它不是在IO準備好時再通知線程,而是在IO操做已經完成後,再給線程發出通知。所以AIO是不會阻塞的,此時咱們的業務邏輯將變成一個回調函數,等待IO操做完成後,由系統自動觸發
與NIO不一樣,當進行讀寫操做時,只須直接調用API的read或write方法便可。這兩種方法均爲異步的,對於讀操做而言,當有流可讀取時,操做系統會將可讀的流傳入read方法的緩衝區,並通知應用程序;對於寫操做而言,當操做系統將write方法傳遞的流寫入完畢時,操做系統主動通知應用程序。 便可以理解爲,read/write方法都是異步的,完成後會主動調用回調函數。 在JDK1.7中,這部份內容被稱做NIO.2,主要在Java.nio.channels包下增長了下面四個異步通道:

  • AsynchronousSocketChannel
  • AsynchronousServerSocketChannel
  • AsynchronousFileChannel
  • AsynchronousDatagramChannel

在AIO socket編程中,服務端通道是AsynchronousServerSocketChannel,這個類提供了一個open()靜態工廠,一個bind()方法用於綁定服務端IP地址(還有端口號),另外還提供了accept()用於接收用戶鏈接請求。在客戶端使用的通道是AsynchronousSocketChannel,這個通道處理提供open靜態工廠方法外,還提供了read和write方法。
在AIO編程中,發出一個事件(accept read write等)以後要指定事件處理類(回調函數),AIO中的事件處理類是CompletionHandler<V,A>,這個接口定義了以下兩個方法,分別在異步操做成功和失敗時被回調。
void completed(V result, A attachment);
void failed(Throwable exc, A attachment);

單工 半雙工 雙工通訊

單工通訊(simplex)

只支持信號在一個方向上傳輸
適用:數據收集系統,如氣象數據收集 話費收集的集中計算等

半雙工通訊(half-duplex)

容許信號在兩個方向上傳輸,但某一時刻只容許在一個信道上單向傳輸
適用:問訊 檢索 科學計算等數據通訊系統 如對講機

全雙工通訊(dull-duplex)

容許數據同時在兩個方向上傳輸,既有兩個信道
適用:如計算機 手機 電話通訊

協議&消息

通訊中,協議是指雙方實體完成通訊或服務所必須遵循的規則和約定,是雙方對傳輸/接收數據流的編解碼的實現算法。
數據在網絡上是以字節流(二進制流)的形式傳輸的,而字節的定義在全部計算機上都是8bit。因此面向協議編程是與語言無關的。
三要素:

  1. 語法:約定通訊的數據格式,編碼,信號等級
  2. 語義:在語法的基礎上傳遞的數據內容
  3. 定時規則:明確通訊內容的時序

消息:協議實例化後即是消息。

消息分類:

  • 一類消息:服務端與客戶端之間通訊的全部消息長度都是必定範圍的。
  • 二類消息:絕大部分消息長度都未超過某閾值,但偶爾有幾個消息長度超過,但不能夠超過太多。
  • 三類消息:消息太長而沒法完整的進行內存存儲

通常消息有兩部分組成

  1. 消息頭:固定一個byte長度,記錄消息體長度
  2. 消息體:根據消息頭中的數值決定長度,記錄消息內容

消息長度=消息頭長度+消息體長度

一個簡單的協議算法:

  1. 標誌當前buffer的position位置
  2. 獲取本次消息的消息體長度,position遞增1位
  3. 判斷當前讀取的消息長度是否知足消息體長度
  4. 出現半包,數據不完整,重置標誌位,並返回null終止本次解碼
  5. buffer中包含完整的消息體內容,則進行讀取,position=position+增長消息體長度
  6. 更新標誌位
  7. 將已讀數據轉換爲字符串並返回,解碼成功
public class StringProtocol implements Protocol<String> {
    public String decode(ByteBuffer buffer, AioSession<String> session) {
        buffer.mark(); // 1
        byte length = buffer.get(); // 2
        if (buffer.remaining() < length) { // 3
            buffer.reset(); // 4
            return null;
        }
        byte[] body = new byte[length];
        buffer.get(body); // 5 
        buffer.mark(); // 6
        return new String(body); // 7
    }
}

絕大多數協議或協議中的某字段約定的解析規則有兩種:定長解析,特殊結束符解析

Reactor

Reactor模式也叫反應器模式,大多數IO相關組件如Netty、Redis在使用的IO模式,是高性能網絡編程的必知必會模式

簡介

  • 最原始的網絡編程思路是,服務器用一個while循環,不斷監聽端口是否有新的套接字鏈接,若是有,那麼就調用一個處理函數處理。僞代碼:
while(true){
socket = accept();
handle(socket)
}

這種同步阻塞方式沒法併發,效率過低

  • 以後,想到了使用多線程,也就是很經典的connection per thread,每個鏈接用一個線程處理。tomcat服務器的早期版本確實是這樣實現的。
  • 這種同步非阻塞方式極大地提升了服務器的吞吐量,但資源要求過高,系統中建立線程是須要比較高的系統資源的,若是鏈接數過高,系統沒法承受,並且,線程的反覆建立-銷燬也須要代價
  • 改進,即是Reactor模式,採用基於事件驅動的設計,當有事件觸發時,纔會調用處理器進行數據處理,對線程的數量進行控制,一個線程處理大量的事件
實際上的Reactor模式,是基於Java NIO的,在他的基礎上,抽象出來兩個組件——Reactor和Handler兩個組件:

(1)Reactor:負責響應IO事件,當檢測到一個新的事件,將其發送給相應的Handler去處理;新的事件包含鏈接創建就緒、讀就緒、寫就緒等。

(2)Handler:將自身(handler)與事件綁定,負責事件的處理,完成channel的讀入,完成處理業務邏輯後,負責將結果寫出channel

  • 網絡鏈接:兩個流,輸入流,輸出流
  • channel: 一個鏈接就是一個channel,全部操做起始於channel
  • selector:IO時間查詢器,是一個獨立線程,將channel註冊到selector中後,selector就會不斷select這些channel的IO事件(可讀,可寫,鏈接完成等)
  • buffer:channel讀寫操做的內存,利用byte[]做爲緩存區,一些屬性:

    屬性 描述
    capacity 容量,便可以容納的最大數據量;在緩衝區建立時被設定而且不能改變
    limit 上界,緩衝區中當前數據量
    position 位置,下一個要被讀或寫的元素的索引
    mark(位置標記) 調用mark(pos)來設置mark=pos,再調用reset()可讓position恢復到標記的位置即position=mark

Reactor單線程模型

reactor單線程.png
缺點:當其中某個 handler 阻塞時, 會致使其餘全部的 client 的 handler 都得不到執行, 而且更嚴重的是, handler 的阻塞也會致使整個服務不能接收新的 client 請求(由於 acceptor 也被阻塞了)。 由於有這麼多的缺陷, 所以單線程Reactor 模型用的比較少。這種單線程模型不能充分利用多核資源,因此實際使用的很少,
僅僅適用於handler 中業務處理組件能快速完成的場景。

Reactor多線程模型

將Handler處理器的執行放入線程池,多線程進行業務處理

Reactor多線程模型.png

Reactor主從模型

對於多個CPU的機器,爲充分利用系統資源,將Reactor拆分爲兩部分
Reactor主從模型.png

小結

Netty能夠基於如上三種模型進行靈活的配置

Reactor編程的優勢

1)響應快,沒必要爲單個同步時間所阻塞,雖然Reactor自己依然是同步的;
2)編程相對簡單,能夠最大程度的避免複雜的多線程及同步問題,而且避免了多線程/進程的切換開銷;
3)可擴展性,能夠方便的經過增長Reactor實例個數來充分利用CPU資源;
4)可複用性,reactor框架自己與具體事件處理邏輯無關,具備很高的複用性;

和缺點

1)相比傳統的簡單模型,Reactor增長了必定的複雜性,於是有必定的門檻,而且不易於調試。
2)Reactor模式須要底層的Synchronous Event Demultiplexer支持,好比Java中的Selector支持,操做系統的select系統調用支持,若是要本身實現Synchronous Event Demultiplexer可能不會有那麼高效。
3) Reactor模式在IO讀寫數據時仍是在同一個線程中實現的,即便使用多個Reactor機制的狀況下,那些共享一個Reactor的Channel若是出現一個長時間的數據讀寫,會影響這個Reactor中其餘Channel的相應時間,好比在大文件傳輸時,IO操做就會影響其餘Client的相應時間,於是對這種操做,使用傳統的Thread-Per-Connection或許是一個更好的選擇,或則此時使用改進版的Reactor模式如Proactor模式。

拆包、粘包、半包

Netty位於應用層,而網絡數據的處理是操做系統底層按照字節流來讀寫的,而後傳到應用層從新拼裝bytebuf,這就有可能形成讀寫不對等。
在Netty中,已經造好了許多類型的拆包器,能夠直接使用:
1175191256-5bf8f9753ec13_articlex.png

零拷貝

傳統意義上發送數據:

  1. 數據從磁盤讀取到內核的read buffer
  2. 數據從內核緩衝區拷貝到用戶緩衝區
  3. 數據從用戶緩衝區拷貝到內核的socket buffer
  4. 數據從內核的socket buffer拷貝到網卡接口(硬件)的緩衝區

經過java的FileChannel.transferTo方法,能夠避免上面兩次多餘的拷貝(固然這須要底層操做系統支持)

  1. 調用transferTo,數據從文件由DMA引擎拷貝到內核read buffer
  2. 接着DMA從內核read buffer將數據拷貝到網卡接口buffer

Netty中的零拷貝

  • bytebuffer

Netty發送和接收消息主要使用bytebuffer,bytebuffer使用對外內存(DirectMemory)直接進行Socket讀寫。
緣由:若是使用傳統的堆內存進行Socket讀寫,JVM會將堆內存buffer拷貝一份到直接內存中而後再寫入socket,多了一次緩衝區的內存拷貝。DirectMemory中能夠直接經過DMA發送到網卡接口

  • Composite Buffers

傳統的ByteBuffer,若是須要將兩個ByteBuffer中的數據組合到一塊兒,咱們須要首先建立一個size=size1+size2大小的新的數組,而後將兩個數組中的數據拷貝到新的數組中。可是使用Netty提供的組合ByteBuf,就能夠避免這樣的操做,由於CompositeByteBuf並無真正將多個Buffer組合起來,而是保存了它們的引用,從而避免了數據的拷貝,實現了零拷貝。

  • 對於FileChannel.transferTo的使用

Netty中使用了FileChannel的transferTo方法,該方法依賴於操做系統實現零拷貝


借鑑、引用&感謝:
完全理解Netty,這一篇文章就夠了
smart-socket
Reactor模式

相關文章
相關標籤/搜索