插曲:Kafka源碼預熱篇--- Java NIO

前言

上一篇的前言我都忘了隨便說兩句了hhhjava

由於Kafka的源碼閱讀是須要對Java NIO知識有必定的瞭解的,因此怎麼說,若是以爲本身對於Java這塊算是比較熟悉,一樣做爲插曲篇的這篇是能夠直接忽略。由於這篇也不會涉及什麼重難點,主要仍是過過基礎,讓後面的源碼篇讀起來更加通暢。api

1、NIO基礎

Java New IO是從Java1.4版本開始引入的一個新的IO api,能夠替代以往的標準IO,NIO相比原來的IO有一樣的做用和目的,可是使用的方式徹底不同,NIO是面向緩衝區的,基於通道的IO操做,這也讓它比傳統IO有着更爲高效的讀寫。數組

1.1 IO和NIO的主要區別

IO NIO
面向流 面向緩衝區
阻塞IO 非阻塞IO
選擇器

1.1.1 傳統IO的流

如下用圖來簡單理解一下,在傳統IO中當App要對網絡,磁盤中的文件進行讀寫的時候,它們必須創建一個鏈接,流究竟是一個什麼樣的概念呢,咱們能夠先把它想象成自來水,家裏要用自來水,須要有水管,讓水從水管過來到家裏,起到一個運輸的做用服務器

因此當咱們文件中的數據須要輸入到App裏面時,它們就會創建一個輸入的管道。而當咱們的App有數據須要寫入到文件系統的時候,就會創建一個輸出的管道,這兩條管道就是咱們的輸入流和輸出流。那水歷來沒有逆流而上的呀,因此它們都是單向管道。這麼一講,是否是就很好懂了呢😁?網絡

1.1.2 NIO

也是一樣的文件系統和App,不過此時把流換成了一個channel,如今咱們能夠先認爲它就是一條鐵道,那咱們知道鐵道自己是不能傳遞貨物的呀,因此咱們須要一個載具---火車(也就是緩衝區),App須要的數據就由這個名叫緩衝區的載具運輸過來。那火車是能夠開過來,也能夠開回去的,因此NIO是雙向傳輸的。app

1.2 Buffer

NIO的核心在於,通道(channel)和緩衝區(buffer)兩個。通道是打開到IO設備的鏈接。使用時須要獲取用於鏈接IO設備的通道以及用於容納數據的緩衝區,而後經過操做緩衝區對數據進行處理。(其實就是上面那張圖的事兒,或者一句話就是一個負責傳輸,一個負責存儲)。dom

緩衝區是Java.nio包定義好的,全部緩衝區都是Buffer抽象類的子類。Buffer根據數據類型不一樣,經常使用子類分別是基本數據類型除了Boolean外的xxxBuffer(IntBuffer,DoubleBuffer···等)。不一樣的Buffer類它們的管理方式都是相同的,獲取對象的方法都是性能

// 建立一個容量爲capacity的xxx類型的Buffer對象
static xxxBuffer allocate(int capacity)
複製代碼

並且緩衝區提供了兩個核心方法:get()和put(),put方法是將數據存入到緩衝區,而get是獲取緩衝區的數據。測試

此時咱們用代碼看一下大數據

public class BufferTest {
    @Test
    public void testBuffer(){
        // 建立緩衝區對象
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    }
}
複製代碼

點進去ByteBuffer,會看到這個東西是繼承了Buffer類的

public abstract class ByteBuffer extends Buffer implements Comparable<ByteBuffer>
複製代碼

此時繼續點進去Buffer類,第一眼看到的是有幾個自帶的屬性

1.2.1 buffer的基本屬性

① capacity容量

表示Buffer的最大數據容量,這個值不能爲負。並且建立後是不能更改的。

② limit限制

第一個不能讀取或寫入的數據的索引,位於此索引後的數據不可讀寫。這個數值不能爲負且不能超過capacity,如上圖中第三個緩衝區,在下標爲5以後的數據塊均不能讀寫,那limit爲5

③ position位置

下一個要讀取或寫入的數據的索引,這個數值不能爲負且不能超過capacity,如圖中第二個緩衝區,前面5塊寫完成,此時第6個數據塊的下標爲5,因此position爲5

④ mark標記/reset重置

mark是一個索引,經過Buffer的mark()方法指定Buffer中一個特定的position後,能夠經過reset()方法重置到這個position,這個經過代碼來解釋會比較好說明

1.2.2 code部分(很是簡單)

1.首先咱們建立一個緩衝區對象,而後把它的屬性打印出來

ByteBuffer byteBuffer = ByteBuffer.allocate(10);
System.out.println(byteBuffer.position());
System.out.println(byteBuffer.capacity());
System.out.println(byteBuffer.limit());

運行結果:0,10,10
複製代碼
2.執行一個put()方法,來把一個字符丟進去

String str = "abcde";
byteBuffer.put(str.getBytes());
System.out.println(byteBuffer.position());
System.out.println(byteBuffer.capacity());
System.out.println(byteBuffer.limit());

運行結果:5,10,10
"abcde"長度爲5,position已經變化,其它不變
複製代碼
3.使用flip()切換爲讀模式

byteBuffer.flip();
System.out.println(byteBuffer.position());
System.out.println(byteBuffer.capacity());
System.out.println(byteBuffer.limit());

運行結果:0,10,5
複製代碼

此時position變成爲0了,由於一開始的5,是由於這時候要寫的是下標爲5的數據塊,而轉換成讀模式後,第一個讀的明顯是下標爲0的數據塊呀。limit的數值也變成了5,由於當前能讀到的數據從下標爲5開始就木有了,因此limit爲5

4.簡單獲取一下buffer中的數據
byte[] array = new byte[byteBuffer.limit()];
byteBuffer.get(array);
System.out.println(new String(array,0,array.length));

運行結果:abcde
複製代碼
5.mark() & reset()
byte[] array = new byte[byteBuffer.limit()];
byteBuffer.get(array,0,2);
System.out.println(new String(array,0,2));
System.out.println(byteBuffer.position());

byteBuffer.mark();
byteBuffer.get(array,2,2);
System.out.println(new String(array,2,2));
System.out.println(byteBuffer.position());

byteBuffer.reset();
System.out.println(byteBuffer.position());

運行結果:ab,2,cd,4,2
複製代碼

其實很簡單,就是第一次讀取的時候,只是讀取了前面兩個字符,而後此時position的結果爲2,而後再讀取後兩個,position爲4,但是由於我在讀取前面2個的時候進行了一個mark操做,它就自動回到我mark以前的那個讀取位置而已,就是這麼簡單

6.其餘的一些方法

rewind()方法,可重複讀,clear()清空緩衝區,不過這個方法的清空緩衝區,是一種被遺忘的狀態,就是說,數據仍然還存於緩衝區中,但是自動忽略掉了。此時再次讀取數據,是仍是能夠get()到的。hasRemaining()方法就是表示剩餘可操做的數據量還有多少,好比剛剛的mark的那個例子中,我reset回去以後,剩餘的可操做數據就是3,由於我只讀了ab,還有cde這三個。

1.2.3 直接緩衝區和非直接緩衝區

非直接緩衝區:經過allocate()方法來分配緩衝區。將緩衝區創建在JVM的內存中。

直接緩衝區:經過allocateDirect()方法分配緩衝區,將緩衝區創建在物理內存中。效率更高。

① 非直接緩衝區

應用程序想要在磁盤中讀取數據時,首先它發起請求,讓物理磁盤先把它的數據讀到內核地址空間當中,以後這個內核空間再將這個數據copy一份到用戶地址空間去。而後數據才能經過read()方法將數據返回個應用程序。而應用程序須要寫數據進去,也是同理,先寫到用戶地址空間,而後copy到內核地址空間,再寫入磁盤。此時不難發現,這個copy的操做顯得十分的多餘,因此非直接緩衝區的效率相對來講會低一些。

② 直接緩衝區

直接緩衝區就真的顧名思義很是直接了,寫入的時候,寫到物理內存映射文件中,再由它寫入物理磁盤,讀取也是磁盤把數據讀到這個文件而後再由它讀取到應用程序中便可。沒有了copy的中間過程。

1.3 channel

1.3.1 扯一下概念背景

由java.nio.channels包定義,表示IO源與目標打開的連接,它自己不存在直接訪問數據的能力,只能和Buffer進行交互

傳統的IO由cpu來全權負責,此時這個設計在有大量文件讀取操做時,CPU的利用率會被拉的很是低,由於IO操做把CPU的資源都搶佔了。

在這種背景下進行了一些優化,把對cpu的鏈接取消,轉爲DMA(直接內存存取)的方式。固然DMA這個操做自己也是須要CPU進行調度的。不過這個損耗天然就會比大量的IO要小的多。

此時,就出現了通道這個概念,它是一個徹底獨立的處理器。專門用來負責文件的IO操做。

1.3.2 經常使用通道

Java爲Channel接口提供的主要實現類:

FileChannel:用於讀取,寫入,映射和操做文件的通道
DatagramChannel:經過UDP讀寫網絡中的數據通道
SocketChannel:經過TCP讀寫網絡中的數據通道
ServerSocketChannel:能夠監聽新進來的TCP鏈接,對每個新進來的鏈接
    都會建立一個SocketChannel
複製代碼

獲取channel的一種方式是對支持通道的對象調用getChannel()方法,支持類以下

FileInputStream
FileOutputStream
RandomAccessFile
DatagramSocket
Socket
ServerSocket
複製代碼

獲取的其餘方式是使用Files類的靜態方法newByteChannel()獲取字節通道。再或者是經過通道的靜態方法open()打開並返回指定通道。

1.3.3 經常使用方法和簡單使用

① 使用非直接緩衝區完成文件複製
// 建立輸入輸出流對象
FileInputStream fileInputStream = new FileInputStream("testPic.jpg");
FileOutputStream fileOutputStream = new FileOutputStream("testPic2.jpg");

// 經過流對象獲取通道channel
FileChannel inChannel = fileInputStream.getChannel();
FileChannel outChannel = fileOutputStream.getChannel();

// 建立指定大小的緩衝區
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

// 將通道中的數據寫入到緩衝區中
while (inChannel.read(byteBuffer) != -1){

    // 切換成讀取模式
    byteBuffer.flip();
    // 將緩衝區中的數據寫到輸出通道
    outChannel.write(byteBuffer);

    // 清空緩衝區
    byteBuffer.clear();

}
//回收資源(這裏爲了省時間直接拋出去了,反正這段不過重要)
outChannel.close();
inChannel.close();
fileInputStream.close();
fileOutputStream.close();

運行結果:就天然是複製了一個testPic2出來啦
複製代碼

由於代碼自己不難,註釋已經寫得比較詳細,就不展開了


② 使用直接緩衝區來完成文件的複製

注意這裏的StandardOpenOption是一個枚舉,表示模式,很顯然這裏是要選擇READ讀取模式。

FileChannel inChannel = FileChannel.open(Paths.get("testPic.jpg",StandardOpenOption.READ));
FileChannel outChannel = FileChannel.
        open(Paths.get("testPic2.jpg"),StandardOpenOption.WRITE,StandardOpenOption.READ,StandardOpenOption.CREATE);
// 進行內存映射
MappedByteBuffer inMappedBuffer = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size());
MappedByteBuffer outMapBuffer = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());

// 對緩衝區進行數據的讀寫操做
byte[] array = new byte[inMappedBuffer.limit()];
inMappedBuffer.get(array);
outMapBuffer.put(array);

// 回收資源
inChannel.close();
outChannel.close();
複製代碼

若是須要看一下它們兩個的時間差,本身用最常規的系統時間來瞧瞧就好,在這裏就再也不加上了。

2、NIO非阻塞式網絡通訊

傳統的IO流都是阻塞式的,當一個線程調用read或者write時,該線程被阻塞,直到數據被讀取或者寫入,該線程在此期間都是不能執行其餘任務的,所以,在完成網絡通訊進行IO操做時,線程被阻塞,因此服務器端必須爲每一個客戶端提供一個獨立線程進行處理,當服務器端須要處理大量客戶端時,性能將會急劇降低。

NIO是非阻塞的,當線程從某通道進行讀寫數據時,若沒有數據可用,該線程能夠進行其餘任務。線程一般將非阻塞IO的空閒時間用於在其餘通道上執行IO操做,因此單獨的線程能夠管理多個輸入和輸出通道。所以NIO可讓服務器端使用一個或有限幾個線程來同時處理鏈接到服務器端的全部客戶端。

2.1 Selector

這個選擇器其實就是在客戶端和服務端之間引入一個通道的註冊器,好比如今個人客戶端要像服務端傳輸數據了,客戶端會給選擇器去發送一個channel的註冊請求,註冊完成後,Selector就會去監控這個channel的IO狀態(讀寫,鏈接)。只有當通道中的數據徹底準備就緒,Selector纔會將數據分配到服務端的某個線程去處理。

這種非阻塞性的流程就能夠更好地去使用CPU的資源。提升CPU的工做效率。這個能夠用收快遞來講明。若是你一開始就告訴我半小時後過來取快遞,而我在這時候已經到目的地了,我有可能就原地不動站着等半個小時。這個期間啥地都去不了,但是你是到了以後,纔打電話告訴我過來取,那我就有了更多的自由時間。

2.2 code(阻塞性IO的網絡通訊)

如今咱們來演示一下阻塞性IO的網絡通訊

2.2.1 client(阻塞性IO)

這個代碼你們能夠嘗試這刪除sChannel.shutdownOutput(),此時會發如今啓動好server,運行client程序的時候,程序也會阻塞,這是由於這時服務端並沒有法肯定你是否已經發送完成數據了,因此client端也產生了阻塞,雙方就一直僵持。

還有一種方法是解阻塞,以後進行闡述。

// 1.獲取通道
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("你的IP地址",9898));
// 2.建立文件通道
FileChannel inChannel = FileChannel.open(Paths.get("C:/Users/Administrator/Desktop/testPic.jpg"),StandardOpenOption.READ);
// 3.分配指定大小的緩衝區
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

// 4.發送數據,須要讀取文件
while (inChannel.read(byteBuffer) != -1){
    byteBuffer.flip();
    // 將buffer的數據寫入到通道中
    sChannel.write(byteBuffer);
    byteBuffer.clear();
}

// 主動告訴服務端,數據已經發送完畢
sChannel.shutdownOutput();

while (sChannel.read(byteBuffer) != -1){
        byteBuffer.flip();
        System.out.println("接收服務端數據成功···");
        byteBuffer.clear();
    }

// 5.關閉通道
inChannel.close();
sChannel.close();
複製代碼
2.2.2 server(阻塞性IO)
// 1.獲取通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
// 建立一個輸出通道,將讀取到的數據寫入到輸出通道中,保存爲testPic2
FileChannel outChannel = FileChannel.open(Paths.get("testPic2.jpg"),StandardOpenOption.WRITE,StandardOpenOption.CREATE);
// 2.綁定端口
ssChannel.bind(new InetSocketAddress(9898));
// 3.等待客戶端鏈接,鏈接成功時會獲得一個通道
SocketChannel sChannel = ssChannel.accept();
// 4.建立緩衝區
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 5.接收客戶端的數據存儲到本地
while (sChannel.read(byteBuffer) != -1){
    byteBuffer.flip();
    outChannel.write(byteBuffer);
    byteBuffer.clear();
}

// 發送反饋給客戶端
    // 向緩衝區中寫入應答信息
    byteBuffer.put("服務端接收數據成功".getBytes());
    byteBuffer.flip();
    sChannel.write(byteBuffer);

// 關閉通道
sChannel.close();
outChannel.close();
byteBuffer.clear();
複製代碼

而後再當咱們的客戶端運行起來,就會進行copy操做

2.3 Selector完成非阻塞IO

使用NIO完成網絡通訊須要三個核心對象:

channel:java.nio.channels.Channel接口,SocketChannel,ServerSocketChannel,DatagramChannel

管道相關:Pipe.SinkChannel,Pine.SourceChannel

buffer:負責存儲數據

Selector:其中Selector是SelectableChannel的多路複用器,主要是用於監控SelectableChannel的IO狀態

2.3.1 client(非阻塞)
// 1.獲取通道,默認是阻塞的
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("192.168.80.1",9898));

// 1.1 將阻塞的套接字變成非阻塞
sChannel.configureBlocking(false);

// 2.建立指定大小的緩衝區
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 3.發送數據給服務端,直接將數據存儲到緩衝區
byteBuffer.put(new Date().toString().getBytes());
// 4.將緩衝區的數據寫入到sChannel
byteBuffer.flip();
sChannel.write(byteBuffer);
byteBuffer.clear();

// 關閉
sChannel.close();
複製代碼
2.3.2 server(非阻塞)

代碼的註釋中已經解釋了整個過程的作法,這裏就不一一展開了。

// 1.獲取通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
// 2.將阻塞的套接字設置爲非阻塞的
ssChannel.configureBlocking(false);
// 3.綁定端口號
ssChannel.bind(new InetSocketAddress(9898));

// 4.建立選擇器對象
Selector selector = Selector.open();

// 5.將通道註冊到選擇器上(這裏的第二個參數爲selectionKey),下面有解釋
// 此時選擇器就開始監聽這個通道的接收時間,此時接收工做準備就緒,纔開始下一步的操做
ssChannel.register(selector,SelectionKey.OP_ACCEPT);

// 6.經過輪詢的方式獲取選擇器上準備就緒的事件
// 若是大於0,至少有一個SelectionKey準備就緒
while (selector.select() > 0){
    // 7.獲取當前選擇器中全部註冊的selectionKey(已經準備就緒的監聽事件)
    Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();
    // 迭代獲取已經準備就緒的選擇鍵
    while (selectionKeyIterator.hasNext()){

        // 8.獲取已經準備就緒的事件
        SelectionKey selectionKey = selectionKeyIterator.next();
        if (selectionKey.isAcceptable()){
            // 9.調用accept方法
            SocketChannel sChannel = ssChannel.accept();
            // 將sChannel設置爲非阻塞
            // 再次強調,整個過程不能有任何一條阻塞通道
            sChannel.configureBlocking(false);

            // 進行數據接收工做,並且把sChannel也註冊上選擇器讓選擇器來監聽
            sChannel.register(selector,SelectionKey.OP_READ);
        }else if (selectionKey.isReadable()){
            // 若是讀狀態已經準備就緒,就開始讀取數據
            // 10.獲取當前選擇器上讀狀態準備就緒的通道
            SocketChannel sChannel = (SocketChannel) selectionKey.channel();
            // 11.讀取客戶端發送的數據,須要先建立緩衝區
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

            // 12.讀取緩衝區的數據
            while (sChannel.read(byteBuffer) > 0){
                byteBuffer.flip();
                // 這裏sChannel.read(byteBuffer)就是這個字節數組的長度
                System.out.println(new String(byteBuffer.array(),0,sChannel.read(byteBuffer)));

                // 清空緩衝區
                byteBuffer.clear();
            }
        }
        // 當selectionKey使用完畢須要移除,不然會一直優先
        selectionKeyIterator.remove();
    }

}
複製代碼

當調用register方法將通道註冊到選擇器時,選擇器對通道的監聽事件須要經過第二個參數ops決定

讀:SelectionKey.OP_READ(1)
寫:SelectionKey.OP_WRITE(4)
鏈接:SelectionKey.OP_CONNECT(8)
接收:SelectionKey.OP_ACCEPT(16)
複製代碼

註冊時不只僅只有一個監聽事件,則須要用位或操做符鏈接

int selectionKeySet = SelectionKey.OP_READ|SelectionKey.OP_WRITE
複製代碼

而關於這個selectionKey,它表示着SelectableChannel和Selectr之間的註冊關係。它也有一系列對應的方法

2.3.3 客戶端的改造

引入Scanner接收輸入信息,不過請注意,在測試代碼中輸入IDEA須要進行一些設置,具體作法是在Help-Edit Custom VM Option中加入一行

-Deditable.java.test.console=true
複製代碼

這樣就能夠輸入了。

// 1.獲取通道,默認是阻塞的
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("192.168.80.1",9898));

// 1.1 將阻塞的套接字變成非阻塞
sChannel.configureBlocking(false);

// 2.建立指定大小的緩衝區
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
    String str = scanner.next();
    // 3.發送數據給服務端,直接將數據存儲到緩衝區
    byteBuffer.put((new Date().toString()+str).getBytes());
    // 4.將緩衝區的數據寫入到sChannel
    byteBuffer.flip();
    sChannel.write(byteBuffer);
    byteBuffer.clear();
}
// 關閉
sChannel.close();
複製代碼

這樣就完成了一個問答模式的網絡通訊。

2.4 Pipe管道

Java NIO中的管道是兩個線程之間的單向數據鏈接,Pipe有一個source管道和一個sink管道,數據會被寫到sink,從source中獲取

// 1.獲取管道
Pipe pipe = Pipe.open();

// 2.建立緩衝區對象
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 3.獲取sink通道
Pipe.SinkChannel sinkChannel = pipe.sink();
byteBuffer.put("經過單向管道傳輸數據".getBytes());

// 4.將數據寫入sinkChannel
byteBuffer.flip();
sinkChannel.write(byteBuffer);
// 5.讀取緩衝區中的數據
Pipe.SourceChannel sourceChannel = pipe.source();
// 6.讀取sourceChannel中的數據放入到緩衝區
byteBuffer.flip();
sourceChannel.read(byteBuffer);
System.out.println(new String(byteBuffer.array(),0,sourceChannel.read(byteBuffer)));

sourceChannel.close();
sinkChannel.close();
    
運行結果就是打印了咱們的那串字符"經過單向管道傳輸數據",沒啥
複製代碼

finally

大體地把NIO的一些基礎知識給列舉了一下,內容看似不少其實並無涉及太難的知識點,都是循序漸進地執行而已。其實若是要深摳的話,仍是有不少其餘的知識點的,好比NIO2的Path,Paths和Files。這裏就再也不列舉說明了。感興趣的朋友能夠自行去了解一下。

相關文章
相關標籤/搜索