本文內容涉及同步與異步, 阻塞與非阻塞, BIO、NIO、AIO等概念, 這塊內容自己比較複雜, 很難用三言兩語說明白. 而書上的定義更不容易理解是什麼意思. 下面跟着我一塊兒解開它們神祕的面紗。java
從簡單的開始,咱們以經典的讀取文件的模型舉例。(對操做系統而言,全部的輸入輸出設備都被抽象成文件。)git
在發起讀取文件的請求時,應用層會調用系統內核的I/O接口。github
若是應用層調用的是阻塞型I/O,那麼在調用以後,應用層即刻被掛起,一直出於等待數據返回的狀態,直到系統內核從磁盤讀取完數據並返回給應用層,應用層才用得到的數據進行接下來的其餘操做。編程
若是應用層調用的是非阻塞I/O,那麼調用後,系統內核會當即返回(雖然尚未文件內容的數據),應用層並不會被掛起,它能夠作其餘任意它想作的操做。(至於文件內容數據如何返回給應用層,這已經超出了阻塞和非阻塞的辨別範疇。)數組
這即是(脫離同步和異步來講以後)阻塞和非阻塞的區別。總結來講,是不是阻塞仍是非阻塞,關注的是接口調用(發出請求)後等待數據返回時的狀態。被掛起沒法執行其餘操做的則是阻塞型的,能夠被當即「抽離」去完成其餘「任務」的則是非阻塞型的。 緩存
阻塞和非阻塞解決了應用層等待數據返回時的狀態問題,那系統內核獲取到的數據到底如何返回給應用層呢?這裏不一樣類型的操做便體現的是同步和異步的區別。bash
對於同步型的調用,應用層須要本身去向系統內核問詢,若是數據還未讀取完畢,那此時讀取文件的任務還未完成,應用層根據其阻塞和非阻塞的劃分,或掛起或去作其餘事情(因此同步和異步並不決定其等待數據返回時的狀態);若是數據已經讀取完畢,那此時系統內核將數據返回給應用層,應用層便可以用取得的數據作其餘相關的事情。服務器
而對於異步型的調用,應用層無需主動向系統內核問詢,在系統內核讀取完文件數據以後,會主動通知應用層數據已經讀取完畢,此時應用層便可以接收系統內核返回過來的數據,再作其餘事情。網絡
這即是(脫離阻塞和非阻塞來講以後)同步和異步的區別。也就是說,是不是同步仍是異步,關注的是任務完成時消息通知的方式。由調用方盲目主動問詢的方式是同步調用,由被調用方主動通知調用方任務已完成的方式是異步調用。數據結構
假設小明須要在網上下載一個軟件:
若是小明點擊下載按鈕以後,就一直幹瞪着進度條不作其餘任何事情直到軟件下載完成,這是同步阻塞; 若是小明點擊下載按鈕以後,就一直幹瞪着進度條不作其餘任何事情直到軟件下載完成,可是軟件下載完成實際上是會「叮」的一聲通知的(但小明依然那樣乾等着),這是異步阻塞;(不常見) 若是小明點擊下載按鈕以後,就去作其餘事情了,不過他總須要時不時瞄一眼屏幕看軟件是否是下載完成了,這是同步非阻塞; 若是小明點擊下載按鈕以後,就去作其餘事情了,軟件下載完以後「叮」的一聲通知小明,小明再回來繼續處理下載完的軟件,這是異步非阻塞。 相信看完以上這個案例以後,這幾個概念已經可以分辨得很清楚了。
總的來講,同步和異步關注的是任務完成消息通知的機制,而阻塞和非阻塞關注的是等待任務完成時請求者的狀態。
咱們經過 客戶端像服務端查詢信息做爲一個例子。分別經過三種模型來實現。
在傳統的網絡編程中,服務端監聽端口,客戶端請求服務端的ip跟監聽的端口,跟服務端通訊,必須三次握手創建。若是鏈接成功,經過套接字(socket)進行通訊。 在BIO通訊模型:採用BIO通訊模型的服務端,一般由一個獨立的Acceptor線程負責監聽客戶端的鏈接,它接收到客戶端鏈接請求以後爲每一個客戶端建立一個新的線程進行鏈路處理沒處理完成後,經過輸出流返回應答給客戶端,線程銷燬。即典型的一請求一應答通訊模型。
從圖中能夠得知,該模型中每個請求對應一個線程處理,在線程數量有限的狀況下,請求數量多,那麼服務器就會由於資源不足而掛掉。 服務端代碼/** * @author yukong * @date 2018年8月24日18:51:40 * 服務端 */
public class Server {
/** * 默認端口 */
private static final Integer DEFAULT_PORT = 6789;
public void start() throws IOException {
start(DEFAULT_PORT);
}
public void start(Integer port) throws IOException {
ServerSocket serverSocket = new ServerSocket(port);
System.out.println("小yu機器人啓動,監聽端口爲:" + port);
//經過無線循環監聽客戶端鏈接
while (true) {
// 阻塞方法,直至有客戶端鏈接成功
Socket socket = serverSocket.accept();
// 多線程處理客戶端請求
new Thread(new ServerHandler(socket)).start();
}
}
public static void main(String[] args) throws IOException {
new Server().start();
}
}
複製代碼
服務端處理器代碼
/**
* @author yukong
* @date 2018年8月24日18:51:40
* 服務端業務邏輯處理器
*/
public class ServerHandler implements Runnable{
private Socket socket;
public ServerHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
// 獲取socket的字符緩存輸入流 也就是獲取客戶端給服務器的字符流
in = new BufferedReader( new InputStreamReader(this.socket.getInputStream()));
// 獲取socket的字符輸出流 也就是發送的客戶的字符流 第二個參數自動刷新
out = new PrintWriter( new OutputStreamWriter(this.socket.getOutputStream()), true);
String request, response;
// 讀取輸入流的消息 若是爲空 則退出讀取
while ((request = in.readLine()) != null) {
System.out.println("[" + Thread.currentThread().getName()+ "]" + "小yu機器人收到消息:" + request);
// 具體業務邏輯處理 查詢信息。
response = ResponseUtil.queryMessage(request);
out.println(response);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
// 資源釋放
if (in != null) {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
in = null;
}
if (out != null) {
out.close();
out = null;
}
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
socket = null;
}
}
}
}
複製代碼
客戶端代碼
/** * @author yukong * @date 2018年8月24日18:51:40 * 客戶端 */
public class Client {
/** * 默認端口 */
private static final Integer DEFAULT_PORT = 6789;
/** * 默認端口 */
private static final String DEFAULT_HOST = "localhost";
public void send(String key){
send(DEFAULT_PORT,key);
}
public void send(int port,String key){
System.out.println("查詢的key爲:" + key);
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try{
socket = new Socket(DEFAULT_HOST,port);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(),true);
out.println(key);
System.out.println("查詢的結果爲:" + in.readLine());
}catch(Exception e){
e.printStackTrace();
}finally{
if(in != null){
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
in = null;
}
if(out != null){
out.close();
out = null;
}
if(socket != null){
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
socket = null;
}
}
}
public static void main(String[] args) {
Scanner scanner = new Scanner(System.in);
Client client = new Client();
while (scanner.hasNext()) {
String key = scanner.next();
client.send(key);
}
}
}
複製代碼
從代碼中能夠得知,咱們每次請求都是new Thread去處理,意味着線程消耗巨大,可能會有朋友說道,那就用線程池,一樣的若是使用線程池,當達到線程最大數量,也會達到瓶頸。該模式不適合高併發的訪問。
NIO提供了與傳統BIO模型中的Socket和ServerSocket相對應的SocketChannel和ServerSocketChannel兩種不一樣的套接字通道實現。 新增的着兩種通道都支持阻塞和非阻塞兩種模式。 阻塞模式使用就像傳統中的支持同樣,比較簡單,可是性能和可靠性都很差;非阻塞模式正好與之相反。 對於低負載、低併發的應用程序,可使用同步阻塞I/O來提高開發速率和更好的維護性;對於高負載、高併發的(網絡)應用,應使用NIO的非阻塞模式來開發。 下面會先對基礎知識進行介紹。
Buffer是一個對象,包含一些要寫入或者讀出的數據。
在NIO庫中,全部數據都是用緩衝區處理的。在讀取數據時,它是直接讀到緩衝區中的;在寫入數據時,也是寫入到緩衝區中。任什麼時候候訪問NIO中的數據,都是經過緩衝區進行操做。 緩衝區其實是一個數組,並提供了對數據結構化訪問以及維護讀寫位置等信息。 具體的緩存區有這些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。他們實現了相同的接口:Buffer。
咱們對數據的讀取和寫入要經過Channel,它就像水管同樣,是一個通道。通道不一樣於流的地方就是通道是雙向的,能夠用於讀、寫和同時讀寫操做。 底層的操做系統的通道通常都是全雙工的,因此全雙工的Channel比流能更好的映射底層操做系統的API。 Channel主要分兩大類:
後面代碼會涉及的ServerSocketChannel和SocketChannel都是SelectableChannel的子類。
Selector是Java NIO 編程的基礎。 Selector提供選擇已經就緒的任務的能力:Selector會不斷輪詢註冊在其上的Channel,若是某個Channel上面發生讀或者寫事件,這個Channel就處於就緒狀態,會被Selector輪詢出來,而後經過SelectionKey能夠獲取就緒Channel的集合,進行後續的I/O操做。 一個Selector能夠同時輪詢多個Channel,由於JDK使用了epoll()代替傳統的select實現,因此沒有最大鏈接句柄1024/2048的限制。因此,只須要一個線程負責Selector的輪詢,就能夠接入成千上萬的客戶端。 服務端代碼
/** * 服務端 */
public class Server {
/** * 默認端口 */
private static final Integer DEFAULT_PORT = 6780;
public void start() throws IOException {
start(DEFAULT_PORT);
}
public void start(Integer port) throws IOException {
// 打開多路複用選擇器
Selector selector = Selector.open();
// 打開服務端監聽通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 設置爲非阻塞模式
serverSocketChannel.configureBlocking(false);
// 綁定監聽的端口
serverSocketChannel.bind(new InetSocketAddress(port));
// 將選擇器綁定到監聽信道,只有非阻塞信道才能夠註冊選擇器.並在註冊過程當中指出該信道能夠進行Accept操做
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("小yu機器人啓動,監聽端口爲:" + port);
new Thread(new ServerHandler(selector)).start();
}
public static void main(String[] args) throws IOException {
new Server().start();
}
}
複製代碼
服務端處理器
public class ServerHandler implements Runnable{
private Selector selector;
public ServerHandler(Selector selector) {
this.selector = selector;
}
@Override
public void run() {
try {
while (true) {
// 等待某信道就緒(或超時)
if(selector.select(1000)==0){
// System.out.print("獨自等待.");
continue;
}
// 取得迭代器.selectedKeys()中包含了每一個準備好某一I/O操做的信道的SelectionKey
Iterator<SelectionKey> keyIterator=selector.selectedKeys().iterator();
while (keyIterator.hasNext()){
SelectionKey sk = keyIterator.next();
// 刪除已選的key 以防重負處理
keyIterator.remove();
// 處理key
handlerSelect(sk);
}
}
} catch (IOException e) {
}
}
private void handlerSelect(SelectionKey sk) throws IOException {
// 處理新接入的請求
if (sk.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel) sk.channel();
//經過ServerSocketChannel的accept建立SocketChannel實例
//完成該操做意味着完成TCP三次握手,TCP物理鏈路正式創建
SocketChannel sc = ssc.accept();
//設置爲非阻塞的
sc.configureBlocking(false);
//註冊爲讀
sc.register(selector, SelectionKey.OP_READ);
}
// 讀操做
if (sk.isReadable()) {
String request, response;
SocketChannel sc = (SocketChannel) sk.channel();
// 建立一個ByteBuffer 並設置大小爲1m
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 獲取到讀取的字節長度
int readBytes = sc.read(byteBuffer);
// 判斷是否有數據
if (readBytes > 0) {
//將緩衝區當前的limit設置爲position=0,用於後續對緩衝區的讀取操做
byteBuffer.flip();
//根據緩衝區可讀字節數建立字節數組
byte[] bytes = new byte[byteBuffer.remaining()];
// 複製至新的緩衝字節流
byteBuffer.get(bytes);
request = new String(bytes, "UTF-8");
System.out.println("[" + Thread.currentThread().getName()+ "]" + "小yu機器人收到消息:" + request);
// 具體業務邏輯處理 查詢信息。
response = ResponseUtil.queryMessage(request);
//將消息編碼爲字節數組
byte[] responseBytes = response.getBytes();
//根據數組容量建立ByteBuffer
ByteBuffer writeBuffer = ByteBuffer.allocate(responseBytes.length);
//將字節數組複製到緩衝區
writeBuffer.put(responseBytes);
//flip操做
writeBuffer.flip();
//發送緩衝區的字節數組
sc.write(writeBuffer);
}
}
}
}
複製代碼
客戶端
/** * 客戶端 */
public class Client {
// 通道選擇器
private Selector selector;
// 與服務器通訊的通道
SocketChannel socketChannel;
/** * 默認端口 */
private static final Integer DEFAULT_PORT = 6780;
/** * 默認端口 */
private static final String DEFAULT_HOST = "127.0.0.1";
public void send(String key) throws IOException {
send(DEFAULT_PORT, DEFAULT_HOST, key);
}
public void send(int port,String host, String key) throws IOException {
init(port, host);
System.out.println("查詢的key爲:" + key);
//將消息編碼爲字節數組
byte[] bytes = key.getBytes();
//根據數組容量建立ByteBuffer
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
//將字節數組複製到緩衝區
writeBuffer.put(bytes);
//flip操做
writeBuffer.flip();
//發送緩衝區的字節數組
socketChannel.write(writeBuffer);
//****此處不含處理「寫半包」的代碼
}
public void init(int port,String host) throws IOException {
// 建立選擇器
selector = Selector.open();
// 設置連接的服務端地址
InetSocketAddress socketAddress = new InetSocketAddress(host, port);
// 打開通道
socketChannel = SocketChannel.open(socketAddress);// 非阻塞
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
new Thread(new ClientHandler(selector)).start();
}
public static void main(String[] args) throws IOException {
Scanner scanner = new Scanner(System.in);
Client client = new Client();
while (scanner.hasNext()) {
String key = scanner.next();
client.send(key);
}
}
}
複製代碼
客戶端處理器
public class ClientHandler implements Runnable {
private Selector selector;
public ClientHandler(Selector selector) {
this.selector = selector;
}
@Override
public void run() {
try {
while (true) {
if(selector.select(1000) < 0) {
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> selectionKeyIterator = keys.iterator();
while (selectionKeyIterator.hasNext()) {
SelectionKey sc = selectionKeyIterator.next();
selectionKeyIterator.remove();
//讀消息
if (sc.isReadable()) {
SocketChannel socketChannel = (SocketChannel) sc.channel();
//建立ByteBuffer,並開闢一個1M的緩衝區
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//讀取請求碼流,返回讀取到的字節數
int byteSize = socketChannel.read(byteBuffer);
if (byteSize > 0) {
//將緩衝區當前的limit設置爲position=0,用於後續對緩衝區的讀取操做
byteBuffer.flip();
//根據緩衝區可讀字節數建立字節數組 總長度減去空餘的
byte[] bytes = new byte[byteBuffer.remaining()];
// 複製至新的緩衝字節流
byteBuffer.get(bytes);
String message = new String(bytes, "UTF-8");
System.out.println(message);
}
}
}
}
} catch (IOException e) {
}
}
}
複製代碼
從代碼中 咱們也能看出來,nio解決的是阻塞與非阻塞的,經過selector輪詢上註冊的channel的狀態,來獲取對應準備就緒channel的 那麼請求者就不用一直去accpet阻塞,等待了。那爲何是同步呢,由於仍是咱們請求者不停的輪詢selector是否有徹底就緒的channel。
NIO 2.0引入了新的異步通道的概念,並提供了異步文件通道和異步套接字通道的實現。 異步的套接字通道時真正的異步非阻塞I/O,對應於UNIX網絡編程中的事件驅動I/O(AIO)。他不須要過多的Selector對註冊的通道進行輪詢便可實現異步讀寫,從而簡化了NIO的編程模型。 服務端代碼
/** * 異步非阻塞服務端 */
public class Sever {
/** * 默認端口 */
private static final Integer DEFAULT_PORT = 6780;
private AsynchronousServerSocketChannel serverChannel;
//做爲handler接收客戶端鏈接
class ServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Void> {
private AsynchronousServerSocketChannel serverChannel;
private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
private CharBuffer charBuffer;
private CharsetDecoder decoder = Charset.defaultCharset().newDecoder();
public ServerCompletionHandler(AsynchronousServerSocketChannel serverChannel) {
this.serverChannel = serverChannel;
}
@Override
public void completed(AsynchronousSocketChannel result, Void attachment) {
//當即接收下一個請求,不停頓
serverChannel.accept(null, this);
try {
while (result.read(buffer).get() != -1) {
buffer.flip();
charBuffer = decoder.decode(buffer);
String request = charBuffer.toString().trim();
System.out.println("[" + Thread.currentThread().getName()+ "]" + "小yu機器人收到消息:" + request);
// 具體業務邏輯處理 查詢信息。
String response = ResponseUtil.queryMessage(request);
//將消息編碼爲字節數組
byte[] responseBytes = response.getBytes();
//根據數組容量建立ByteBuffer
ByteBuffer outBuffer = ByteBuffer.allocate(responseBytes.length);
//將字節數組複製到緩衝區
outBuffer.put(responseBytes);
//flip操做
outBuffer.flip();
//發送緩衝區的字節數組
result.write(outBuffer).get();
if (buffer.hasRemaining()) {
buffer.compact();
} else {
buffer.clear();
}
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} catch (CharacterCodingException e) {
e.printStackTrace();
} finally {
try {
result.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Override
public void failed(Throwable exc, Void attachment) {
//當即接收下一個請求,不停頓
serverChannel.accept(null, this);
throw new RuntimeException("connection failed!");
}
}
public void init() throws IOException, InterruptedException {
init(DEFAULT_PORT);
}
public void init(Integer port) throws IOException, InterruptedException {
// 打開異步通道
this.serverChannel = AsynchronousServerSocketChannel.open();
// 判斷通道是否打開
if (serverChannel.isOpen()) {
serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 4 * 1024);
serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
serverChannel.bind(new InetSocketAddress(port));
} else {
throw new RuntimeException("Channel not opened!");
}
start(port);
}
public void start(Integer port) throws InterruptedException {
System.out.println("小yu機器人啓動,監聽端口爲:" + port);
this.serverChannel.accept(null, new ServerCompletionHandler(serverChannel));
// 保證線程不會掛了
while (true) {
Thread.sleep(5000);
}
}
public static void main(String[] args) throws IOException, InterruptedException {
Sever server = new Sever();
server.init();
}
}
複製代碼
客戶端
public class Client {
class ClientCompletionHandler implements CompletionHandler<Void, Void> {
private AsynchronousSocketChannel channel;
private CharBuffer charBufferr = null;
private CharsetDecoder decoder = Charset.defaultCharset().newDecoder();
private BufferedReader clientInput = new BufferedReader(new InputStreamReader(System.in));
public ClientCompletionHandler(AsynchronousSocketChannel channel) {
this.channel = channel;
}
@Override
public void completed(Void result, Void attachment) {
System.out.println("Input Client Reuest:");
String request;
try {
request = clientInput.readLine();
channel.write(ByteBuffer.wrap(request.getBytes()));
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
while(channel.read(buffer).get() != -1){
buffer.flip();
charBufferr = decoder.decode(buffer);
System.out.println(charBufferr.toString());
if(buffer.hasRemaining()){
buffer.compact();
}
else{
buffer.clear();
}
request = clientInput.readLine();
channel.write(ByteBuffer.wrap(request.getBytes())).get();
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
finally {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Override
public void failed(Throwable exc, Void attachment) {
throw new RuntimeException("channel not opened!");
}
}
public void start() throws IOException, InterruptedException{
AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
if(channel.isOpen()){
channel.setOption(StandardSocketOptions.SO_RCVBUF, 128*1024);
channel.setOption(StandardSocketOptions.SO_SNDBUF, 128*1024);
channel.setOption(StandardSocketOptions.SO_KEEPALIVE,true);
channel.connect(new InetSocketAddress("127.0.0.1",6780),null,new ClientCompletionHandler(channel));
while(true){
Thread.sleep(5000);
}
}
else{
throw new RuntimeException("Channel not opened!");
}
}
public static void main(String[] args) throws IOException, InterruptedException{
Client client = new Client();
client.start();
}
}
複製代碼
先以一張表來直觀的對比一下: