java網絡編程——多線程數據收發並行

基本介紹與思路

收發並行

前一篇博客中,完成了客戶端與服務端的簡單TCP交互,但這種交互是觸發式的:客戶端發送一條消息,服務端收到後再回送一條。沒有作到收發並行。收發並行的字面意思很容易理解,即數據的發送與接收互相不干擾,相互獨立。固然,要保證服務端和客戶端都能作到收發並行。java

業務邏輯

脫離業務邏輯的實踐是毫無心義的,先描述一下本實踐中的業務邏輯:一個服務端接受多個客戶端的鏈接,鏈接後,向各個客戶端定時發送時間戳數據,同時在並行條件下,接受各個客戶端發送來的數據並顯示;客戶端鍵盤輸入字符串,發送給服務端,同時在並行條件下,接收服務器發來的時間戳數據並顯示。算法

實現思路

實現發送與接收並行,思路其實很是直觀,即創建兩個線程,分別用來實現輸入流和輸出流。個人代碼的設計方案以下圖所示:
image編程

  • 服務端:建立一個監聽客戶端鏈接的線程,線程中一旦接收到請求,建立一個對應該客戶端收發處理的對象,對象中建立輸入流線程,並使用單例線程池建立輸出流線程。主線程使用鍵盤輸入流System.in來進行阻塞。同時主線程中建立Timer定時器,定時向輸出流發送數據。
  • 客戶端:主線程發送鏈接請求,與服務器創建鏈接。使用鍵盤輸入流System.in來阻塞主線程,同時做爲輸出流使用;建立一個輸入流線程,異步運行,接收服務器數據。

代碼分析

源代碼文件結構以下圖所示
image服務器

服務端

服務器端分爲三個部分,分別是Server.java,TCPServer.java和ClientHandler.java多線程

Server.java
package Server;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.text.SimpleDateFormat;
import java.util.TimerTask;
import java.util.Timer;
import java.util.Date;

public class Server {
    private static SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd_HH:mm:ss");
    public static void main(String[] args){
        try {
            TCPServer.accept();
            new Timer("Timer").schedule(new TimerTask() {
                @Override
                public void run() {
                    TCPServer.broadcast(df.format(new Date()));
                }
            }, 1000,5000);
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
            String str;
            //由於ClientListen是異步線程,使用鍵盤輸入流將主線程阻塞住,保證跟ClientListen線程同步,同時可控制ClientListen服務的退出
            do{
                str = bufferedReader.readLine();
            }while (str.equalsIgnoreCase("serverExit"));
        }catch (Exception e){
            System.out.println("監聽請求過程當中異常退出");
        }

        try {
            TCPServer.stop();
        } catch (IOException e) {
            System.out.println("關閉套接字過程當中出現異常");
        } finally {
            System.out.println("服務器端套接字已關閉!");
        }
    }

}
TCPServer.java
package Server;

import java.io.IOException;
import java.net.*;
import java.util.ArrayList;
import java.util.UUID;

class TCPServer {
    private static int LOCAL_PORT = 3001;
    private static ClientListenHandle clientListenHandle;
    private static ArrayList<ClientHandler> clientHandlerList = new ArrayList<ClientHandler>();

    static void accept() throws IOException {
        //建立服務器端套接字
        ServerSocket serverSocket = createSocket();
        InitSocket(serverSocket);
        System.out.println("服務器準備就緒 addr: " + Inet4Address.getLocalHost() + "  /port: " + LOCAL_PORT);
        System.out.println("開始監聽客戶端鏈接...");
        //建立線程監聽客戶端請求
        clientListenHandle = new ClientListenHandle(serverSocket);
        clientListenHandle.start();

    }

    static void stop() throws IOException {
        for (ClientHandler clientHandler : clientHandlerList) {
            clientHandler.socketClose();
        }
        clientHandlerList.clear();
        clientListenHandle.exit();
    }

    private static ServerSocket createSocket() throws IOException {
        ServerSocket socket = new ServerSocket(LOCAL_PORT, 50);
        return socket;
    }

    private static void InitSocket(ServerSocket socket) throws SocketException {
        // 是否複用未徹底關閉的地址端口
        socket.setReuseAddress(true);

        // 等效Socket#setReceiveBufferSize
        socket.setReceiveBufferSize(64 * 1024 * 1024);

        // 設置serverSocket#accept超時時間,不設置即永久等待
        // serverSocket.setSoTimeout(2000);

        // 設置性能參數:短連接,延遲,帶寬的相對重要性
        socket.setPerformancePreferences(1, 1, 1);
    }

    static void broadcast(String msg) {
        for (ClientHandler clientHandler : clientHandlerList) {
            clientHandler.write(msg);
        }

    }
    /**
     * 監聽客戶端請求的線程
     */
    static class ClientListenHandle extends Thread {
        private final ServerSocket serverSocket;
        private Boolean done = false;

        ClientListenHandle(ServerSocket serverSocket) {
            this.serverSocket = serverSocket;
        }

        @Override
        public void run() {
            super.run();
            try {
                do {
                    Socket client;
                    try {
                        client = serverSocket.accept();
                    } catch (Exception e) {
                        continue;//某一個客戶端鏈接失敗,要保證其它客戶端能正常鏈接
                    }
                    String uuid = UUID.randomUUID().toString();//爲客戶端生成惟一標識
                    System.out.println("已接受鏈接client:"+uuid+" /Addr:"+client.getInetAddress()+" /Port:"+client.getPort());
                    //爲該客戶端實例化一個ClientHandler對象,注入對象刪除操做的lambda表達式
                    ClientHandler clientHandle = new ClientHandler(client, handler -> clientHandlerList.remove(handler), uuid);
                    clientHandle.read();
                    clientHandlerList.add(clientHandle);
                } while (!done);
            } catch (Exception e) {
                if (!done) {
                    System.out.println("異常退出!");
                }
            }
        }

        void exit() throws IOException {
            done = true;
            serverSocket.close();
        }
    }
}
ClientHandler.java
package Server;

import java.io.*;
import java.net.Socket;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ClientHandler {
    private final Socket client;
    private final ReadHandler readHandler;
    private final WriteHandle writeHandler;
    private final Removable removable;
    private final String uid;

    ClientHandler(Socket socket, Removable removable, String uid) throws IOException {
        this.client = socket;
        this.readHandler = new ReadHandler(socket.getInputStream());
        this.writeHandler = new WriteHandle(socket.getOutputStream());
        this.removable = removable;
        this.uid = uid;
    }

    void read() {
        readHandler.start();
    }

    void write(String msg) {
        System.out.println("Server -->> " + uid + " : " + msg);
        writeHandler.write(msg);
    }

    /**
     * 把輸入輸出流和套接字都關閉
     */
    void socketClose(){
        try {
            readHandler.exit();
            writeHandler.exit();
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            System.out.println("客戶端:"+uid+" 套接字鏈接已關閉");
        }
    }
    /**
     * 把自身從對象列表中清除掉,具體方法是使用lambda表達式來注入的
     */
    void removeClientHandler() {
        removable.removeClientHandle(this);
    }

    /**
     * 定義一個接口,接收lambda表達式
     */
    interface Removable {
        void removeClientHandle(ClientHandler clientHandler);
    }

    /**
     * 輸入流操做線程
     */
    class ReadHandler extends Thread {
        private final InputStream inputStream;
        private Boolean flag = true;

        ReadHandler(InputStream inputStream) {
            this.inputStream = inputStream;
        }

        @Override
        public void run() {
            super.run();

            BufferedReader socketInput = null;
            try {
                socketInput = new BufferedReader(new InputStreamReader(inputStream));
                do {

                    String str = socketInput.readLine();
                    //不知道爲何,客戶端關閉時,這裏直接報異常,獲取不到null
                    if (str.equalsIgnoreCase("exit")) {
                        System.out.println("已沒法讀取客戶端數據!");
                        throw new Exception();
                    }
                    System.out.println(uid + " -->> server : " + str);
                } while (flag);
            } catch (Exception e) {
                if (flag) {
                    System.out.println("讀取客戶端過程當中異常退出");
                    ClientHandler.this.removeClientHandler();
                    ClientHandler.this.socketClose();
                }
            }
        }

        void exit() throws IOException {
            flag = false;
            inputStream.close();
        }
    }

    /**
     * 輸出流操做線程,使用單例線程池,能夠自動等待任務並處理,無需人工添加阻塞操做
     */
    class WriteHandle {
        private final OutputStream outputStream;
        private final ExecutorService executorService;

        WriteHandle(OutputStream outputStream) {
            this.outputStream = outputStream;
            this.executorService = Executors.newSingleThreadExecutor();
        }
        private void write(String msg){
            executorService.execute(new WriteRunnable(msg,outputStream));
        }
        void exit() throws IOException{
            outputStream.close();
            executorService.shutdown();
        }
        class WriteRunnable implements Runnable{
            private final String msg;
            private final PrintStream printStream;

            WriteRunnable(String msg, OutputStream outputStream) {
                this.msg = msg;
                this.printStream = new PrintStream(outputStream);
            }

            @Override
            public void run() {
                try {
                    printStream.println(msg);
                } catch (Exception e) {
                    System.out.println("打印輸出異常!");
                }

            }
        }
    }
}

客戶端

Client.java
package Client;

import java.io.*;
import java.util.UUID;

import Client.bean.ServerInfo;
public class Client {
    public static void main(String[] args)throws IOException {
        ServerInfo serverInfo = new ServerInfo(UUID.randomUUID().toString(),"127.0.2.16",3001);
        System.out.println("準備發起服務器鏈接...");
        System.out.println("服務器信息:Addr:"+serverInfo.getAddress()+" /Port:"+serverInfo.getPort());

        try {
            TCPClient.connect(serverInfo);
        }catch (Exception e){
            System.out.println("鏈接失敗,退出");
        }
    }
}
TCPClient.java
package Client;

import Client.bean.ServerInfo;

import java.io.*;
import java.net.*;

class TCPClient {
    static void connect(ServerInfo serverInfo) throws IOException {
        Socket clientSocket = createSocket();//創建套接字

        InitSocket(clientSocket);//初始化套接字
        //鏈接遠程服務器
        clientSocket.connect(new InetSocketAddress(serverInfo.getAddress(), serverInfo.getPort()), 3000);
        System.out.println("已鏈接server");
        try {
            //輸入流線程
            ReadHandle readHandle = new ReadHandle(clientSocket.getInputStream());
            readHandle.start();

            //輸出流
            write(clientSocket);
            //當輸出流結束時,關閉輸入流
            readHandle.exit();
        } catch (Exception e) {
            System.out.println("出現異常!");
        } finally {
            clientSocket.close();
            System.out.println("客戶端結束");
        }
    }

    private static Socket createSocket() throws IOException {
        Socket socket = new Socket();
        return socket;
    }

    private static void InitSocket(Socket socket) throws SocketException {
        // 設置讀取超時時間爲2秒,超過2秒未得到數據時readline報超時異常;不設置即進行永久等待
        //socket.setSoTimeout(2000);
        // 是否複用未徹底關閉的Socket地址,對於指定bind操做後的套接字有效
        socket.setReuseAddress(true);

        // 是否開啓Nagle算法
        socket.setTcpNoDelay(true);

        // 是否須要在長時無數據響應時發送確認數據(相似心跳包),時間大約爲2小時
        socket.setKeepAlive(true);

        // 對於close關閉操做行爲進行怎樣的處理;默認爲false,0
        // false、0:默認狀況,關閉時當即返回,底層系統接管輸出流,將緩衝區內的數據發送完成
        // true、0:關閉時當即返回,緩衝區數據拋棄,直接發送RST結束命令到對方,並沒有需通過2MSL等待
        // true、200:關閉時最長阻塞200毫秒,隨後按第二狀況處理
        socket.setSoLinger(true, 20);

        // 是否讓緊急數據內斂,默認false;緊急數據經過 socket.sendUrgentData(1);發送
        socket.setOOBInline(true);

        // 設置接收發送緩衝器大小
        socket.setReceiveBufferSize(64 * 1024 * 1024);
        socket.setSendBufferSize(64 * 1024 * 1024);

        // 設置性能參數:短連接,延遲,帶寬的相對重要性
        socket.setPerformancePreferences(1, 1, 1);
    }

    /**
     * 輸出流方法
     */
    private static void write(Socket socket) throws IOException {
        //構建鍵盤輸入流
        InputStream in = System.in;
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in));
        //獲得socket輸出流並轉化爲打印流
        OutputStream outputStream = socket.getOutputStream();
        PrintStream printStream = new PrintStream(outputStream);

        for(;;){
            String str = bufferedReader.readLine();//從鍵盤輸入獲取內容
            printStream.println(str);//經過打印流輸出
            if(str.equalsIgnoreCase("exit")){
                break;
            }
        }

        printStream.close();
        System.out.println("輸出流關閉");
    }


    /**
     * 輸入流線程
     */
    static class ReadHandle extends Thread {
        private final InputStream inputStream;
        private Boolean done = false;

        ReadHandle(InputStream inputStream) {
            this.inputStream = inputStream;
        }

        @Override
        public void run() {
            super.run();
            try {
                //獲取輸入流
                BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));
                do {
                    String str;
                    str = socketInput.readLine();
                    if (str==null) {
                        break;
                    }
                    System.out.println("From server: "+ str);
                } while (!done);
            } catch (Exception e) {
                if (!done) {
                    System.out.println("異常斷開,或者輸入異常");
                }
            }
        }

        void exit() {
            done = true;
            try {
                inputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }finally {
                System.out.println("輸入流關閉");
            }
        }
    }
}

關於代碼的具體分析,因爲代碼已有不少註釋,博文中便再也不贅述。dom

運行結果

運行結果以下所示異步

  • 服務端
    image
    鏈接成功後,服務端每隔5秒向各個客戶端發送時間戳信息,同時接收兩個客戶端發來的信息socket

  • 客戶端1
    image
    輸入「I am client1」並向服務端發送,同時接收服務端發來的時間戳信息tcp

  • 客戶端2
    image
    輸入「I am client2」並向服務端發送,同時接收服務端發來的時間戳信息ide

本篇博客記錄一次實踐學習,使用多線程+socket編程,實現了單服務器與多客戶端之間的數據收發並行,除此以外,經過思惟流程圖,整理了代碼的設計思路並展現出來。

相關文章
相關標籤/搜索