java-socket-demo的實現

前言

最近公司在預研設備app端與服務端的交互方案,主要方案有java

  • 服務端和app端經過阿里iot套件實現消息的收發;
  • 服務端經過極光推送主動給app端推消息,app經過rest接口與服務端進行交互;
  • 服務端與app經過mqtt消息隊列來實現彼此的消息交互;
  • 服務端與app經過原生socket長鏈接交互。

雖然上面的一些成熟方案確定更利於上生產環境,但它們通信基礎也都是socket長鏈接,因此本人主要是預研了一下socket長鏈接的交互,寫了個簡單demo,採用了BIO的多線程方案,實現了自定義簡單協議,心跳機制,socket客戶端身份強制驗證,socket客戶端斷線獲知等功能,並暴露了一些接口,可經過接口簡單實現客戶端與服務端的socket交互。git

Github 地址點此github

IO通信模型

IO通信模型簡介

IO通信模型主要包括阻塞式同步IO(BIO),非阻塞式同步IO,多路複用IO以及異步IO。大神博客請點此web

1. 阻塞式同步IO

BIO就是:blocking IO。最容易理解、最容易實現的IO工做方式,應用程序向操做系統請求網絡IO操做,這時應用程序會一直等待;另外一方面,操做系統收到請求後,也會等待,直到網絡上有數據傳到監聽端口;操做系統在收集數據後,會把數據發送給應用程序;最後應用程序受到數據,並解除等待狀態。spring

BIO示意圖

2. 非阻塞式同步IO

這種模式下,應用程序的線程再也不一直等待操做系統的IO狀態,而是在等待一段時間後,就解除阻塞。若是沒有獲得想要的結果,則再次進行相同的操做。這樣的工做方式,暴增了應用程序的線程能夠不會一直阻塞,而是能夠進行一些其餘工做。編程

非阻塞式同步IO示意圖

3. 多路複用IO(阻塞+非阻塞)

多路複用io示意圖

目前流程的多路複用IO實現主要包括四種:select、poll、epoll、kqueue。下表是他們的一些重要特性的比較:json

多路複用io模式比較

4. 異步IO

異步IO則是採用「訂閱-通知」模式:即應用程序向操做系統註冊IO監聽,而後繼續作本身的事情。當操做系統發生IO事件,而且準備好數據後,在主動通知應用程序,觸發相應的函數。windows

異步IO示意圖

  • 和同步IO同樣,異步IO也是由操做系統進行支持的。微軟的windows系統提供了一種異步IO技術:IOCP(I/O Completion Port,I/O完成端口);
  • Linux下因爲沒有這種異步IO技術,因此使用的是epoll(上文介紹過的一種多路複用IO技術的實現)對異步IO進行模擬。

Java對IO模型的支持

  • Java對阻塞式同步IO的支持主要是java.net包中的Socket套接字實現;
  • Java中非阻塞同步IO模式經過設置serverSocket.setSoTimeout(100);便可實現;
  • Java 1.4中引入了NIO框架(java.nio包)能夠構建多路複用、同步非阻塞IO程序;
  • Java 7中對NIO進行了進一步改進,即NIO2,引入了異步非阻塞IO方式。

因爲是要實現socket長鏈接的demo,主要關注其一些實現注意點及方案,因此本demo採用了BIO的多線程方案,該方案代碼比較簡單、直觀,引入了多線程技術後,IO的處理吞吐量也大大提升了。下面是BIO多線程方案server端的簡單實現:springboot

public static void main(String[] args) throws Exception{
        ServerSocket serverSocket = new ServerSocket(83);
        try {
            while(true) {
                Socket socket = null;
                socket = serverSocket.accept();
                //這邊得到socket鏈接後開啓一個線程監聽處理數據
                SocketServerThread socketServerThread = new SocketServerThread(socket);
                new Thread(socketServerThread).start();
            }
        } catch(Exception e) {
            log.error("Socket accept failed. Exception:{}", e.getMessage());
        } finally {
            if(serverSocket != null) {
                serverSocket.close();
            }
        }
    }
}
@slf4j
class SocketServerThread implements Runnable {

    private Socket socket;

    public SocketServerThread (Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        InputStream in = null;
        OutputStream out = null;
        try {
            in = socket.getInputStream();
            out = socket.getOutputStream();
            Integer sourcePort = socket.getPort();
            int maxLen = 2048;
            byte[] contextBytes = new byte[maxLen];
            int realLen;
            StringBuffer message = new StringBuffer();
            BIORead:while(true) {
                try {
                    while((realLen = in.read(contextBytes, 0, maxLen)) != -1) {
                        message.append(new String(contextBytes , 0 , realLen));
                        /*
                         * 咱們假設讀取到「over」關鍵字,
                         * 表示客戶端的全部信息在通過若干次傳送後,完成
                         * */
                        if(message.indexOf("over") != -1) {
                            break BIORead;
                        }
                    }
            }
            //下面打印信息
           log.info("服務器(收到來自於端口:" + sourcePort + "的信息:" + message);
            //下面開始發送信息
            out.write("回發響應信息!".getBytes());
            //關閉
            out.close();
            in.close();
            this.socket.close();
        } catch(Exception e) {
           log.error("Socket read failed. Exception:{}", e.getMessage());
        }
    }
}

注意點及實現方案

TCP粘包/拆包

1. 問題說明

假設客戶端分別發送了兩個數據包D1和D2給服務端,因爲服務端一次讀取到的字節數是不肯定的,故可能存在如下4種狀況。

  1. 服務端分兩次讀取到了兩個獨立的數據包,分別是D1和D2,沒有粘包和拆包;
  2. 服務端一次接收到了兩個數據包,D1和D2粘合在一塊兒,被稱爲TCP粘包;
  3. 服務端分兩次讀取到了兩個數據包,第一次讀取到了完整的D1包和D2包的部份內容,第二次讀取到了D2包的剩餘內容,這被稱爲TCP拆包;
  4. 服務端分兩次讀取到了兩個數據包,第一次讀取到了D1包的部份內容D1_1,第二次讀取到了D1包的剩餘內容D1_2和D2包的整包。若是此時服務端TCP接收滑窗很是小,而數據包D1和D2比較大,頗有可能會發生第五種可能,即服務端分屢次才能將D1和D2包接收徹底,期間發生屢次拆包。

2. 解決思路

因爲底層的TCP沒法理解上層的業務數據,因此在底層是沒法保證數據包不被拆分和重組的,這個問題只能經過上層的應用協議棧設計來解決,根據業界的主流協議的解決方案,能夠概括以下:

  1. 消息定長,例如每一個報文的大小爲固定長度200字節,若是不夠,空位補空格;
  2. 在包尾增長回車換行符進行分割,例如FTP協議;
  3. 將消息分爲消息頭和消息體,消息頭中包含表示消息總長度(或者消息體長度)的字段,一般設計思路爲消息頭的第一個字段使用int32來表示消息的總長度;
  4. 更復雜的應用層協議。

3. demo方案

做爲socket長鏈接的demo,使用了上述的解決思路2,即在包尾增長回車換行符進行數據的分割,同時總體數據使用約定的Json體進行做爲消息的傳輸格式。

使用換行符進行數據分割,可以下進行數據的單行讀取:

BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String message;
while ((message = reader.readLine()) != null) {
//....
}

可以下進行數據的單行寫入:

PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()), true);
writer.println(message);

Json消息格式以下:

  1. 服務端接收消息實體類
@Data
public class ServerReceiveDto implements Serializable {

    private static final long serialVersionUID = 6600253865619639317L;

    /**
     * 功能碼 0 心跳 1 登錄 2 登出 3 發送消息
     */
    private Integer functionCode;

    /**
     * 用戶id
     */
    private String userId;

    /**
     * 這邊假設是string的消息體
     */
    private String message;

}
  1. 服務端發送消息實體類
@Data
public class ServerSendDto implements Serializable {

    private static final long serialVersionUID = -7453297551797390215L;

    /**
     * 狀態碼 20000 成功,不然有errorMessage
     */
    private Integer statusCode;

    private String message;

    /**
     * 功能碼
     */
    private Integer functionCode;

    /**
     * 錯誤消息
     */
    private String errorMessage;
}
  1. 客戶端發送消息實體類
@Data
public class ClientSendDto implements Serializable {

    private static final long serialVersionUID = 97085384412852967L;

    /**
     * 功能碼 0 心跳 1 登錄 2 登出 3 發送消息
     */
    private Integer functionCode;

    /**
     * 用戶id
     */
    private String userId;

    /**
     * 這邊假設是string的消息體
     */
    private String message;

}

客戶端或服務端掉線檢測功能

1. 實現思路

經過自定義心跳包來實現掉線檢測功能,具體思路以下:

客戶端鏈接上服務端後,在服務端會維護一個在線客戶端列表。客戶端每隔一段時間,向服務端發送一個心跳包,服務端受收到包之後,會更新客戶端最近一次在線時間。一旦服務端超過規定時間沒有接收到客戶端發來的包,則視爲掉線。

2. 代碼實現

維護一個客戶端map,其中key表明用戶的惟一id(用戶惟一id的身份驗證下面會說明),value表明用戶對應的一個實體

/**
 * 存儲當前由用戶信息活躍的的socket線程
 */
private ConcurrentMap<String, Connection> existSocketMap = new ConcurrentHashMap<>();

其中Connection對象包含的信息以下:

@Slf4j
@Data
public class Connection {

    /**
     * 當前的socket鏈接實例
     */
    private Socket socket;

    /**
     * 當前鏈接線程
     */
    private ConnectionThread connectionThread;

    /**
     * 當前鏈接是否登錄
     */
    private boolean isLogin;

    /**
     * 存儲當前的user信息
     */
    private String userId;

    /**
     * 建立時間
     */
    private Date createTime;

    /**
     * 最後一次更新時間,用於判斷心跳
     */
    private Date lastOnTime;
}

主要關注其中的lastOnTime字段,每次服務端接收到標識是心跳數據,會更新當前的lastOnTime字段,代碼以下:

if (functionCode.equals(FunctionCodeEnum.HEART.getValue())) {
    //心跳類型
    connection.setLastOnTime(new Date());
    //發送一樣的心跳數據給客戶端
    ServerSendDto dto = new ServerSendDto();
    dto.setFunctionCode(FunctionCodeEnum.HEART.getValue());
    connection.println(JSONObject.toJSONString(dto));
}

額外會有一個監測進程,以必定頻率來監測上述維護的map中的每個Connection對象,若是當前時間與lastOnTime的時間間隔超過自定義的長度,則自動將其對應的socket鏈接關閉,代碼以下:

Date now = new Date();
Date lastOnTime = connectionThread.getConnection().getLastOnTime();
long heartDuration = now.getTime() - lastOnTime.getTime();
if (heartDuration > SocketConstant.HEART_RATE) {
    //心跳超時,關閉當前線程
    log.error("心跳超時");
    connectionThread.stopRunning();
}

在上面代碼中,服務端收到標識是心跳數據的時候,除了更新該socket對應的lastOnTime,還會一樣一樣心跳類型的數據給客戶端,客戶端收到標識是心跳數據的時候也會更新本身的lastOnTime字段,同時也有一個心跳監測線程在監測當前的socket鏈接心跳是否超時

客戶端身份獲知、強制身份驗證

1. 實現思路

經過代碼socket = serverSocket.accept()得到的一個socket鏈接咱們僅僅只能知道其客戶端的ip以及端口號,並不能獲知這個socket鏈接對應的究竟是哪個客戶端,所以必須得先得到客戶端的身份而且驗證經過其身份才能讓其正常鏈接。

具體的實現思路是:

自定義一個登錄處理接口,當server端受到標識是用戶登錄的時候(此時會攜帶用戶信息或者token,此處簡化爲用戶id),調用用戶的登錄驗證,驗證經過的話則將該socket鏈接與用戶信息綁定,設置其爲已登陸,而且封裝對應的對象放入前面提的客戶端map中,由此可得到具體用戶對應的哪個socket鏈接。

爲了實現socket鏈接的強制驗證,在監測線程中,也會判斷當前用戶多長時間內沒有實現登陸態,若超時則認爲該socket鏈接爲非法鏈接,主動關閉該socket鏈接。

2. 代碼實現

自定義登錄處理接口,這邊簡單以userId來判斷是否容許登錄:

public interface LoginHandler {

    /**
     * client登錄的處理函數
     *
     * @param userId 用戶id
     *
     * @return 是否驗證經過
     */
    boolean canLogin(String userId);
}

收到客戶端發來的數據時候的處理:

if (functionCode.equals(FunctionCodeEnum.LOGIN.getValue())) {
    //登錄,身份驗證
    String userId = receiveDto.getUserId();
    if (socketServer.getLoginHandler().canLogin(userId)) {
        //設置用戶對象已登陸狀態
        connection.setLogin(true);
        connection.setUserId(userId);
        if (socketServer.getExistSocketMap().containsKey(userId)) {
            //存在已登陸的用戶,發送登出指令並主動關閉該socket
            Connection existConnection = socketServer.getExistSocketMap().get(userId);
            ServerSendDto dto = new ServerSendDto();
            dto.setStatusCode(999);
            dto.setFunctionCode(FunctionCodeEnum.MESSAGE.getValue());
            dto.setErrorMessage("force logout");
            existConnection.println(JSONObject.toJSONString(dto));
            existConnection.getConnectionThread().stopRunning();
            log.error("用戶被客戶端重入踢出,userId:{}", userId);
        }
        //添加到已登陸map中
        socketServer.getExistSocketMap().put(userId, connection);
}

監測線程判斷用戶是否完成身份驗證:

if (!connectionThread.getConnection().isLogin()) {
    //尚未用戶登錄成功
    Date createTime = connectionThread.getConnection().getCreateTime();
    long loginDuration = now.getTime() - createTime.getTime();
    if (loginDuration > SocketConstant.LOGIN_DELAY) {
        //身份驗證超時
        log.error("身份驗證超時");
        connectionThread.stopRunning();
    }
}

socket異常處理與垃圾線程回收

1. 實現思路

socket在讀取數據或者發送數據的時候會出現各類異常,好比客戶端的socket已斷開鏈接(正常斷開或物理鏈接斷開等),可是服務端還在發送數據或者還在接受數據的過程當中,此時socket會拋出相關異常,對於該異常的處理須要將自身的socket鏈接關閉,避免資源的浪費,同時因爲是多線程方案,還需將該socket對應的線程正常清理。

2. 代碼實現

下面以server端發送數據爲例,改代碼中加入了重試機制:

public void println(String message) {
    int count = 0;
    PrintWriter writer;
    do {
        try {
            writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()), true);
            writer.println(message);
            break;
        } catch (IOException e) {
            count++;
            if (count >= RETRY_COUNT) {
                //重試屢次失敗,說明client端socket異常
                this.connectionThread.stopRunning();
            }
        }
        try {
            Thread.sleep(2 * 1000);
        } catch (InterruptedException e1) {
            log.error("Connection.println.IOException interrupt,userId:{}", userId);
        }
    } while (count < 3);
}

上述調用的this.connectionThread.stopRunning();代碼以下:

public void stopRunning() {
    //設置線程對象狀態,便於線程清理
    isRunning = false;
    try {
        //異常狀況須要將該socket資源釋放
        socket.close();
    } catch (IOException e) {
        log.error("ConnectionThread.stopRunning failed.exception:{}", e);
    }
}

上述代碼中設置了線程對象的狀態,下述代碼在監測線程中執行,將沒有運行的線程給清理掉

/**
 * 存儲只要有socket處理的線程
 */
private List<ConnectionThread> existConnectionThreadList = Collections.synchronizedList(new ArrayList<>());

/**
 * 中間list,用於遍歷的時候刪除
 */
private List<ConnectionThread> noConnectionThreadList = Collections.synchronizedList(new ArrayList<>());

//...

//刪除list中沒有用的thread引用
existConnectionThreadList.forEach(connectionThread -> {
    if (!connectionThread.isRunning()) {
        noConnectionThreadList.add(connectionThread);
    }
});
noConnectionThreadList.forEach(connectionThread -> {
    existConnectionThreadList.remove(connectionThread);
    if (connectionThread.getConnection().isLogin()) {
        //說明用戶已經身份驗證成功了,須要刪除map
        this.existSocketMap.remove(connectionThread.getConnection().getUserId());
    }
});
noConnectionThreadList.clear();

項目結構

因爲使用了springboot框架來實現該demo,因此項目結構以下:

總體目錄

socket工具包目錄以下:

socket工具包目錄

pom文件主要添加了springboot的相關依賴,以及json工具和lombok工具等,依賴以下:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.3.RELEASE</version>
    <relativePath/>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.36</version>
    </dependency>
</dependencies>

本身寫的socket工具包的使用方式以下:

@Configuration
@Slf4j
public class SocketServerConfig {

@Bean
public SocketServer socketServer() {
    SocketServer socketServer = new SocketServer(60000);
    socketServer.setLoginHandler(userId -> {
        log.info("處理socket用戶身份驗證,userId:{}", userId);
        //用戶名中包含了dingxu則容許登錄
        return userId.contains("dingxu");

    });
    socketServer.setMessageHandler((connection, receiveDto) -> log
            .info("處理socket消息,userId:{},receiveDto:{}", connection.getUserId(),
                    JSONObject.toJSONString(receiveDto)));
    socketServer.start();
    return socketServer;
}
}

該demo中主要提供瞭如下幾個接口進行測試:

  • 服務端:得到當前用戶列表,發送一個消息
  • 客戶端:開始一個socket客戶端,發送一個消息,關閉一個socket客戶端,查看已開啓的客戶端

具體的postman文件也放已在項目中,具體可點此連接得到

demo中還提供了一個簡單壓測函數,以下:

@Slf4j
public class SocketClientTest {

    public static void main(String[] args) {
        ExecutorService clientService = Executors.newCachedThreadPool();
        String userId = "dingxu";
        for (int i = 0; i < 1000; i++) {
            int index = i;
            clientService.execute(() -> {
                try {
                    SocketClient client;
                    client = new SocketClient(InetAddress.getByName("127.0.0.1"), 60000);
                    //登錄
                    ClientSendDto dto = new ClientSendDto();
                    dto.setFunctionCode(FunctionCodeEnum.LOGIN.getValue());
                    dto.setUserId(userId + index);
                    client.println(JSONObject.toJSONString(dto));
                    ScheduledExecutorService clientHeartExecutor = Executors.newSingleThreadScheduledExecutor(
                            r -> new Thread(r, "socket_client+heart_" + r.hashCode()));
                    clientHeartExecutor.scheduleWithFixedDelay(() -> {
                        try {
                            ClientSendDto heartDto = new ClientSendDto();
                            heartDto.setFunctionCode(FunctionCodeEnum.HEART.getValue());
                            client.println(JSONObject.toJSONString(heartDto));
                        } catch (Exception e) {
                            log.error("客戶端異常,userId:{},exception:{}", userId, e.getMessage());
                            client.close();
                        }
                    }, 0, 5, TimeUnit.SECONDS);
                    while (true){

                    }
                } catch (Exception e) {
                    log.error(e.getMessage());
                }

            });
        }
    }

}

參考

相關文章
相關標籤/搜索