Socket-IO 系列(三)基於 NIO 的同步非阻塞式編程

Socket-IO 系列(三)基於 NIO 的同步非阻塞式編程

  • 緩衝區(Buffer) 用於存儲數據java

  • 通道(Channel) 用於傳輸數據編程

  • 多路複用器(Selector) 用於輪詢 Channel 狀態,四種狀態:Connect(鏈接),Accept(阻塞),Read(讀),Write(寫)數組

1、Buffer(緩衝區)

不一樣於面向流的 IO 中將數據直接寫入或讀取到 Stream 對象中,在 NIO 中,全部數據都是用緩衝區處理(讀寫)。緩衝區一般是一個字節數組(ByteBuffer),這個數組提供了數據的訪問讀寫操做屬性,如 position、limit、capacity、mark 等。安全

Buffer 類型:ByteBuffer、CharBuffer、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。網絡

  • capacity: 容量,表示緩衝區中最大存儲數據的容量。一旦聲明不能改變。app

  • limit: 界限,表示緩衝區中能夠操做數據的大小。(limit後的數據不能讀)dom

  • position: 位置,表示緩衝區中正在操做數據的位置socket

  • mark: 標記,表示記錄當前 postion 的位置。能夠經過 reset 恢復到時 mark 的位置。工具

注:mark <= postion <= limit <= capacitypost

import java.nio.ByteBuffer;

public class BufferTest {

   public static void main(String[] args) {
       //1. 分配一個非直接緩衝區(用戶地址空間,即JVM)
       ByteBuffer buf = ByteBuffer.allocate(1024);
       System.out.println(buf);//java.nio.HeapByteBuffer[pos=0 lim=1024 cap=1024] 堆空間

       //2. 向buf中寫入數據
       buf.put("abcd".getBytes());
       System.out.println(buf); //[pos=4 lim=1024 cap=1024]

       //3. 切換成讀模式
       buf.flip(); //limit = position; position = 0; mark = -1;
       System.out.println(buf); //[pos=0 lim=4 cap=1024]

       //4. buf.get()讀數據
       for (int i = 0; i < buf.limit(); i++) {
           System.out.println((char) buf.get());
       }
       System.out.println(buf); //[pos=4 lim=4 cap=1024]

       //5. buf.rewind()再讀數據
       buf.rewind(); //position = 0; mark = -1;
       System.out.println(buf); //[pos=0 lim=4 cap=1024]

       //6. buf.get(bytes)讀一個字節數組
       byte[] bytes = new byte[buf.limit()];
       buf.get(bytes);
       System.out.println(new String(bytes));  //[pos=4 lim=4 cap=1024]

       //7. buf.reset()復位到上一個標記位
       buf.position(1).mark();
       System.out.println(buf);
       buf.get();
       System.out.println(buf);
       buf.reset();
       System.out.println(buf);

       //8. buf.hasRemaining()
       if (buf.hasRemaining()) {
           System.out.println(buf.remaining());
       }
       
       //9. buf.duplicate()
       ByteBuffer buf2 = buf.duplicate();
      
       //10. buf.clear() 清空緩衝區,可是緩衝區依舊存在,只是處於「遺忘」狀態
       buf.clear();
   }
}

1.1 直接緩衝區和非直接緩衝區

  • 非直接緩衝區:將緩衝區開闢在用戶地址空間(JVM)。

    ByteBuffer.allocate(1024);

JDK 源碼:

public static ByteBuffer allocate(int capacity) {
    if (capacity < 0)
        throw new IllegalArgumentException();
    //在堆中開闢空間
    return new HeapByteBuffer(capacity, capacity);
}
  • 直接緩衝區(少用):將緩衝區直接開闢在內核地址空間(OS),減小了將數據從內核地址空間複製到用戶地址空間的過程,提升了效率。但直接操做內核地址不安全,分配銷燬開銷大,不易操做。通常狀況下,用於本機IO操做影響的大型、持久的緩衝區,而且性能有明顯提高的狀況。

    ByteBuffer.allocateDirect(1024);

JDK 源碼:

public static ByteBuffer allocateDirect(int capacity) {
    return new DirectByteBuffer(capacity);
}

2、Channel(通道)

通道:IO 中源與目標以前的鏈接,負責傳輸數據。緩衝區用於存儲數據。

java.nio.channels.Channel 接口:
    |--FileChannel
    |--SocketChannel
    |--ServerSocketChannel
    |--DatagramChannel

2.1 獲取通道的有有三種方法

  1. Java 針對支持通道的類提供了 getChannel() 方法。

    • 本地IO FileInputStream/FileOutputStream/RandomAccessFile

    • 網絡IO Socket/ServerSocket/DatagramSocket

  2. 在 JDK 1.7 中的 NIO.2 針對各個通道提供了靜態方法 open()

  3. 在 JDK 1.7 中的 NIO.2 的 Files 工具類的 newByteChannel()

// 經過通道傳輸數據
public void test1() throws Exception {
    FileInputStream fis = new FileInputStream("1.png");
    FileOutputStream fos = new FileOutputStream("2.png");

    //1. 獲取通道
    FileChannel inChannel = fis.getChannel();
    FileChannel outChannel = fos.getChannel();

    //2. 分配緩衝區
    ByteBuffer buf = ByteBuffer.allocate(1024);

    //3. 用通道傳輸數據
    while (inChannel.read(buf) != -1) {
        buf.flip();
        outChannel.write(buf);
        buf.clear();
    }

    //4. 關閉
    outChannel.close();
    inChannel.close();
    fis.close();
    fos.close();
}

// 內存映射文件,直接緩衝區
public void test2 () throws IOException {
    FileChannel inChannel = FileChannel.open(Paths.get("1.png"), StandardOpenOption.READ);
    FileChannel outChannel = FileChannel.open(Paths.get("3.png"), 
      StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);

    //內存映射文件,直接緩衝區
    MappedByteBuffer inMappedBuf = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size());
    MappedByteBuffer outMappedBuf = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());

    byte[] bytes = new byte[inMappedBuf.limit()];
    inMappedBuf.get(bytes);
    outMappedBuf.put(bytes);

    inChannel.close();
    outChannel.close();
}

// transferTo
public void test3 () throws IOException {
    FileChannel inChannel = FileChannel.open(Paths.get("1.png"), StandardOpenOption.READ);
    FileChannel outChannel = FileChannel.open(Paths.get("4.png"), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);

    //內存映射文件,直接緩衝區
    inChannel.transferTo(0, inChannel.size(), outChannel);
    //outChannel.transferFrom(inChannel, 0, inChannel.size());

    inChannel.close();
    outChannel.close();
}

2.2 分散(Scatter)與彙集(Gather)

  • 分散讀取(Scatter Read):按照緩衝區的順序,從 Channel 中讀取的依次將緩衝區填滿

  • 彙集寫入(Gather Write):將多個緩衝區的數據依次寫入 Channel

public static void main(String[] args) throws IOException {
    RandomAccessFile raf = new RandomAccessFile("1.txt", "rw");
    FileChannel inChannel = raf.getChannel();

    //1. 獲取通道
    ByteBuffer buf1 = ByteBuffer.allocate(10);
    ByteBuffer buf2 = ByteBuffer.allocate(20);

    // Gather
    ByteBuffer[] bufs = {buf1, buf2};
    inChannel.read(bufs);

    for (ByteBuffer buf : bufs) {
        buf.flip();
    }

    System.out.println(new String(bufs[0].array(), 0, bufs[0].limit()));
    System.out.println(new String(bufs[1].array(), 0, bufs[1].limit()));

    // Scanner
    RandomAccessFile raf2 = new RandomAccessFile("2.txt", "rw");
    FileChannel outChannel = raf2.getChannel();

    outChannel.write(bufs);

}

2.2 補充:編碼與解碼

// 輸出有效的字符集
public void test1 () {
    SortedMap<String, Charset> charsets = Charset.availableCharsets();
    for (Map.Entry<String, Charset> me : charsets.entrySet()) {
        System.out.println(me.getKey() + ": " + me.getValue());
    }
}

// 編碼與解碼
public void test2 () throws CharacterCodingException {
    Charset cs = Charset.forName("gbk");

    //編碼器和解碼器
    CharsetEncoder encoder = cs.newEncoder();
    CharsetDecoder decoder = cs.newDecoder();

    CharBuffer cBuf = CharBuffer.allocate(1024);
    cBuf.put("中華人民共和國");
    cBuf.flip();

    //編碼
    ByteBuffer bBuf = encoder.encode(cBuf);

    try {
        System.out.println(new String(bBuf.array(), "gbk"));
    } catch (UnsupportedEncodingException e) {
        ;
    }

    //解碼
    CharBuffer cBuf2 = decoder.decode(bBuf);
    System.out.println(cBuf2.limit());
    System.out.println(cBuf2.toString());
}

3、Selector(多路複用器)


天天用心記錄一點點。內容也許不重要,但習慣很重要!

相關文章
相關標籤/搜索