目錄html
最近公司在預研設備app端與服務端的交互方案,主要方案有java
雖然上面的一些成熟方案確定更利於上生產環境,但它們通信基礎也都是socket長鏈接,因此本人主要是預研了一下socket長鏈接的交互,寫了個簡單demo,採用了BIO的多線程方案,實現了自定義簡單協議,心跳機制,socket客戶端身份強制驗證,socket客戶端斷線獲知等功能,並暴露了一些接口,可經過接口簡單實現客戶端與服務端的socket交互。git
Github 地址點此github
IO通信模型主要包括阻塞式同步IO(BIO),非阻塞式同步IO,多路複用IO以及異步IO。大神博客請點此web
BIO就是:blocking IO。最容易理解、最容易實現的IO工做方式,應用程序向操做系統請求網絡IO操做,這時應用程序會一直等待;另外一方面,操做系統收到請求後,也會等待,直到網絡上有數據傳到監聽端口;操做系統在收集數據後,會把數據發送給應用程序;最後應用程序受到數據,並解除等待狀態。spring
這種模式下,應用程序的線程再也不一直等待操做系統的IO狀態,而是在等待一段時間後,就解除阻塞。若是沒有獲得想要的結果,則再次進行相同的操做。這樣的工做方式,暴增了應用程序的線程能夠不會一直阻塞,而是能夠進行一些其餘工做。編程
目前流程的多路複用IO實現主要包括四種:select、poll、epoll、kqueue。下表是他們的一些重要特性的比較:json
異步IO則是採用「訂閱-通知」模式:即應用程序向操做系統註冊IO監聽,而後繼續作本身的事情。當操做系統發生IO事件,而且準備好數據後,在主動通知應用程序,觸發相應的函數。windows
Java
對阻塞式同步IO的支持主要是java.net
包中的Socket
套接字實現;Java
中非阻塞同步IO模式經過設置serverSocket.setSoTimeout(100);
便可實現;Java 1.4
中引入了NIO
框架(java.nio
包)能夠構建多路複用、同步非阻塞IO
程序;Java 7
中對NIO
進行了進一步改進,即NIO2
,引入了異步非阻塞IO方式。因爲是要實現socket長鏈接的demo,主要關注其一些實現注意點及方案,因此本demo採用了BIO
的多線程方案,該方案代碼比較簡單、直觀,引入了多線程技術後,IO的處理吞吐量也大大提升了。下面是BIO
多線程方案server
端的簡單實現:springboot
public static void main(String[] args) throws Exception{ ServerSocket serverSocket = new ServerSocket(83); try { while(true) { Socket socket = null; socket = serverSocket.accept(); //這邊得到socket鏈接後開啓一個線程監聽處理數據 SocketServerThread socketServerThread = new SocketServerThread(socket); new Thread(socketServerThread).start(); } } catch(Exception e) { log.error("Socket accept failed. Exception:{}", e.getMessage()); } finally { if(serverSocket != null) { serverSocket.close(); } } } }
@slf4j class SocketServerThread implements Runnable { private Socket socket; public SocketServerThread (Socket socket) { this.socket = socket; } @Override public void run() { InputStream in = null; OutputStream out = null; try { in = socket.getInputStream(); out = socket.getOutputStream(); Integer sourcePort = socket.getPort(); int maxLen = 2048; byte[] contextBytes = new byte[maxLen]; int realLen; StringBuffer message = new StringBuffer(); BIORead:while(true) { try { while((realLen = in.read(contextBytes, 0, maxLen)) != -1) { message.append(new String(contextBytes , 0 , realLen)); /* * 咱們假設讀取到「over」關鍵字, * 表示客戶端的全部信息在通過若干次傳送後,完成 * */ if(message.indexOf("over") != -1) { break BIORead; } } } //下面打印信息 log.info("服務器(收到來自於端口:" + sourcePort + "的信息:" + message); //下面開始發送信息 out.write("回發響應信息!".getBytes()); //關閉 out.close(); in.close(); this.socket.close(); } catch(Exception e) { log.error("Socket read failed. Exception:{}", e.getMessage()); } } }
假設客戶端分別發送了兩個數據包D1和D2給服務端,因爲服務端一次讀取到的字節數是不肯定的,故可能存在如下4種狀況。
因爲底層的TCP沒法理解上層的業務數據,因此在底層是沒法保證數據包不被拆分和重組的,這個問題只能經過上層的應用協議棧設計來解決,根據業界的主流協議的解決方案,能夠概括以下:
做爲socket長鏈接的demo,使用了上述的解決思路2,即在包尾增長回車換行符進行數據的分割,同時總體數據使用約定的Json
體進行做爲消息的傳輸格式。
使用換行符進行數據分割,可以下進行數據的單行讀取:
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); String message; while ((message = reader.readLine()) != null) { //.... }
可以下進行數據的單行寫入:
PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()), true); writer.println(message);
Json
消息格式以下:
@Data public class ServerReceiveDto implements Serializable { private static final long serialVersionUID = 6600253865619639317L; /** * 功能碼 0 心跳 1 登錄 2 登出 3 發送消息 */ private Integer functionCode; /** * 用戶id */ private String userId; /** * 這邊假設是string的消息體 */ private String message; }
@Data public class ServerSendDto implements Serializable { private static final long serialVersionUID = -7453297551797390215L; /** * 狀態碼 20000 成功,不然有errorMessage */ private Integer statusCode; private String message; /** * 功能碼 */ private Integer functionCode; /** * 錯誤消息 */ private String errorMessage; }
@Data public class ClientSendDto implements Serializable { private static final long serialVersionUID = 97085384412852967L; /** * 功能碼 0 心跳 1 登錄 2 登出 3 發送消息 */ private Integer functionCode; /** * 用戶id */ private String userId; /** * 這邊假設是string的消息體 */ private String message; }
經過自定義心跳包來實現掉線檢測功能,具體思路以下:
客戶端鏈接上服務端後,在服務端會維護一個在線客戶端列表。客戶端每隔一段時間,向服務端發送一個心跳包,服務端受收到包之後,會更新客戶端最近一次在線時間。一旦服務端超過規定時間沒有接收到客戶端發來的包,則視爲掉線。
維護一個客戶端map,其中key表明用戶的惟一id(用戶惟一id的身份驗證下面會說明),value表明用戶對應的一個實體
/** * 存儲當前由用戶信息活躍的的socket線程 */ private ConcurrentMap<String, Connection> existSocketMap = new ConcurrentHashMap<>();
其中Connection
對象包含的信息以下:
@Slf4j @Data public class Connection { /** * 當前的socket鏈接實例 */ private Socket socket; /** * 當前鏈接線程 */ private ConnectionThread connectionThread; /** * 當前鏈接是否登錄 */ private boolean isLogin; /** * 存儲當前的user信息 */ private String userId; /** * 建立時間 */ private Date createTime; /** * 最後一次更新時間,用於判斷心跳 */ private Date lastOnTime; }
主要關注其中的lastOnTime
字段,每次服務端接收到標識是心跳數據,會更新當前的lastOnTime
字段,代碼以下:
if (functionCode.equals(FunctionCodeEnum.HEART.getValue())) { //心跳類型 connection.setLastOnTime(new Date()); //發送一樣的心跳數據給客戶端 ServerSendDto dto = new ServerSendDto(); dto.setFunctionCode(FunctionCodeEnum.HEART.getValue()); connection.println(JSONObject.toJSONString(dto)); }
額外會有一個監測進程,以必定頻率來監測上述維護的map中的每個Connection對象,若是當前時間與lastOnTime
的時間間隔超過自定義的長度,則自動將其對應的socket鏈接關閉,代碼以下:
Date now = new Date(); Date lastOnTime = connectionThread.getConnection().getLastOnTime(); long heartDuration = now.getTime() - lastOnTime.getTime(); if (heartDuration > SocketConstant.HEART_RATE) { //心跳超時,關閉當前線程 log.error("心跳超時"); connectionThread.stopRunning(); }
在上面代碼中,服務端收到標識是心跳數據的時候,除了更新該socket
對應的lastOnTime
,還會一樣一樣心跳類型的數據給客戶端,客戶端收到標識是心跳數據的時候也會更新本身的lastOnTime
字段,同時也有一個心跳監測線程在監測當前的socket鏈接心跳是否超時
經過代碼socket = serverSocket.accept()
得到的一個socket
鏈接咱們僅僅只能知道其客戶端的ip
以及端口號,並不能獲知這個socket
鏈接對應的究竟是哪個客戶端,所以必須得先得到客戶端的身份而且驗證經過其身份才能讓其正常鏈接。
具體的實現思路是:
自定義一個登錄處理接口,當server
端受到標識是用戶登錄的時候(此時會攜帶用戶信息或者token,此處簡化爲用戶id),調用用戶的登錄驗證,驗證經過的話則將該socket
鏈接與用戶信息綁定,設置其爲已登陸,而且封裝對應的對象放入前面提的客戶端map中,由此可得到具體用戶對應的哪個socket
鏈接。
爲了實現socket
鏈接的強制驗證,在監測線程中,也會判斷當前用戶多長時間內沒有實現登陸態,若超時則認爲該socket
鏈接爲非法鏈接,主動關閉該socket
鏈接。
自定義登錄處理接口,這邊簡單以userId來判斷是否容許登錄:
public interface LoginHandler { /** * client登錄的處理函數 * * @param userId 用戶id * * @return 是否驗證經過 */ boolean canLogin(String userId); }
收到客戶端發來的數據時候的處理:
if (functionCode.equals(FunctionCodeEnum.LOGIN.getValue())) { //登錄,身份驗證 String userId = receiveDto.getUserId(); if (socketServer.getLoginHandler().canLogin(userId)) { //設置用戶對象已登陸狀態 connection.setLogin(true); connection.setUserId(userId); if (socketServer.getExistSocketMap().containsKey(userId)) { //存在已登陸的用戶,發送登出指令並主動關閉該socket Connection existConnection = socketServer.getExistSocketMap().get(userId); ServerSendDto dto = new ServerSendDto(); dto.setStatusCode(999); dto.setFunctionCode(FunctionCodeEnum.MESSAGE.getValue()); dto.setErrorMessage("force logout"); existConnection.println(JSONObject.toJSONString(dto)); existConnection.getConnectionThread().stopRunning(); log.error("用戶被客戶端重入踢出,userId:{}", userId); } //添加到已登陸map中 socketServer.getExistSocketMap().put(userId, connection); }
監測線程判斷用戶是否完成身份驗證:
if (!connectionThread.getConnection().isLogin()) { //尚未用戶登錄成功 Date createTime = connectionThread.getConnection().getCreateTime(); long loginDuration = now.getTime() - createTime.getTime(); if (loginDuration > SocketConstant.LOGIN_DELAY) { //身份驗證超時 log.error("身份驗證超時"); connectionThread.stopRunning(); } }
socket
在讀取數據或者發送數據的時候會出現各類異常,好比客戶端的socket
已斷開鏈接(正常斷開或物理鏈接斷開等),可是服務端還在發送數據或者還在接受數據的過程當中,此時socket
會拋出相關異常,對於該異常的處理須要將自身的socket
鏈接關閉,避免資源的浪費,同時因爲是多線程方案,還需將該socket
對應的線程正常清理。
下面以server端發送數據爲例,改代碼中加入了重試機制:
public void println(String message) { int count = 0; PrintWriter writer; do { try { writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()), true); writer.println(message); break; } catch (IOException e) { count++; if (count >= RETRY_COUNT) { //重試屢次失敗,說明client端socket異常 this.connectionThread.stopRunning(); } } try { Thread.sleep(2 * 1000); } catch (InterruptedException e1) { log.error("Connection.println.IOException interrupt,userId:{}", userId); } } while (count < 3); }
上述調用的this.connectionThread.stopRunning();
代碼以下:
public void stopRunning() { //設置線程對象狀態,便於線程清理 isRunning = false; try { //異常狀況須要將該socket資源釋放 socket.close(); } catch (IOException e) { log.error("ConnectionThread.stopRunning failed.exception:{}", e); } }
上述代碼中設置了線程對象的狀態,下述代碼在監測線程中執行,將沒有運行的線程給清理掉
/** * 存儲只要有socket處理的線程 */ private List<ConnectionThread> existConnectionThreadList = Collections.synchronizedList(new ArrayList<>()); /** * 中間list,用於遍歷的時候刪除 */ private List<ConnectionThread> noConnectionThreadList = Collections.synchronizedList(new ArrayList<>()); //... //刪除list中沒有用的thread引用 existConnectionThreadList.forEach(connectionThread -> { if (!connectionThread.isRunning()) { noConnectionThreadList.add(connectionThread); } }); noConnectionThreadList.forEach(connectionThread -> { existConnectionThreadList.remove(connectionThread); if (connectionThread.getConnection().isLogin()) { //說明用戶已經身份驗證成功了,須要刪除map this.existSocketMap.remove(connectionThread.getConnection().getUserId()); } }); noConnectionThreadList.clear();
因爲使用了springboot
框架來實現該demo,因此項目結構以下:
socket
工具包目錄以下:
pom
文件主要添加了springboot
的相關依賴,以及json
工具和lombok
工具等,依賴以下:
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.3.RELEASE</version> <relativePath/> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.36</version> </dependency> </dependencies>
本身寫的socket
工具包的使用方式以下:
@Configuration @Slf4j public class SocketServerConfig { @Bean public SocketServer socketServer() { SocketServer socketServer = new SocketServer(60000); socketServer.setLoginHandler(userId -> { log.info("處理socket用戶身份驗證,userId:{}", userId); //用戶名中包含了dingxu則容許登錄 return userId.contains("dingxu"); }); socketServer.setMessageHandler((connection, receiveDto) -> log .info("處理socket消息,userId:{},receiveDto:{}", connection.getUserId(), JSONObject.toJSONString(receiveDto))); socketServer.start(); return socketServer; } }
該demo中主要提供瞭如下幾個接口進行測試:
具體的postman文件也放已在項目中,具體可點此連接得到
demo中還提供了一個簡單壓測函數,以下:
@Slf4j public class SocketClientTest { public static void main(String[] args) { ExecutorService clientService = Executors.newCachedThreadPool(); String userId = "dingxu"; for (int i = 0; i < 1000; i++) { int index = i; clientService.execute(() -> { try { SocketClient client; client = new SocketClient(InetAddress.getByName("127.0.0.1"), 60000); //登錄 ClientSendDto dto = new ClientSendDto(); dto.setFunctionCode(FunctionCodeEnum.LOGIN.getValue()); dto.setUserId(userId + index); client.println(JSONObject.toJSONString(dto)); ScheduledExecutorService clientHeartExecutor = Executors.newSingleThreadScheduledExecutor( r -> new Thread(r, "socket_client+heart_" + r.hashCode())); clientHeartExecutor.scheduleWithFixedDelay(() -> { try { ClientSendDto heartDto = new ClientSendDto(); heartDto.setFunctionCode(FunctionCodeEnum.HEART.getValue()); client.println(JSONObject.toJSONString(heartDto)); } catch (Exception e) { log.error("客戶端異常,userId:{},exception:{}", userId, e.getMessage()); client.close(); } }, 0, 5, TimeUnit.SECONDS); while (true){ } } catch (Exception e) { log.error(e.getMessage()); } }); } } }