深刻了解NIO底層原理

Redis 爲什麼能支持高併發?

Redis底層採用NIO中的多路IO複用的機制,對多個不一樣的鏈接(TCP)實現IO複用,很好地支持高併發,而且能實現線程安全java

Redis官方沒有windows版本,只有Linux版本。nginx

NIO在不一樣的操做系統上實現的方式有所不一樣,在Windows操做系統使用select實現輪訓,並且還存在空輪訓的狀況,效率很是低。時間複雜度是爲O(n)。其次默認對輪訓的數據有必定限制,因此難於支持上萬的TCP鏈接。
在Linux操做系統採用epoll實現事件驅動回調,不會存在空輪訓的狀況,只對活躍的socket鏈接實現主動回調,這樣在性能上有大大的提高,時間複雜度是爲O(1)web

Windows 操做系統是沒有epoll,只有Linux系統纔有epoll。redis

這就是爲何nginx、redis都可以很是好的支持高併發,最終都是Linux中的IO多路複用機制epoll。windows

阻塞和非阻塞

阻塞和非阻塞一般形容多線程間的相互影響。好比一個線程佔用了臨界區資源,那麼其它全部須要這個資源的線程就必須在這個臨界區中進行等待,等待會致使線程掛起。這種狀況就是阻塞。此時,若是佔用資源的線程一直不肯意釋放資源,那麼其它全部阻塞在這個臨界區上的線程都不能工做。而非阻塞容許多個線程同時進入臨界區。安全

阻塞調用是指調用結果返回以前,當前線程會被掛起。調用線程只有在獲得結果以後纔會返回。
非阻塞調用指在不能馬上獲得結果以前,該調用不會阻塞當前線程。多線程

BIO NIO AIO 概念

BIO(blocking IO):就是傳統的 java.io 包,它是基於流模型實現的,交互的方式是同步、阻塞方式,也就是說在讀入輸入流或者輸出流時,在讀寫動做完成以前,線程會一直阻塞在那裏,它們之間的調用是可靠的線性順序。優勢是代碼比較簡單、直觀;缺點是 IO 的效率和擴展性很低,容易成爲應用性能瓶頸。
NIO(non-blocking IO) :Java 1.4 引入的 java.nio 包,提供了 Channel、Selector、Buffer 等新的抽象,能夠構建多路複用的、同步非阻塞 IO 程序,同時提供了更接近操做系統底層高性能的數據操做方式。
AIO(Asynchronous IO) :是 Java 1.7 以後引入的包,是 NIO 的升級版本,提供了異步非堵塞的 IO 操做方式,因此人們叫它 AIO(Asynchronous IO),異步 IO 是基於事件和回調機制實現的,也就是應用操做以後會直接返回,不會堵塞在那裏,當後臺處理完成,操做系統會通知相應的線程進行後續的操做。併發

NIO 講解

咱們知道,BIO是阻塞式IO,是面向於流傳輸也便是根據每一個字節實現傳輸,效率比較低;而NIO是同步非阻塞式的,式面向於緩衝區的,它的亮點是IO多路複用
咱們能夠這樣理解IO多路複用,多路能夠指有多個不一樣的TCP鏈接,複用是一個線程來維護多個不一樣的IO操做。因此它的好處是佔用CPU資源很是小,並且線程安全。異步

NIO核心組件

管道channel:數據傳輸都是通過管道的。channel都是統一註冊到Selector上的。
選擇器Selector:也可稱爲多路複用器。能夠在單線程的狀況下維護多個Channel,也能夠維護多個鏈接。socket

在這裏插入圖片描述

BIO 和 NIO 代碼演示

傳統的BIO阻塞式Socket過程:

先啓動一個Socket服務端,此時控制檯會輸出開始等待接收數據中...,並等待客戶端鏈接。

package com.nobody;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

/** * @author Mr.nobody * @Description * @date 2020/7/4 */
public class SocketTcpBioServer { 
 
   

    private static byte[] bytes = new byte[1024];

    public static void main(String[] args) { 
 
   

        try { 
 
   
            // 建立ServerSocket
            final ServerSocket serverSocket = new ServerSocket();
            // 綁定監聽端口號
            serverSocket.bind(new InetSocketAddress(8080));

            while (true) { 
 
   
                System.out.println("開始等待接收數據中...");
                Socket accept = serverSocket.accept();
                int read = 0;
                read = accept.getInputStream().read(bytes);
                String result = new String(bytes);
                System.out.println("接收到數據:" + result);
            }

        } catch (IOException e) { 
 
   
            e.printStackTrace();
        }

    }
}

在這裏插入圖片描述

再啓動一個Socket客戶端,先不進行輸入。

package com.nobody;

import java.io.IOException;
import java.net.*;
import java.util.Scanner;

/** * @author Mr.nobody * @Description * @date 2020/7/4 */
public class ClientTcpSocket { 
 
   

    public static void main(String[] args) { 
 
   
        Socket socket = new Socket();
        try { 
 
   
            // 與服務端創建鏈接
            SocketAddress socketAddress = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
            socket.connect(socketAddress);
            while (true) { 
 
   
                Scanner scanner = new Scanner(System.in);
                socket.getOutputStream().write(scanner.next().getBytes());
            }
        } catch (UnknownHostException e) { 
 
   
            e.printStackTrace();
        } catch (IOException e) { 
 
   
            e.printStackTrace();
        }
    }

}

再啓動另一個Socket客戶端02,輸入client02

package com.nobody;

import java.io.IOException;
import java.net.*;
import java.util.Scanner;

/** * @author Mr.nobody * @Description * @date 2020/7/4 */
public class ClientTcpSocket02 { 
 
   

    public static void main(String[] args) { 
 
   
        Socket socket = new Socket();
        try { 
 
   
            // 與服務端創建鏈接
            SocketAddress socketAddress = new InetSocketAddress(InetAddress.getLocalHost(), 8080);
            socket.connect(socketAddress);
            while (true) { 
 
   
                Scanner scanner = new Scanner(System.in);
                socket.getOutputStream().write(scanner.next().getBytes());
            }
        } catch (UnknownHostException e) { 
 
   
            e.printStackTrace();
        } catch (IOException e) { 
 
   
            e.printStackTrace();
        }
    }

}

在這裏插入圖片描述
此時能夠看到服務端沒有接收到數據,由於Socket客戶端01先鏈接,可是還未輸入數據,因此服務端一直等待客戶端01的輸入,致使客戶端02阻塞。

若是咱們這時在客戶端01輸入client01,服務端控制檯顯示以下,先輸出客戶端01的數據,完成後才能輸出客戶端02的數據。
在這裏插入圖片描述
固然,若是不想後鏈接的客戶端不阻塞,可使用多線程實現僞異步IO,只需將服務端代碼修改成以下:

public static void main(String[] args) { 
 
   

    try { 
 
   
        // 建立ServerSocket
        final ServerSocket serverSocket = new ServerSocket();
        // 綁定監聽端口號
        serverSocket.bind(new InetSocketAddress(8080));

        while (true) { 
 
   
            System.out.println("開始等待接收數據中...");
            Socket accept = serverSocket.accept();
            new Thread(new Runnable() { 
 
   
                @Override
                public void run() { 
 
   
                    int read = 0;
                    try { 
 
   
                        read = accept.getInputStream().read(bytes);
                    } catch (IOException e) { 
 
   
                        e.printStackTrace();
                    }
                    String result = new String(bytes);
                    System.out.println("接收到數據:" + result);
                }
            }).start();
        }

    } catch (IOException e) { 
 
   
        e.printStackTrace();
    }
}

固然上面代碼有個缺點是建立的線程會頻繁建立和銷燬,頻繁進行CPU調度,而且也消耗內存資源,可以使用線程池機制優化。

NIO非阻塞式Socket過程:
前面兩個客戶端代碼不變,服務端代碼以下:

package com.nobody.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/** * @author Mr.nobody * @Description * @date 2020/7/4 */
public class NioServer { 
 
   

    private Selector selector;

    public void iniServer() { 
 
   
        try { 
 
   
            // 建立管道
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            // 設置管道爲非阻塞
            serverSocketChannel.configureBlocking(false);
            // 將管道綁定到8080端口
            serverSocketChannel.bind(new InetSocketAddress(8080));
            // 建立一個選擇器
            this.selector = Selector.open();
            // 將管道註冊到選擇器上,註冊爲SelectionKey.OP_ACCEPT事件,
            // 當事件到達後,selector.select()會返回,不然改方法會一直阻塞。
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) { 
 
   
            e.printStackTrace();
        }
    }

    public void listen() throws IOException { 
 
   
        System.out.println("服務端啓動成功...");
        // 輪詢訪問Selector
        while (true) { 
 
   
            // 當事件到達後,selector.select()會返回,不然改方法會一直阻塞。
            int select = selector.select(10);
            // 沒有發送消息,跳過
            if (0 == select) { 
 
   
                continue;
            }

            // selector中選中的註冊事件
            Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
            while (iterator.hasNext()) { 
 
   
                SelectionKey key = iterator.next();
                // 刪除已選中的key,避免重複處理
                iterator.remove();
                if (key.isAcceptable()) { 
 
    // 客戶端鏈接事件
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    // 得到與客戶端鏈接的管道
                    SocketChannel socketChannel = server.accept();
                    // 設置管道爲非阻塞
                    socketChannel.configureBlocking(false);
                    // 與客戶端鏈接後,爲了能接收到客戶端的消息,爲管道設置可讀權限
                    socketChannel.register(this.selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) { 
 
    // 可讀事件
                    // 建立讀取數據的緩衝區
                    ByteBuffer byteBuffer = ByteBuffer.allocate(512);
                    SocketChannel channel = (SocketChannel) key.channel();
                    channel.read(byteBuffer);
                    byte[] bytes = byteBuffer.array();
                    String msg = new String(bytes).trim();
                    System.out.println("服務端收到消息:" + msg);
                    ByteBuffer outByteBuffer = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
                    // 迴應消息給客戶端
                    channel.write(outByteBuffer);
                }
            }
        }
    }

    public static void main(String[] args) throws IOException { 
 
   
        NioServer nioServer = new NioServer();
        nioServer.iniServer();
        nioServer.listen();
    }
}

啓動服務端,而後再啓動兩個客戶端,兩個客戶端都不會阻塞。
在這裏插入圖片描述
在這裏插入圖片描述

本文同步分享在 博客「Μr.ηobοdy」(CSDN)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索