java網絡編程實戰 - 基於BIO的僞異步、高併發、全雙工、長鏈接持續消息IO的網絡編程

前言編程

TCP是面向鏈接的通訊協議,經過三次握手創建鏈接,通信完成時要拆除鏈接,因爲TCP是面向鏈接的因此只能用於端到端的通信。服務器

TCP提供的是一種可靠的數據流服務,採用帶重傳的確定確認技術來實現傳輸的可靠性。TCP還採用一種稱爲滑動窗口的方式進行流量控制,所謂窗口實際表示接收能力,用以限制發送方的發送速度。網絡

若是IP數據包中有已經封好的TCP數據包,那麼IP將把它們向傳送到TCP層。TCP將包排序並進行錯誤檢查,同時實現虛電路間的鏈接。TCP數據包中包括序號和確認,因此未按照順序收到的包能夠被排序,而損壞的包能夠被重傳。併發

        TCP將它的信息送到更高層的應用程序,例如Telnet的服務程序和客戶程序。應用程序輪流將信息送回TCP層,TCP層便將它們向下傳送到IP層,設備驅動程序和物理介質,最後到接收方。框架


基於TCP通信的三次握手異步

image.png

原生網絡編程BIOsocket

  服務端提供IP和監聽端口,客戶端經過鏈接操做想服務端監聽的地址發起鏈接請求,經過三次握手鍊接,若是鏈接成功創建,雙方就能夠經過套接字進行通訊。ide

傳統的同步阻塞模型開發中,ServerSocket負責綁定IP地址,啓動監聽端口;Socket負責發起鏈接操做。鏈接成功後,雙方經過輸入和輸出流進行同步阻塞式通訊。 高併發

傳統BIO通訊模型:採用BIO通訊模型的服務端,一般由一個獨立的Acceptor線程負責監聽客戶端的鏈接,它接收到客戶端鏈接請求以後爲每一個客戶端建立一個新的線程進行鏈路處理沒處理完成後,經過輸出流返回應答給客戶端,線程銷燬。即典型的一請求一應答模型。性能

該模型最大的問題就是缺少彈性伸縮能力,當客戶端併發訪問量增長後,服務端的線程個數和客戶端併發訪問數呈1:1的正比關係,Java中的線程也是比較寶貴的系統資源,線程數量快速膨脹後,系統的性能將急劇降低,隨着訪問量的繼續增大,系統最終就--

爲了改進這種一鏈接一線程的模型,咱們可使用線程池來管理這些線程,實現1個或多個線程處理N個客戶端的模型(可是底層仍是使用的同步阻塞I/O),一般被稱爲僞異步I/O模型


image.png

代碼實戰

有以上網絡通訊知識和BIO模型的基礎瞭解後,咱們進入代碼實戰:來實現一個基於BIO的僞異步、高併發、全雙工、長鏈接服務器編程模型。


服務端實現

/**
 * @author andychen https://blog.51cto.com/14815984
 * BIO 服務器端
 * */

public class Server {
    /**
     * 鏈接處理請求服務池
     * */
    private static final ExecutorService executorService  = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    /*調用實現服務器-客戶端TCP通信*/
    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket();
            //建立服務器地址
            SocketAddress socketAddress = new InetSocketAddress(Constant.SERVER, Constant.SERV_PORT);
            //綁定socket服務端IP+端口
            serverSocket.bind(socketAddress);

            //循環監聽
            System.out.println("================Start listen message===========================");
            for (;;){
                //接收數據:因消息返回前這裏會阻塞
                Socket result = serverSocket.accept();
                //接收到數據在單獨線程中處理
                executorService.execute(new ServerTask(result));
            }
        } finally {
            if(null != serverSocket && !serverSocket.isClosed()){
                serverSocket.close();
            }
        }
    }

    /*定義服務器任務線程*/
    static class ServerTask implements Runnable{
        private Socket socket = null;
        private List<String> msgList = new LinkedList<>();
        public ServerTask(Socket socket) {
            this.socket = socket;
        }
        /*處理讀入和寫出消息數據*/
        @Override
        public void run() {
            String msg = null;
            String newMsg = null;
            //ARM寫法
            try(BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
                OutputStream outputStream = new ObjectOutputStream(this.socket.getOutputStream())){
                 //讀取返回的數據
                 while (!Thread.interrupted()){
                     msg = bufferedReader.readLine();
                     System.out.println("Client sent message: "+msg);

                     //雙向消息反饋
                     newMsg = "Server received the sent message: "+msg;
                     ((ObjectOutputStream) outputStream).writeUTF(newMsg);
                     outputStream.flush();
                 }
             } catch (IOException e) {
                 e.printStackTrace();
             } finally {
                if(!this.socket.isClosed()){
                    try {
                        this.socket.close();
                        this.socket = null;
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
             }
        }
    }
}


客戶端實現


/**
 * @author andychen https://blog.51cto.com/14815984
 * BIO 客戶端
 * */
public class Client {
    /**發送消息到服務器端
     * 同時接收服務器反饋
     * */
    public static void main(String[] args) {
         Socket socket = null;
         String msg = null;
         InputStream inputStream = null;

        PrintWriter printWriter = null;
        Scanner scanner = null;
        try {
            //定義服務器端地址
            SocketAddress address = new InetSocketAddress(Constant.SERVER, Constant.SERV_PORT);
            socket = new Socket();
            //鏈接服務器端
            socket.connect(address);
            if(socket.isConnected()){
                try{
                    printWriter = new PrintWriter(socket.getOutputStream());
                    inputStream = new ObjectInputStream(socket.getInputStream());
                    scanner = new Scanner(System.in);
                    do {
                        msg = scanner.nextLine();
                        printWriter.println(msg);
                        printWriter.flush();

                        //確認消息
                        msg = ((ObjectInputStream) inputStream).readUTF();
                        System.out.println(msg);
                    }while (!Constant.EXIT_TAG.equals(msg));
                }finally {
                    printWriter.close();
                    printWriter = null;
                    inputStream.close();
                    inputStream = null;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(null != socket && !socket.isClosed()){
                try {
                    socket.close();
                    socket = null;
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}


端到端持續消息發送結果

image.png

image.png


結論

結合以上通信機制和原理、核心實現思路,咱們能夠擴展這種模型的應用。下次咱們將基於以上思想和實現,手寫一個RPC定義的同步框架的核心業務和擴展。

相關文章
相關標籤/搜索