Nio編程模型總結

終於,這兩天的考試熬過去了, 興致沖沖的來整理筆記來, 這篇博客是我近幾天的NIO印象筆記彙總,記錄了對Selector及Selector的重要參數的理解,對Channel的理解,常見的Channel,對NIO事件驅動的編程模型的理解,NIO與傳統IO的對比,NIO的TCP/IP編程的實踐.java

Channel

什麼是Channel

這個概念絕對是一級概念,Channel是一個管道,用於鏈接字節緩衝區和另外一端的實體, 這個字節緩衝區就是ByteBuffer, 另外一端的實體能夠是一個File 或者是 Socket ;

或者基於IO的網絡編程, 數據的交互藉助於InputStream或者是OutputStream, 而Channel能夠理解成對Stream的又一層封裝;在這種編程模型中 服務端想和客戶端進行交互,就須要從服務端本身的ServerSocketChannel中獲取前來鏈接的客戶端的SocketChannel,並把他註冊關聯上感性趣的事件且本身的Selector選擇器上, 這樣一旦客戶端把Buffer中的數據推送進channel, 服務端就能夠感知,進而處理編程

經常使用的Chanenl

img

  • 文件通道: FileChannel
  • 套接字通道
    • 服務端: ServerSocketChannel
    • 客戶端: SocketChannel
  • 數據包通道: DataGramSocket

Channel 與 Stream

Channel的NIO編程模型中一大組件,它相似IO中的Stream,可是二者也有本質的區別;緩存

爲何說是相似呢? 看下面的兩段代碼, 需求是磁盤上的文件進行讀寫服務器

在IO編程中,咱們第一步可能要像下面這樣獲取輸入流,按字節把磁盤上的數據讀取到程序中,再進行下一步操做網絡

FileInputStream fileInputStream = new FileInputStream("123.txt");

在NIO編程中,目標是須要先獲取通道,再基於Channel進行讀寫多線程

FileInputStream fileInputStream = new FileInputStream("123.txt");
FileChannel channel = fileInputStream.channel();

對用戶來講,在IO / NIO 中這兩種都直接關聯這磁盤上的數據文件,數據的讀寫首先都是獲取Stream和Channel,因此說他們類似;app

可是: 對於Stream來講,全部的Stream都是單向的,對咱們的程序來講,Stream要麼只能是從裏面獲取數據的輸入流,要麼是往裏面輸入數據的輸出流,由於InputStream和outputStream都是抽象類,在java中是不支持多繼承的, 而通道不一樣,他是雙向的,對一個通道可讀可寫dom

怎麼理解 Channel能夠是雙向的?

如上圖,凡是同時實現了readable,writeable接口的類,都雙向的通道. 下面是典型的例子socket

SocketChannel
在NIO網絡編程中,服務端能夠經過ServerSocketChannel獲取客戶端的SocketChannel
這個SocketChannel能夠read() 客戶端的消息存入Buffer, 往客戶端 write()buffer裏的內容
socketChannel1.read(byteBuffer);
socketChannel1.write(byteBuffer);

對於一個channel,咱們既能從中獲取數據,也能往外read數據ide

基於channel的文件拷貝方式和傳統的IO拷貝的競速

效率最低的按字節拷貝

public static  void text4() throws IOException {
    System.out.println("開始: ... ");
        FileInputStream    fis = new FileInputStream("123.txt");
        FileOutputStream   fos = new FileOutputStream("output123.txt");
    int read=0;
    long start =0;
    while((read=fis.read())!=-1){
        fos.write(read);
    }
    System.out.println("耗時: "+(System.currentTimeMillis()-start) );
    fis.close();
    fos.close();
}

一個3901KB的文件的拷貝,在個人機器上跑出了 1561097384707 的好成績; 實屬無奈,擦點覺得編譯器卡死


以NIO,channel+buffer的模型,拷貝文件

try (
    FileInputStream  fis = new FileInputStream("123.txt");
    FileOutputStream   fos = new FileOutputStream("output123.txt");
){
    //1.獲取通道
    FileChannel   inChannel = fis.getChannel();
    FileChannel   outChannel = fos.getChannel();

    //2.分配指定大小的緩衝區
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    long start = System.currentTimeMillis();
    //3.將通道中的數據緩衝區中
    while (inChannel.read(buffer) != -1) {
        buffer.flip();//切換成都數據模式
        //4.將緩衝區中的數據寫入通道中
        outChannel.write(buffer);
        buffer.clear();//清空緩衝區
    }
    System.out.println("總耗時:" + (System.currentTimeMillis() - start));
} catch (Exception e) {
    e.printStackTrace();
}

速度明顯提高 大約平均耗時 110


NIO+零拷貝 複製文件

// 直接獲取通道
    FileChannel inChannel2 = FileChannel.open(Paths.get("123.txt"), StandardOpenOption.READ);
    FileChannel outChannel2 = FileChannel.open(Paths.get("output123.txt"), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);
    //內存映射文件
    MappedByteBuffer inMappedBuf = inChannel2.map(FileChannel.MapMode.READ_ONLY, 0, inChannel2.size());
    MappedByteBuffer outMappedBuf = outChannel2.map(FileChannel.MapMode.READ_WRITE, 0, inChannel2.size());
    //直接對緩衝區進行數據讀寫操做
    byte[] dst = new byte[inMappedBuf.limit()];
    long start = System.currentTimeMillis();
    inMappedBuf.get(dst);
    outMappedBuf.put(dst);
    System.out.println("耗費的時間爲:" + ( System.currentTimeMillis() - start));

    inChannel2.close();
    outChannel2.close();

或者

/*
     * 通道之間的數據傳輸(直接緩衝區)
     */
    FileChannel inChannel3 = FileChannel.open(Paths.get("123.txt"), StandardOpenOption.READ);
    FileChannel outChannel3 = FileChannel.open(Paths.get("output123.txt"), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);
    long start = System.currentTimeMillis();
    inChannel3.transferTo(0, inChannel3.size(), outChannel3);
    System.out.println("耗時: "+(System.currentTimeMillis()-start) );

    //等價於
    // outChannel3.transferFrom(inChannel3, 0, inChannel3.size());

    inChannel3.close();
    outChannel3.close();

零拷貝僅須要耗時 6 就能夠完成


NIO的非阻塞與IO的阻塞

什麼是阻塞? 舉個例子, 若是有一天我碰到了不會的做業題,因而我給老師發了條短息請教咋作, 這時,假如我進入了阻塞模式,我就會一直瞅着手機,別的也不幹,就等着老師回信息, 假如我進入了非阻塞的模式,發完短信後跳過這個題,去作別的題

常見的阻塞好比, 鍵盤錄入, Socket的accept()以及IO的read write, 所有會卡在那行代碼直到執行完畢纔會往下執行, 這種風格的好處是顯而易見的, 及其容易的進行順序編程

可是在NIO中,channel的read,write能夠是阻塞的,也能夠是非阻塞的,這取決於channel是否阻塞, 通常在進行網絡編程時,要搭配上selector選擇器,一塊兒用, 同時channel咱們也會設置成非阻塞的, 想一想也不能讓服務器的讀寫阻塞住,由於它可不是面對一兩個用戶,咱們須要它能夠一遍一遍的正常流水運行

在客戶端,connect方法再也不是阻塞的,和服務端進行數據交互以前,java提供了檢查機確保鏈接百分百健康, 若是服務端沒有接受鏈接,客戶端是是沒辦法進一步操做的

if (selectionKey.isConnectable()) {
// 強轉成 有鏈接事件發生的Channel
client = (SocketChannel) selectionKey.channel();
// 完成鏈接
if (client.isConnectionPending()) {
client.finishConnect();

從通道中的read和write方法也不是阻塞的,便可返回,可讓服務端的業務代碼很流暢的執行完,再接受新的請求,處理新請求

Selector

Selector選擇器NIO的第三個組件,三者的關係圖如上所示

什麼是selector? 做用是什麼?

selector是選擇器的意思, 和它直接關聯的組件是Channel, 沒錯,它的做用就是不斷的輪詢綁定在他身上的全部channel. 一旦有通道發生了它感興趣的事件,接着處理此事件

selector維護了什麼?

不管是服務端的Selector 仍是客戶端的Selector 它都維護了三個Set集合 , 裏面封裝的是 SelectionKey, 他是channel註冊進Selector的產物,通常是使用它反向獲取channel

  1. key set
  • 他是一個全集,每當channel經過register方法註冊進選擇器時,於此同時也會把包含本身信息的key添加到這個全集中來 註冊的信息就會以SelectionKey的封裝形式保存在這個集合中, 選擇器每次輪詢的channel,就是這裏面的channel
  1. selected key
  • 感興趣的key的集合, 舉個例子, 通道1註冊進選擇器時,告訴選擇器,我可能會給你發信息,你得盯着我,讀我給你的信息, 因而選擇器對通道1感性趣的事件是 read, 那麼在選擇器輪詢channel時, 一旦通道1出現了write操做,就會被選擇器感知,開始read

  • 每次遍歷selected key時咱們會執行這行代碼:Set<SelectionKey> selectionKeys = selector.selectedKeys(); 它的意思是,咱們取出了 選擇器的感性事件的set集合,只要程序還在運行,只要選擇器一旦被open(),除非咱們手動的close() 不然選擇器對象就不會被釋放,因此它的感興趣的set集合是不會被自動會收到,因而咱們就得收到的把處理過的感興趣的事件對應的SelectionKey移除出這個set集合,否則下一次輪詢時,這個事件還會再一次被處理,而且無限制的處理下去

  • key有且僅有兩種方式從 selected-key-set 中剔除 1. 經過Set的remove()方法, 2.經過迭代器的remove()方法

  1. cannelled key
  • 取消的key的集合,表明原來感興趣的事件,如今不感興趣了. 下一次輪詢,進行select() 本集合中的SelectionKey會從key set中移除, 意味着它所關聯的channel將會被選擇器丟棄掉,再也不進行監聽
  • 關閉channel 或者是調用了cancel()方法都會將key添加到cannelled key 集合中
  • 使用場景: 通常會在客戶端主動斷開鏈接的時候使用它.

selector的select()方法

select(long); // 設置超時時間

selectNow(); // 當即返回,不阻塞

select(); 阻塞輪詢

select()過程的細節:

  • 第一步, cannelled-key中的每個元素會從全集key set中剔除,表示這些能夠關聯的通道不會被註冊
  • 第二步操做系統幫咱們輪詢每個通道是否有選擇器感性趣的事情發生
    • 對於一條準備就緒的channel(發生事件通道),他至少會發生下面兩件事之一:
      • 它的key會被添加進selected-key-set中,來標識它將被選中,進而處理
      • 若是它的key,已經存在於這個集合中了,下一步就是它的 read-operation將被更新
  • 第三步: 若是在輪詢時發現了有任何key被放置在了cannelled-key-set中,重複第一步,再也不註冊它關聯的通道

romove key 和 cannel key 的區別

前者是把key從selected key set集合,也就是被選中的集合中剔除出去,表示當前的事件已經處理完了

後者是表示,把key從全集中剔除出去, 表示想要廢棄這個key關聯的channel

selector的建立

他是根據不一樣操做系統提供的不一樣的Provider使用provide()建立出來的

NIO編程模型


如上圖, 在NIO網絡編程模式中,再也不是傳統的多線程編程模型,當有新的客戶端的鏈接到來,再也不從新開闢新的線程去跑本次鏈接,而是統一,一條線程處理全部的鏈接, 而一次鏈接本質上就是一個Channel, NIO網絡編程模型是基於事件驅動型的; 即,有了提早約定好的事件發生,接着處理事件,沒有時間發生,選擇器就一直輪詢 下面解釋上圖的流程

  1. 服務端建立表明服務端的Channel,綁定好端口,設置成非阻塞的通道 而且初始化選擇器,而後開始輪詢綁定在本身身上的通道,此時的通道只有一個ServerSocketChannel,而選擇器只關心ServerSocketChannel上發生的OP_ACCEPT事件,而又沒有客戶端來連接 因此他被阻塞在了select()
System.out.println("Server...");
// 獲取服務端的SerSokcetChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// todo 必定要把他配置成 非阻塞的
serverSocketChannel.configureBlocking(false);

// 從通道中獲取 服務端的對象
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(8899));

// 建立選擇器
Selector selector = Selector.open();
// 把通到註冊到 選擇器上
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

  while (true) {
            // 阻塞式等待 channel上有事件發生
            int select = selector.select();
  1. 客戶端 建立表明本身的SocketChannel, 建立選擇器,把本身的註冊在上面,以下代碼, 初始化本身,SocketChannel, 把客戶端的通道註冊進選擇器,並告訴選擇器SocketChannel的感興趣事件是OP_CONNECT鏈接事件; 當執行到下面的socketChannel.connect(new InetSocketAddress("localhost", 8899)); 鏈接的請求就已經發送出去了,也就是說,若是沒有意外,執行完這一行代碼,服務端的select()方法已經返回了, 可是客戶端的connect()是非阻塞的,當即返回,故在客戶端依然會繼續執行, 進而判斷一下是不是真的鏈接上了
// 獲取客戶端的通道
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);

Selector selector = Selector.open();
// 把客戶端的通道註冊進選擇器
socketChannel.register(selector, SelectionKey.OP_CONNECT);
// todo 鏈接客戶端, 執行完這行代碼後, 服務端就能就收到通知!!!
socketChannel.connect(new InetSocketAddress("localhost", 8899));

while (true) {
    int number = selector.select(); // 選擇器阻塞式的 等待 Channel上發生它關心的事件
    System.out.println(" 發生了感興趣的事件: " + number);
    Set<SelectionKey> keySet = selector.selectedKeys();
// 驗證
    for (SelectionKey selectionKey : keySet) {
        SocketChannel client = null;
if (selectionKey.isConnectable()) {
    // 強轉成 有鏈接事件發生的Channel
    client = (SocketChannel) selectionKey.channel();
    // 完成鏈接
    if (client.isConnectionPending()) {
        client.finishConnect();
        ByteBuffer byteBuffer = ByteBuffer.allocate(512);
        byteBuffer.put((LocalDate.now() + "鏈接成功").getBytes());
        byteBuffer.flip();
        client.write(byteBuffer);
  1. 對於服務端,輪詢了這麼久,終於有鏈接進來了,因而進一步處理, 判斷若是當前的鏈接是請求創建鏈接的話,就去創建鏈接, 對於服務端來講,創建鏈接就是然服務端記住客戶端, 客戶端是誰呢?SocketChanel, 怎麼獲取呢? serverSocketChannel1.accept(); 怎麼創建鏈接呢? 實際上就是把當前的客戶端的channel註冊在服務端的選擇器上,並告訴它本身關心的事件啥, 固然一開始創建鏈接時, 服務端確定首先要作的就是監聽客戶端發送過來的數據,因而 綁定上感興趣的事件是read, 而且不要忘了,每次遍歷感興趣的key的集合時,都要及時的把當前的key剔除
selectionKeys.forEach(selectionKey -> {
    SocketChannel socketChannel = null;
    String sendKey = null;
    try {
        if (selectionKey.isAcceptable()) {
            // 1. 用戶請求創建鏈接, 根據SelectionKey 獲取服務端的通道
            // todo 當前的這個SelecttionKey 是有 ServerSocketChannel 和 selector 聯繫生成的, 所以咱們 強制轉換回 ServerSocketChannel
            ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) selectionKey.channel();

            // todo  !!!!!!!  這是重點, 這裏的accept是非阻塞的 !!!!!!!!
            // 根據服務的 通道  獲取到客戶端的通道
            socketChannel = serverSocketChannel1.accept();
            System.out.println("socketChannel.class: " + socketChannel.getClass());
            // todo 配置成非阻塞的
            socketChannel.configureBlocking(false);

            // todo 新獲取的通道 註冊進選擇器
            socketChannel.register(selector, SelectionKey.OP_READ);

            // 保存客戶端的信息
            String key = "[ " + UUID.randomUUID().toString() + " ]";
            clientMap.put(key, socketChannel);
            // todo   把 擁有當前事件SelectionKey 剔除
  1. 對於客戶端,若是它想往服務端發送鍵盤錄入的內容時,獲取鍵盤錄入對象是免不了的事, 可是這對象會阻塞,因而客戶端不得不開啓一條新的線程運行讀取鍵盤錄入,讓本身具備鍵盤錄入的功能,同時又不會被阻塞, 若是客戶端想要接受服務端推送回來的數據怎麼辦呢? 因而咱們就得告訴客戶端的選擇器,添加一個感興趣的事件,read, 這樣,一旦服務端有數據推送過來的,客戶端的選擇器就會感知到這個事件,而且這個事件的selectionKay是可讀的,這樣一個比較完善的客戶端就ok了
executorService.submit(() -> {
    while (true) {
        try {
            // 清空上面的緩存
            byteBuffer.clear();
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
            String msg = bufferedReader.readLine();
            byteBuffer.put(msg.getBytes());
            byteBuffer.flip();
            finalClient.write(byteBuffer);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
});
}

// 上面的代碼是發生了 請求鏈接事件
// todo 給客戶端註冊一個讀取客戶端返回數據的事件
client.register(selector, SelectionKey.OP_READ);
  1. 服務端在創建鏈接時,就給客戶端的通道綁定了感興趣的事件是read, 因而當客戶端往channel中write數據了,服務端就會來到下面的代碼塊, 若是是羣聊的話, 咱們就得知道,往哪些用戶轉發信息, 因而咱們提早構造了map,這個map存放就是一個一個和服務的channel創建鏈接的SocketChannel; 只須要遍歷map, 往裏面的chanel,write數據便可
else if (selectionKey.isReadable()) {
    System.out.println("readable...");
    // 獲取客戶端的通道
    socketChannel = (SocketChannel) selectionKey.channel();
    System.out.println("當前的客戶端 通道實例: socketChannel == " + socketChannel);
    // 獲取當前 是哪一個客戶端發起的信息
    ByteBuffer byteBuffer = ByteBuffer.allocate(512);
    // 讀取客戶端發送的消息
    while (true) {// todo todo todo  很重要的一點!!!  read方法是非阻塞的, 極可能還有沒讀取到數據就返回了
        int read = socketChannel.read(byteBuffer);
        System.out.println("read == : " + read);
        if (read <= 0) {
            break;
        }
    }
    // 往其餘客戶端寫
    byteBuffer.flip();
    Charset charset = Charset.forName("utf-8");
    String msg = String.valueOf(charset.decode(byteBuffer).array());
    // Buffer轉字符串
    System.out.println("收到客戶端: " + socketChannel + "  發送的消息: " + msg);
    // 遍歷map
    for (Map.Entry<String, SocketChannel> map : clientMap.entrySet()) {
        if (socketChannel == map.getValue()) {
            sendKey = map.getKey();
        }
    }
    // todo 轉發給所有的客戶端發送
    for (Map.Entry<String, SocketChannel> map : clientMap.entrySet()) {
        SocketChannel socketChannel1 = map.getValue();
        ByteBuffer byteBuffer1 = ByteBuffer.allocate(512);
        // 把信息放進 byteBuffer1中
        String message = msg + " : " + sendKey;
        byteBuffer1.put(message.getBytes());
        byteBuffer.flip();
        socketChannel1.write(byteBuffer);
    }
  1. 客戶端斷開了怎麼辦呢? 在一臺電腦上,手動將一個客戶端停掉,服務端會運行到selectionKey.isReadable() 而且進入這個if塊, 當它嘗試從裏面讀取的時候,就發現這個鏈接已經壞掉了,因而報錯,強制斷開鏈接, 由於還要繼續輪詢,全集key set 中依然保存着當前的客戶端的channel, 因此會一直報錯下去, 怎麼辦呢? 以下
//  selectionKey.cancel();  常規

try {
    // 這樣也能取消這個鍵
    socketChannel.close();
} catch (IOException e1) {
    e1.printStackTrace();
}

// 固然咱們如今還要多一步,  由於他還在咱們的map裏面  否則一會發消息的時候,會出錯
// todo 移除出map 中失效的 channel
// todo 遍歷map
for (Map.Entry<String, SocketChannel> map : clientMap.entrySet()) {
    if (socketChannel == map.getValue()) {
        sendKey = map.getKey();
    }
}
clientMap.remove(sendKey, socketChannel);
相關文章
相關標籤/搜索