前一篇博客中,完成了客戶端與服務端的簡單TCP交互,但這種交互是觸發式的:客戶端發送一條消息,服務端收到後再回送一條。沒有作到收發並行。收發並行的字面意思很容易理解,即數據的發送與接收互相不干擾,相互獨立。固然,要保證服務端和客戶端都能作到收發並行。java
脫離業務邏輯的實踐是毫無心義的,先描述一下本實踐中的業務邏輯:一個服務端接受多個客戶端的鏈接,鏈接後,向各個客戶端定時發送時間戳數據,同時在並行條件下,接受各個客戶端發送來的數據並顯示;客戶端鍵盤輸入字符串,發送給服務端,同時在並行條件下,接收服務器發來的時間戳數據並顯示。算法
實現發送與接收並行,思路其實很是直觀,即創建兩個線程,分別用來實現輸入流和輸出流。個人代碼的設計方案以下圖所示:
編程
源代碼文件結構以下圖所示
服務器
服務器端分爲三個部分,分別是Server.java,TCPServer.java和ClientHandler.java多線程
package Server; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.text.SimpleDateFormat; import java.util.TimerTask; import java.util.Timer; import java.util.Date; public class Server { private static SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd_HH:mm:ss"); public static void main(String[] args){ try { TCPServer.accept(); new Timer("Timer").schedule(new TimerTask() { @Override public void run() { TCPServer.broadcast(df.format(new Date())); } }, 1000,5000); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in)); String str; //由於ClientListen是異步線程,使用鍵盤輸入流將主線程阻塞住,保證跟ClientListen線程同步,同時可控制ClientListen服務的退出 do{ str = bufferedReader.readLine(); }while (str.equalsIgnoreCase("serverExit")); }catch (Exception e){ System.out.println("監聽請求過程當中異常退出"); } try { TCPServer.stop(); } catch (IOException e) { System.out.println("關閉套接字過程當中出現異常"); } finally { System.out.println("服務器端套接字已關閉!"); } } }
package Server; import java.io.IOException; import java.net.*; import java.util.ArrayList; import java.util.UUID; class TCPServer { private static int LOCAL_PORT = 3001; private static ClientListenHandle clientListenHandle; private static ArrayList<ClientHandler> clientHandlerList = new ArrayList<ClientHandler>(); static void accept() throws IOException { //建立服務器端套接字 ServerSocket serverSocket = createSocket(); InitSocket(serverSocket); System.out.println("服務器準備就緒 addr: " + Inet4Address.getLocalHost() + " /port: " + LOCAL_PORT); System.out.println("開始監聽客戶端鏈接..."); //建立線程監聽客戶端請求 clientListenHandle = new ClientListenHandle(serverSocket); clientListenHandle.start(); } static void stop() throws IOException { for (ClientHandler clientHandler : clientHandlerList) { clientHandler.socketClose(); } clientHandlerList.clear(); clientListenHandle.exit(); } private static ServerSocket createSocket() throws IOException { ServerSocket socket = new ServerSocket(LOCAL_PORT, 50); return socket; } private static void InitSocket(ServerSocket socket) throws SocketException { // 是否複用未徹底關閉的地址端口 socket.setReuseAddress(true); // 等效Socket#setReceiveBufferSize socket.setReceiveBufferSize(64 * 1024 * 1024); // 設置serverSocket#accept超時時間,不設置即永久等待 // serverSocket.setSoTimeout(2000); // 設置性能參數:短連接,延遲,帶寬的相對重要性 socket.setPerformancePreferences(1, 1, 1); } static void broadcast(String msg) { for (ClientHandler clientHandler : clientHandlerList) { clientHandler.write(msg); } } /** * 監聽客戶端請求的線程 */ static class ClientListenHandle extends Thread { private final ServerSocket serverSocket; private Boolean done = false; ClientListenHandle(ServerSocket serverSocket) { this.serverSocket = serverSocket; } @Override public void run() { super.run(); try { do { Socket client; try { client = serverSocket.accept(); } catch (Exception e) { continue;//某一個客戶端鏈接失敗,要保證其它客戶端能正常鏈接 } String uuid = UUID.randomUUID().toString();//爲客戶端生成惟一標識 System.out.println("已接受鏈接client:"+uuid+" /Addr:"+client.getInetAddress()+" /Port:"+client.getPort()); //爲該客戶端實例化一個ClientHandler對象,注入對象刪除操做的lambda表達式 ClientHandler clientHandle = new ClientHandler(client, handler -> clientHandlerList.remove(handler), uuid); clientHandle.read(); clientHandlerList.add(clientHandle); } while (!done); } catch (Exception e) { if (!done) { System.out.println("異常退出!"); } } } void exit() throws IOException { done = true; serverSocket.close(); } } }
package Server; import java.io.*; import java.net.Socket; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ClientHandler { private final Socket client; private final ReadHandler readHandler; private final WriteHandle writeHandler; private final Removable removable; private final String uid; ClientHandler(Socket socket, Removable removable, String uid) throws IOException { this.client = socket; this.readHandler = new ReadHandler(socket.getInputStream()); this.writeHandler = new WriteHandle(socket.getOutputStream()); this.removable = removable; this.uid = uid; } void read() { readHandler.start(); } void write(String msg) { System.out.println("Server -->> " + uid + " : " + msg); writeHandler.write(msg); } /** * 把輸入輸出流和套接字都關閉 */ void socketClose(){ try { readHandler.exit(); writeHandler.exit(); client.close(); } catch (IOException e) { e.printStackTrace(); }finally { System.out.println("客戶端:"+uid+" 套接字鏈接已關閉"); } } /** * 把自身從對象列表中清除掉,具體方法是使用lambda表達式來注入的 */ void removeClientHandler() { removable.removeClientHandle(this); } /** * 定義一個接口,接收lambda表達式 */ interface Removable { void removeClientHandle(ClientHandler clientHandler); } /** * 輸入流操做線程 */ class ReadHandler extends Thread { private final InputStream inputStream; private Boolean flag = true; ReadHandler(InputStream inputStream) { this.inputStream = inputStream; } @Override public void run() { super.run(); BufferedReader socketInput = null; try { socketInput = new BufferedReader(new InputStreamReader(inputStream)); do { String str = socketInput.readLine(); //不知道爲何,客戶端關閉時,這裏直接報異常,獲取不到null if (str.equalsIgnoreCase("exit")) { System.out.println("已沒法讀取客戶端數據!"); throw new Exception(); } System.out.println(uid + " -->> server : " + str); } while (flag); } catch (Exception e) { if (flag) { System.out.println("讀取客戶端過程當中異常退出"); ClientHandler.this.removeClientHandler(); ClientHandler.this.socketClose(); } } } void exit() throws IOException { flag = false; inputStream.close(); } } /** * 輸出流操做線程,使用單例線程池,能夠自動等待任務並處理,無需人工添加阻塞操做 */ class WriteHandle { private final OutputStream outputStream; private final ExecutorService executorService; WriteHandle(OutputStream outputStream) { this.outputStream = outputStream; this.executorService = Executors.newSingleThreadExecutor(); } private void write(String msg){ executorService.execute(new WriteRunnable(msg,outputStream)); } void exit() throws IOException{ outputStream.close(); executorService.shutdown(); } class WriteRunnable implements Runnable{ private final String msg; private final PrintStream printStream; WriteRunnable(String msg, OutputStream outputStream) { this.msg = msg; this.printStream = new PrintStream(outputStream); } @Override public void run() { try { printStream.println(msg); } catch (Exception e) { System.out.println("打印輸出異常!"); } } } } }
package Client; import java.io.*; import java.util.UUID; import Client.bean.ServerInfo; public class Client { public static void main(String[] args)throws IOException { ServerInfo serverInfo = new ServerInfo(UUID.randomUUID().toString(),"127.0.2.16",3001); System.out.println("準備發起服務器鏈接..."); System.out.println("服務器信息:Addr:"+serverInfo.getAddress()+" /Port:"+serverInfo.getPort()); try { TCPClient.connect(serverInfo); }catch (Exception e){ System.out.println("鏈接失敗,退出"); } } }
package Client; import Client.bean.ServerInfo; import java.io.*; import java.net.*; class TCPClient { static void connect(ServerInfo serverInfo) throws IOException { Socket clientSocket = createSocket();//創建套接字 InitSocket(clientSocket);//初始化套接字 //鏈接遠程服務器 clientSocket.connect(new InetSocketAddress(serverInfo.getAddress(), serverInfo.getPort()), 3000); System.out.println("已鏈接server"); try { //輸入流線程 ReadHandle readHandle = new ReadHandle(clientSocket.getInputStream()); readHandle.start(); //輸出流 write(clientSocket); //當輸出流結束時,關閉輸入流 readHandle.exit(); } catch (Exception e) { System.out.println("出現異常!"); } finally { clientSocket.close(); System.out.println("客戶端結束"); } } private static Socket createSocket() throws IOException { Socket socket = new Socket(); return socket; } private static void InitSocket(Socket socket) throws SocketException { // 設置讀取超時時間爲2秒,超過2秒未得到數據時readline報超時異常;不設置即進行永久等待 //socket.setSoTimeout(2000); // 是否複用未徹底關閉的Socket地址,對於指定bind操做後的套接字有效 socket.setReuseAddress(true); // 是否開啓Nagle算法 socket.setTcpNoDelay(true); // 是否須要在長時無數據響應時發送確認數據(相似心跳包),時間大約爲2小時 socket.setKeepAlive(true); // 對於close關閉操做行爲進行怎樣的處理;默認爲false,0 // false、0:默認狀況,關閉時當即返回,底層系統接管輸出流,將緩衝區內的數據發送完成 // true、0:關閉時當即返回,緩衝區數據拋棄,直接發送RST結束命令到對方,並沒有需通過2MSL等待 // true、200:關閉時最長阻塞200毫秒,隨後按第二狀況處理 socket.setSoLinger(true, 20); // 是否讓緊急數據內斂,默認false;緊急數據經過 socket.sendUrgentData(1);發送 socket.setOOBInline(true); // 設置接收發送緩衝器大小 socket.setReceiveBufferSize(64 * 1024 * 1024); socket.setSendBufferSize(64 * 1024 * 1024); // 設置性能參數:短連接,延遲,帶寬的相對重要性 socket.setPerformancePreferences(1, 1, 1); } /** * 輸出流方法 */ private static void write(Socket socket) throws IOException { //構建鍵盤輸入流 InputStream in = System.in; BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in)); //獲得socket輸出流並轉化爲打印流 OutputStream outputStream = socket.getOutputStream(); PrintStream printStream = new PrintStream(outputStream); for(;;){ String str = bufferedReader.readLine();//從鍵盤輸入獲取內容 printStream.println(str);//經過打印流輸出 if(str.equalsIgnoreCase("exit")){ break; } } printStream.close(); System.out.println("輸出流關閉"); } /** * 輸入流線程 */ static class ReadHandle extends Thread { private final InputStream inputStream; private Boolean done = false; ReadHandle(InputStream inputStream) { this.inputStream = inputStream; } @Override public void run() { super.run(); try { //獲取輸入流 BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream)); do { String str; str = socketInput.readLine(); if (str==null) { break; } System.out.println("From server: "+ str); } while (!done); } catch (Exception e) { if (!done) { System.out.println("異常斷開,或者輸入異常"); } } } void exit() { done = true; try { inputStream.close(); } catch (IOException e) { e.printStackTrace(); }finally { System.out.println("輸入流關閉"); } } } }
關於代碼的具體分析,因爲代碼已有不少註釋,博文中便再也不贅述。dom
運行結果以下所示異步
服務端
鏈接成功後,服務端每隔5秒向各個客戶端發送時間戳信息,同時接收兩個客戶端發來的信息socket
客戶端1
輸入「I am client1」並向服務端發送,同時接收服務端發來的時間戳信息tcp
客戶端2
輸入「I am client2」並向服務端發送,同時接收服務端發來的時間戳信息ide
本篇博客記錄一次實踐學習,使用多線程+socket編程,實現了單服務器與多客戶端之間的數據收發並行,除此以外,經過思惟流程圖,整理了代碼的設計思路並展現出來。