前面聊到redis和rabbit,那咱們是如何他他們進行通訊的呢?因此想聊聊這個問題。在咱們分佈式架構中,一個重要的點就是通訊,客戶端和服務端的通訊、微服務之間、中間件。而通訊直接影響到用戶的體驗,好比個人服務器只能支持100個用戶同時和我通訊,而這個時候,有1000個用戶,那剩下的900的用戶,確定要等待。因此今天會聊到關於通訊的一些東西:BIO、NIO、TCP揮手和握手、零拷貝、以及七層網絡模型。java
java中提供一種通訊模型名爲Socket, 咱們能夠經過Socket去進行一些網絡通訊。redis
服務端服務器
public class SocketServer { public static void main(String[] args) { ServerSocket serverSocket= null; try { serverSocket = new ServerSocket(8080); //這裏是阻塞等待 Socket socket=serverSocket.accept(); //對客戶端進行信息的接受 BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream())); String readLine = bufferedReader.readLine(); System.out.println("接受客戶端消息:"+readLine); BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); bufferedWriter.write("我收到了信息\n"); bufferedWriter.flush(); } catch (IOException e) { e.printStackTrace(); } } }客戶端網絡
public class SocketClient { public static void main(String[] args) { try { Socket socket= new Socket("localhost",8080); //對服務端進行消息的發送 BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); bufferedWriter.write("我是客戶端一,發送消息!\n"); bufferedWriter.flush(); BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream())); String result = bufferedReader.readLine(); System.out.println("服務端返回消息:"+result); } catch (IOException e) { e.printStackTrace(); } } }socket的簡單通訊總體流程:架構
- 服務端:在服務端創建一個監聽(這個時候服務端是一個阻塞狀態)
- 客戶端:創建一個鏈接,這個時候,服務端的阻塞被喚醒,獲得一個socket。而後客戶端經過OutputStream進行傳輸
- 服務端:經過inputStream去獲取信息,由於tcp是一個雙工的,因此能夠經過一個OutputStream去寫回到客戶端
- 客戶端:經過inputStream獲取服務端的返回數據
網絡分層(當他們創建鏈接的時候,底層涉及到這個點)不論是tcp仍是udp他們的傳輸都會涉及到七層的網絡模型負載均衡
- 應用層:爲咱們的應用程序提供服務
- 表示層:對數據進行格式化轉換、加密
- 會話層:創建、管理、以及維護會話
- 傳輸層:創建、管理和維護端到端的鏈接
- 網絡層:IP選址及路由選擇
- 數據鏈路層:提供介質訪問和鏈路管理
- 物理層:底層的物理傳輸
負載和網絡分層異步
- 二層負載是利用的Mac頭(數據鏈路層)、
- 三層負載是Ip(網絡層)咱們能夠在第三層對ip進行修改進行數據包的路由、
- 四層負載(傳輸層)TCP屬於這一層,咱們利用出傳輸層中的ip和端口號去進行一個負載均衡的計算(Nginx其實就是基於這一層進行的負載)
- 七層負載(應用層):根據URL進行負載,就行controller同樣
對於TCP/IP有四層和上面的這七層相對應:爲何叫TCP/IP?由於TCP是傳輸層,在咱們的網絡層(IP)層之上socket
- 應用層:應用層、表示層、會話層
- 傳輸層:傳輸層
- 網絡層:網絡層
- 網絡接口層:數據鏈路層、物理層
當客戶端發送一個tcp請求會通過一下幾個步驟:tcp
- 發起一個請求,會把tcp頭和數據報文放在一塊兒
- 向下走會增長一個ip頭,拼接到上面的數據中
- 接着拼接Mac頭(服務端的網卡地址),當服務端進行簽收的時候會去確認是否是和本身Mac地址同樣
- 這些數據就都變成二進制進行傳輸
服務端通過的步驟分佈式
- 接收到二進制數據
- 向上傳輸對Mac頭進行解析,這一步拿傳遞過來的Mac地址和當前的Mac地址進行匹配,若是匹配繼續傳輸
- 上面一層對Ip頭進行解析,若是ip是本身則向上傳遞,不是則轉發到別的地址
- 最上面一層獲取tcp頭,去匹配服務端的進程,進程中去獲取數據報文進行處理。
問題:咱們怎麼知道服務端的Mac地址呢?
那就是ARP協議(網絡層),流程爲,他將含目標IP地址的ARP請求廣播到網絡上的全部主機,想符合的主機則會返回Mac信息,從而得知Mac地址。IP地址表示服務器所在的位置,而Mac地址表示的是惟一的身份證實,每一個主機都是惟一的(Mac是刻在網卡上的)。
爲何咱們在實際的網絡過程當中考慮的是Tcp而不是Udp?
由於TCP網絡傳輸的可靠性:
- 三次握手
- 咱們的兩個節點通訊,須要經過三個數據包來肯定鏈接的創建。用服務器A和B舉個例子
- 服務器A給B說:我要和你通訊 (第一次握手)這一步是肯定服務器B是不是可以使用狀態,若是不可用服務器A會不斷重試
- 服務器B相應:一切正常(第二次握手) 這一步是告訴服務器A我這邊能夠進行鏈接,防止服務器A不斷的重試。
- 服務器A說:那咱們開始通訊吧(第三次握手)服務器A收到了服務器B的返回消息,他須要告訴服務器B他收到了消息,覺得不排除在服務器A在收到B的相應後掛了的可能性,
- 流量控制
- 斷開機制(四次揮手):
- A發送關閉消息給B(第一次揮手)這個時候B就知道A沒有消息要發送了
- B發送消息給A說:我已經收到關閉消息了,可是等我通知你後,你再進行關閉(第二次揮手)這個時候不直接關閉,由於B可能還有數據沒有處理完成,
- B發送消息給A說能夠我處理數據完成,能夠進行關閉。(第三次揮手)
- A給B說我關閉鏈接了(第四次揮手)
上面咱們運行的代碼是一個客戶端對一個服務端,(服務端在等待獲取數據的時候是阻塞狀態)可是在真實的場景中,不可能只有一個用戶鏈接服務器,那是否是能夠當一個請求過來的時候,開啓線程去處理
客戶端
View Codepublic class ServerSocketDemo { static ExecutorService executorService= Executors.newFixedThreadPool(20); public static void main(String[] args) { ServerSocket serverSocket=null; try { //localhost: 8080 serverSocket=new ServerSocket(8080); while(true) { Socket socket = serverSocket.accept(); //監聽客戶端鏈接(鏈接阻塞) System.out.println(socket.getPort()); executorService.execute(new SocketThread(socket)); //異步 } } catch (IOException e) { e.printStackTrace(); } finally { //TODO if(serverSocket!=null){ try { serverSocket.close(); } catch (IOException e) { e.printStackTrace(); } } } } } public class SocketThread implements Runnable{ private Socket socket; public SocketThread(Socket socket) { this.socket = socket; } @Override public void run() { try { BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));//輸入流 String s = bufferedReader.readLine(); //被阻塞了 String clientStr = s; //讀取客戶端的一行數據 System.out.println("接收到客戶端的信息:" + clientStr); //寫回去 BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); bufferedWriter.write("我收到了信息\n"); bufferedWriter.flush(); bufferedReader.close(); bufferedWriter.close(); }catch (Exception e){ } } }服務端
View Codepublic class SocketClientDemo1 { public static void main(String[] args) { try { Socket socket=new Socket("localhost",8080); BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); bufferedWriter.write("我是客戶端1,發送了一個消息\n"); bufferedWriter.flush(); BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream()));//輸入流 String serverLine=bufferedReader.readLine(); //讀取服務端返回的數據 System.out.println("服務端返回的數據:"+serverLine); } catch (IOException e) { e.printStackTrace(); } } } public class SocketClientDemo { public static void main(String[] args) { try { Socket socket=new Socket("localhost",8080); BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); bufferedWriter.write("我是客戶端,發送了一個消息\n"); bufferedWriter.flush(); BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream()));//輸入流 String serverLine=bufferedReader.readLine(); //讀取服務端返回的數據(被阻塞了) System.out.println("服務端返回的數據:"+serverLine); } catch (IOException e) { e.printStackTrace(); } } }
流程以下:
當客戶端傳遞請求到服務端,首先在阻塞在accept這裏,而後開啓線程去處理IO請求,這裏可會阻塞,那線程的數量如何控制?若是鏈接數大於線程數,勢必會有一些請求丟失,那如何提高鏈接數?這就出現了非阻塞IO(NIO)
有兩個阻塞的地方:IO阻塞、鏈接阻塞
非阻塞指的是:鏈接非阻塞、IO非阻塞
服務端:
View Codepublic class NioClient { public static void main(String[] args) { try { SocketChannel socketChannel=SocketChannel.open(); /*socketChannel.configureBlocking(false);*/ socketChannel.connect(new InetSocketAddress("localhost",8080)); //若是鏈接已經創建 if (socketChannel.isConnectionPending()){ //完成鏈接 socketChannel.finishConnect(); } //這裏並不意味着鏈接已經創建 ByteBuffer byteBuffer=ByteBuffer.allocate(1024); byteBuffer.put("Hi,I am client".getBytes()); byteBuffer.flip(); socketChannel.write(byteBuffer); byteBuffer.clear(); //讀取服務端返回的數據 (這裏實際上是阻塞的) // 有一個坑,就是若是上面設置了非阻塞,下面這裏在等待服務端返回結果,服務端是不會返回結果的,由於不阻塞的話,這裏的鏈接就已經關閉了 // 因此想要收到服務端的返回結果就註釋上面的configureBlocking int read = socketChannel.read(byteBuffer); if (read>0){ System.out.println("服務端的數據::"+new String(byteBuffer.array())); } else { System.out.println("服務端沒有數據返回"); } } catch (IOException e) { e.printStackTrace(); } } }客戶端
View Codepublic class NioClient { public static void main(String[] args) { try { SocketChannel socketChannel=SocketChannel.open(); /*socketChannel.configureBlocking(false);*/ socketChannel.connect(new InetSocketAddress("localhost",8080)); //若是鏈接已經創建 if (socketChannel.isConnectionPending()){ //完成鏈接 socketChannel.finishConnect(); } //這裏並不意味着鏈接已經創建 ByteBuffer byteBuffer=ByteBuffer.allocate(1024); byteBuffer.put("Hi,I am client".getBytes()); byteBuffer.flip(); socketChannel.write(byteBuffer); byteBuffer.clear(); //讀取服務端返回的數據 (這裏實際上是阻塞的) // 有一個坑,就是若是上面設置了非阻塞,下面這裏在等待服務端返回結果,服務端是不會返回結果的,由於不阻塞的話,這裏的鏈接就已經關閉了 // 因此想要收到服務端的返回結果就註釋上面的configureBlocking int read = socketChannel.read(byteBuffer); if (read>0){ System.out.println("服務端的數據::"+new String(byteBuffer.array())); } else { System.out.println("服務端沒有數據返回"); } } catch (IOException e) { e.printStackTrace(); } } }
咱們能夠看出服務端在不斷的詢問鏈接,可是採用這種輪詢的方式,會不會很消耗性能?因此這裏就引出了多路複用機制。
多路複用【selector】
多路複用指的是一個線程管理多個通道,那一個線程就是咱們的selector,簡單來講就是把channel註冊到selector上,註冊的事件分爲,讀事件、寫事件、鏈接事件、接收事件、一旦有一個事件通知的話,咱們的selector就由阻塞變成非阻塞,這個時候他就拿到對應的通道進行處理,當他處理的時候必定是一個能夠執行的通道,不像咱們上圖展現的那種(出現鏈接未就緒的狀況發生).流程以下:
代碼流程以下:
【客戶端】:註冊一個鏈接時間到他的selector中,客戶端的selector發現有一個鏈接事件,而後就處理這個事件發送消息到服務端,而且註冊一個讀的事件
【服務端】:註冊一個接受事件,當客戶端發送一個消息過來後,服務端的selector就註冊一個接受事件,在這個事件中他給客戶端發送一個【收到】的消息,而且也註冊一個讀的事件。他的selector發現了這個讀的事件後就開始讀取服務端傳遞過來的數據。
【客戶端】:selector發現了讀的事件後,就開始讀取服務端傳遞過來的【收到】的信息。
服務端:
View Codepublic class NIOServer { //多路複用 static Selector selector; public static void main(String[] args) { try { //打開多路複用 selector= Selector.open(); ServerSocketChannel serverSocketChannel=ServerSocketChannel.open(); //這是設置非阻塞 serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(8080)); //把接收事件註冊到多路複用器上 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true){ //只有事件到達的時候他纔會被喚醒,不然是阻塞狀態。 selector.select(); //這裏是全部可使用的channel,下面對這些可使用的事件進行輪詢操做 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey next = iterator.next(); // 一旦操做就對把事件進行移除 iterator.remove(); if (next.isAcceptable()){//寫事件 copeWithAccept(next); }else if (next.isReadable()){//讀事件 copeWithRead(next); } } } } catch (IOException e) { e.printStackTrace(); } } //處理寫事件 private static void copeWithAccept(SelectionKey selectionKey){ //可使用的channel ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel(); try { //這裏就是接受一個鏈接 SocketChannel accept = channel.accept(); //打開非阻塞 accept.configureBlocking(false); //註冊一個事件(由於上面寫了一些東西,如今讀的話也要註冊一個讀的事件) //這個時候上面輪詢的時候就發現有一個讀的事件準備就緒了 accept.register(selector,SelectionKey.OP_READ); accept.write(ByteBuffer.wrap("Hello,I am server".getBytes())); } catch (IOException e) { e.printStackTrace(); } } //處理讀事件 private static void copeWithRead(SelectionKey selectionKey){ //這裏拿到的通道就是上面註冊(copeWithAccept)的要讀的通道 SocketChannel socketChannel= (SocketChannel) selectionKey.channel(); ByteBuffer byteBuffer=ByteBuffer.allocate(1024); try { socketChannel.read(byteBuffer); System.out.println(new String(byteBuffer.array())); } catch (IOException e) { e.printStackTrace(); } } }客戶端:
View Codepublic class NioClient { //多路複用 static Selector selector; public static void main(String[] args) { try { //打開多路複用 selector=Selector.open(); SocketChannel socketChannel=SocketChannel.open(); socketChannel.configureBlocking(false); socketChannel.connect(new InetSocketAddress("localhost",8080)); //註冊一個鏈接事件 socketChannel.register(selector, SelectionKey.OP_CONNECT); while (true){ //仍是當有事件發生的時候才觸發 selector.select(); //同樣,輪詢全部註冊的事件 Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey next = iterator.next(); iterator.remove(); if (next.isConnectable()){//鏈接事件 copeWithAccept(next); }else if (next.isReadable()){//讀事件 copeWithRead(next); } } } } catch (IOException e) { e.printStackTrace(); } } //處理鏈接事件 private static void copeWithAccept(SelectionKey selectionKey) throws IOException { //可使用的channel SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); if (socketChannel.isConnectionPending()) { socketChannel.finishConnect(); } socketChannel.configureBlocking(false); socketChannel.write(ByteBuffer.wrap("Hello Serve,I am client".getBytes())); //註冊一個讀取的事件 socketChannel.register(selector,SelectionKey.OP_READ); } //處理讀事件 private static void copeWithRead(SelectionKey selectionKey) throws IOException { //這裏拿到的通道就是上面註冊(copeWithAccept)的要讀的通道 SocketChannel socketChannel= (SocketChannel) selectionKey.channel(); ByteBuffer byteBuffer=ByteBuffer.allocate(1024); socketChannel.read(byteBuffer); System.out.println("client receive:"+new String(byteBuffer.array())); } }
當咱們要操做一個文件發送給其餘服務器的時候,有三次拷貝
- 把文件拷貝到內核空間
- 而後從內核空間拷貝到用戶空間
- 而後再從用戶空間把這個數據拷貝到內核空間,經過網卡發送到其餘的服務器上去
零拷貝的意思是,咱們不通過用戶空間,直接從磁盤的內核空間進行發送。後面咱們聊Netty的時候會聊到這。
實現方式:
- 把內核空間和用戶空間映射在一塊兒(MMAP)
- 使用現成的API