分佈式-通訊(NIO&BIO&網絡模型&零拷貝)

分佈式-通訊(NIO&BIO&網絡模型&零拷貝)

前面聊到redis和rabbit,那咱們是如何他他們進行通訊的呢?因此想聊聊這個問題。在咱們分佈式架構中,一個重要的點就是通訊,客戶端和服務端的通訊、微服務之間、中間件。而通訊直接影響到用戶的體驗,好比個人服務器只能支持100個用戶同時和我通訊,而這個時候,有1000個用戶,那剩下的900的用戶,確定要等待。因此今天會聊到關於通訊的一些東西:BIO、NIO、TCP揮手和握手、零拷貝、以及七層網絡模型。java

 Java中通訊-Socket

 java中提供一種通訊模型名爲Socket, 咱們能夠經過Socket去進行一些網絡通訊。redis

服務端服務器

public class SocketServer  {
    public static void main(String[] args) {
        ServerSocket serverSocket= null;
        try {
            serverSocket = new ServerSocket(8080);
            //這裏是阻塞等待
            Socket socket=serverSocket.accept();
            //對客戶端進行信息的接受
            BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream()));
            String readLine = bufferedReader.readLine();
            System.out.println("接受客戶端消息:"+readLine);
            BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            bufferedWriter.write("我收到了信息\n");
            bufferedWriter.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

客戶端網絡

public class SocketClient {
    public static void main(String[] args) {
        try {
            Socket socket= new Socket("localhost",8080);
            //對服務端進行消息的發送
            BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            bufferedWriter.write("我是客戶端一,發送消息!\n");
            bufferedWriter.flush();
            BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream()));
            String result = bufferedReader.readLine();
            System.out.println("服務端返回消息:"+result);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

socket的簡單通訊總體流程:架構

  • 服務端:在服務端創建一個監聽(這個時候服務端是一個阻塞狀態)
  • 客戶端:創建一個鏈接,這個時候,服務端的阻塞被喚醒,獲得一個socket。而後客戶端經過OutputStream進行傳輸
  • 服務端:經過inputStream去獲取信息,由於tcp是一個雙工的,因此能夠經過一個OutputStream去寫回到客戶端
  • 客戶端:經過inputStream獲取服務端的返回數據

網絡分層(當他們創建鏈接的時候,底層涉及到這個點)不論是tcp仍是udp他們的傳輸都會涉及到七層的網絡模型負載均衡

  • 應用層:爲咱們的應用程序提供服務
  • 表示層:對數據進行格式化轉換、加密
  • 會話層:創建、管理、以及維護會話
  • 傳輸層:創建、管理和維護端到端的鏈接
  • 網絡層:IP選址及路由選擇
  • 數據鏈路層:提供介質訪問和鏈路管理
  • 物理層:底層的物理傳輸

負載和網絡分層異步

  • 二層負載是利用的Mac頭(數據鏈路層)、
  • 三層負載是Ip(網絡層)咱們能夠在第三層對ip進行修改進行數據包的路由、
  • 四層負載(傳輸層)TCP屬於這一層,咱們利用出傳輸層中的ip和端口號去進行一個負載均衡的計算(Nginx其實就是基於這一層進行的負載)
  • 七層負載(應用層):根據URL進行負載,就行controller同樣

對於TCP/IP有四層和上面的這七層相對應:爲何叫TCP/IP?由於TCP是傳輸層,在咱們的網絡層(IP)層之上socket

  • 應用層:應用層、表示層、會話層
  • 傳輸層:傳輸層
  • 網絡層:網絡層
  • 網絡接口層:數據鏈路層、物理層

當客戶端發送一個tcp請求會通過一下幾個步驟:tcp

  • 發起一個請求,會把tcp頭和數據報文放在一塊兒
  • 向下走會增長一個ip頭,拼接到上面的數據中
  • 接着拼接Mac頭(服務端的網卡地址),當服務端進行簽收的時候會去確認是否是和本身Mac地址同樣 
  • 這些數據就都變成二進制進行傳輸

服務端通過的步驟分佈式

  • 接收到二進制數據
  • 向上傳輸對Mac頭進行解析,這一步拿傳遞過來的Mac地址和當前的Mac地址進行匹配,若是匹配繼續傳輸
  • 上面一層對Ip頭進行解析,若是ip是本身則向上傳遞,不是則轉發到別的地址
  • 最上面一層獲取tcp頭,去匹配服務端的進程,進程中去獲取數據報文進行處理。

問題:咱們怎麼知道服務端的Mac地址呢?

那就是ARP協議(網絡層),流程爲,他將含目標IP地址的ARP請求廣播到網絡上的全部主機,想符合的主機則會返回Mac信息,從而得知Mac地址。IP地址表示服務器所在的位置,而Mac地址表示的是惟一的身份證實,每一個主機都是惟一的(Mac是刻在網卡上的)。

爲何咱們在實際的網絡過程當中考慮的是Tcp而不是Udp?

由於TCP網絡傳輸的可靠性:

  • 三次握手
    • 咱們的兩個節點通訊,須要經過三個數據包來肯定鏈接的創建。用服務器A和B舉個例子
      • 服務器A給B說:我要和你通訊 (第一次握手這一步是肯定服務器B是不是可以使用狀態,若是不可用服務器A會不斷重試
      • 服務器B相應:一切正常(第二次握手 這一步是告訴服務器A我這邊能夠進行鏈接,防止服務器A不斷的重試。
      • 服務器A說:那咱們開始通訊吧(第三次握手)服務器A收到了服務器B的返回消息,他須要告訴服務器B他收到了消息,覺得不排除在服務器A在收到B的相應後掛了的可能性,
  • 流量控制
  • 斷開機制(四次揮手)
    • A發送關閉消息給B(第一次揮手這個時候B就知道A沒有消息要發送了
    • B發送消息給A說:我已經收到關閉消息了,可是等我通知你後,你再進行關閉(第二次揮手這個時候不直接關閉,由於B可能還有數據沒有處理完成
    • B發送消息給A說能夠我處理數據完成,能夠進行關閉。第三次揮手
    • A給B說我關閉鏈接(第四次揮手)

 IO阻塞怎麼辦【BIO】?

上面咱們運行的代碼是一個客戶端對一個服務端,(服務端在等待獲取數據的時候是阻塞狀態)可是在真實的場景中,不可能只有一個用戶鏈接服務器,那是否是能夠當一個請求過來的時候,開啓線程去處理

客戶端

public class ServerSocketDemo {

    static ExecutorService executorService= Executors.newFixedThreadPool(20);

    public static void main(String[] args) {
        ServerSocket serverSocket=null;
        try {
            //localhost: 8080
            serverSocket=new ServerSocket(8080);
            while(true) {
                Socket socket = serverSocket.accept(); //監聽客戶端鏈接(鏈接阻塞)
                System.out.println(socket.getPort());
                executorService.execute(new SocketThread(socket)); //異步
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            //TODO
            if(serverSocket!=null){
                try {
                    serverSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
public class SocketThread implements Runnable{

    private Socket socket;

    public SocketThread(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        try {
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));//輸入流
            String s = bufferedReader.readLine(); //被阻塞了
            String clientStr = s; //讀取客戶端的一行數據
            System.out.println("接收到客戶端的信息:" + clientStr);
            //寫回去
            BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            bufferedWriter.write("我收到了信息\n");
            bufferedWriter.flush();

            bufferedReader.close();
            bufferedWriter.close();
        }catch (Exception e){

        }
    }
}
View Code

服務端

public class SocketClientDemo1 {
    public static void main(String[] args) {
        try {
            Socket socket=new Socket("localhost",8080);
            BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            bufferedWriter.write("我是客戶端1,發送了一個消息\n");
            bufferedWriter.flush();
            BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream()));//輸入流
            String serverLine=bufferedReader.readLine(); //讀取服務端返回的數據
            System.out.println("服務端返回的數據:"+serverLine);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
public class SocketClientDemo {
    public static void main(String[] args) {
        try {
            Socket socket=new Socket("localhost",8080);
            BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            bufferedWriter.write("我是客戶端,發送了一個消息\n");
            bufferedWriter.flush();
            BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream()));//輸入流
            String serverLine=bufferedReader.readLine(); //讀取服務端返回的數據(被阻塞了)
            System.out.println("服務端返回的數據:"+serverLine);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
View Code

 流程以下:

當客戶端傳遞請求到服務端,首先在阻塞在accept這裏,而後開啓線程去處理IO請求,這裏可會阻塞,那線程的數量如何控制?若是鏈接數大於線程數,勢必會有一些請求丟失,那如何提高鏈接數?這就出現了非阻塞IO(NIO)

有兩個阻塞的地方IO阻塞、鏈接阻塞

 非阻塞IO(NIO)

非阻塞指的是:鏈接非阻塞、IO非阻塞

服務端:

public class NioClient {
    public static void main(String[] args)  {
        try {
            SocketChannel socketChannel=SocketChannel.open();
            /*socketChannel.configureBlocking(false);*/
            socketChannel.connect(new InetSocketAddress("localhost",8080));
            //若是鏈接已經創建
            if (socketChannel.isConnectionPending()){
                //完成鏈接
                socketChannel.finishConnect();
            }
            //這裏並不意味着鏈接已經創建
            ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
            byteBuffer.put("Hi,I am client".getBytes());
            byteBuffer.flip();
            socketChannel.write(byteBuffer);
            byteBuffer.clear();
            //讀取服務端返回的數據 (這裏實際上是阻塞的)
            // 有一個坑,就是若是上面設置了非阻塞,下面這裏在等待服務端返回結果,服務端是不會返回結果的,由於不阻塞的話,這裏的鏈接就已經關閉了
            // 因此想要收到服務端的返回結果就註釋上面的configureBlocking
            int read = socketChannel.read(byteBuffer);
            if (read>0){
                System.out.println("服務端的數據::"+new String(byteBuffer.array()));
            }
            else {
                System.out.println("服務端沒有數據返回");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}
View Code

客戶端

public class NioClient {
    public static void main(String[] args)  {
        try {
            SocketChannel socketChannel=SocketChannel.open();
            /*socketChannel.configureBlocking(false);*/
            socketChannel.connect(new InetSocketAddress("localhost",8080));
            //若是鏈接已經創建
            if (socketChannel.isConnectionPending()){
                //完成鏈接
                socketChannel.finishConnect();
            }
            //這裏並不意味着鏈接已經創建
            ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
            byteBuffer.put("Hi,I am client".getBytes());
            byteBuffer.flip();
            socketChannel.write(byteBuffer);
            byteBuffer.clear();
            //讀取服務端返回的數據 (這裏實際上是阻塞的)
            // 有一個坑,就是若是上面設置了非阻塞,下面這裏在等待服務端返回結果,服務端是不會返回結果的,由於不阻塞的話,這裏的鏈接就已經關閉了
            // 因此想要收到服務端的返回結果就註釋上面的configureBlocking
            int read = socketChannel.read(byteBuffer);
            if (read>0){
                System.out.println("服務端的數據::"+new String(byteBuffer.array()));
            }
            else {
                System.out.println("服務端沒有數據返回");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}
View Code

 

 咱們能夠看出服務端在不斷的詢問鏈接,可是採用這種輪詢的方式,會不會很消耗性能?因此這裏就引出了多路複用機制。

多路複用【selector】

多路複用指的是一個線程管理多個通道,那一個線程就是咱們的selector,簡單來講就是把channel註冊到selector上,註冊的事件分爲,讀事件、寫事件、鏈接事件、接收事件、一旦有一個事件通知的話,咱們的selector就由阻塞變成非阻塞,這個時候他就拿到對應的通道進行處理,當他處理的時候必定是一個能夠執行的通道,不像咱們上圖展現的那種(出現鏈接未就緒的狀況發生).流程以下:

代碼流程以下:

【客戶端】:註冊一個鏈接時間到他的selector中,客戶端的selector發現有一個鏈接事件,而後就處理這個事件發送消息到服務端,而且註冊一個讀的事件

【服務端】:註冊一個接受事件,當客戶端發送一個消息過來後,服務端的selector就註冊一個接受事件,在這個事件中他給客戶端發送一個【收到】的消息,而且也註冊一個讀的事件。他的selector發現了這個讀的事件後就開始讀取服務端傳遞過來的數據。

【客戶端】:selector發現了讀的事件後,就開始讀取服務端傳遞過來的【收到】的信息。

服務端:

public class NIOServer {
    //多路複用
    static Selector selector;
    public static void main(String[] args) {
        try {
            //打開多路複用
            selector= Selector.open();
            ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
            //這是設置非阻塞
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(8080));
            //把接收事件註冊到多路複用器上
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            while (true){
                //只有事件到達的時候他纔會被喚醒,不然是阻塞狀態。
                selector.select();
                //這裏是全部可使用的channel,下面對這些可使用的事件進行輪詢操做
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey next = iterator.next();
                    // 一旦操做就對把事件進行移除
                    iterator.remove();
                    if (next.isAcceptable()){//寫事件
                        copeWithAccept(next);
                    }else if (next.isReadable()){//讀事件
                        copeWithRead(next);
                    }
                }
            }

        } catch (IOException  e) {
            e.printStackTrace();
        }
    }

    //處理寫事件
    private static   void  copeWithAccept(SelectionKey selectionKey){
        //可使用的channel
        ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
        try {
            //這裏就是接受一個鏈接
            SocketChannel accept = channel.accept();
            //打開非阻塞
            accept.configureBlocking(false);
            //註冊一個事件(由於上面寫了一些東西,如今讀的話也要註冊一個讀的事件)
            //這個時候上面輪詢的時候就發現有一個讀的事件準備就緒了
            accept.register(selector,SelectionKey.OP_READ);
            accept.write(ByteBuffer.wrap("Hello,I am server".getBytes()));
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
    //處理讀事件
    private  static void  copeWithRead(SelectionKey selectionKey){
        //這裏拿到的通道就是上面註冊(copeWithAccept)的要讀的通道
        SocketChannel socketChannel= (SocketChannel) selectionKey.channel();
        ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
        try {
           socketChannel.read(byteBuffer);
            System.out.println(new String(byteBuffer.array()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
View Code

客戶端:

public class NioClient {
    //多路複用
    static Selector selector;
    public static void main(String[] args)  {
        try {
            //打開多路複用
            selector=Selector.open();
            SocketChannel socketChannel=SocketChannel.open();
            socketChannel.configureBlocking(false);
            socketChannel.connect(new InetSocketAddress("localhost",8080));
            //註冊一個鏈接事件
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            while (true){
                //仍是當有事件發生的時候才觸發
                selector.select();
                //同樣,輪詢全部註冊的事件
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey next = iterator.next();
                    iterator.remove();
                    if (next.isConnectable()){//鏈接事件
                        copeWithAccept(next);
                    }else if (next.isReadable()){//讀事件
                        copeWithRead(next);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
    //處理鏈接事件
    private static   void  copeWithAccept(SelectionKey selectionKey) throws IOException {
        //可使用的channel
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        if (socketChannel.isConnectionPending()) {
            socketChannel.finishConnect();
        }
        socketChannel.configureBlocking(false);
        socketChannel.write(ByteBuffer.wrap("Hello Serve,I am  client".getBytes()));
        //註冊一個讀取的事件
        socketChannel.register(selector,SelectionKey.OP_READ);
    }
    //處理讀事件
    private  static void  copeWithRead(SelectionKey selectionKey) throws IOException {
        //這裏拿到的通道就是上面註冊(copeWithAccept)的要讀的通道
        SocketChannel socketChannel= (SocketChannel) selectionKey.channel();
        ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
        socketChannel.read(byteBuffer);
        System.out.println("client receive:"+new String(byteBuffer.array()));
    }
}
View Code

 零拷貝

當咱們要操做一個文件發送給其餘服務器的時候,三次拷貝

  • 把文件拷貝到內核空間
  • 而後從內核空間拷貝到用戶空間
  • 而後再從用戶空間把這個數據拷貝到內核空間,經過網卡發送到其餘的服務器上去

零拷貝的意思是,咱們不通過用戶空間,直接從磁盤的內核空間進行發送。後面咱們聊Netty的時候會聊到這。

實現方式:

  • 把內核空間和用戶空間映射在一塊兒(MMAP)
  • 使用現成的API

相關文章
相關標籤/搜索