springboot使用netty-socketio推送消息

前言

背景

最近被分配了一個站內信模塊,由本身單獨負責這個模塊;這個模塊主要功能就是提供一個接口給調用方,而後將傳送的消息推送至登陸的相關的用戶的客戶端;而後就是用戶對這條消息的操做了,就是寫一些curd的接口供前端調用;html

技術選用

因爲以前用netty作過一個項目,並且一位大佬也寫了不少關於netty的文章,第一時間就想到去看他寫的設計一個百萬級的消息推送系統; 而後仔細對比了一下,我負責這個模塊:前端

  • 用戶量不大,由於針對的是運維人員,並且不是全部運維人,是有針對性的;
  • 不用安全驗證,由於這個項目是在內網中運行;
  • 這個模塊不用分佈式,只是一個微服務中的一部分;

最後選用了netty-socketio這個框架;並且網上的文章也很多;java

正文

springboot整合netty-socketio

pom

首先導入包,我導入的版本是1.7.11;我最開始導入的是跟前邊的一個版本,可是出現了一個問題,就是OnEvent事件沒法監聽,因此我換了更高的版本,而後就能夠了;git

<dependency>
            <groupId>com.corundumstudio.socketio</groupId>
            <artifactId>netty-socketio</artifactId>
            <version>1.7.11</version>
        </dependency>

複製代碼

整合

這個整合和以前的springboot整合netty同樣的;github

  1. 實現CommandLineRunner 接口,在springboot啓動前啓動服務
  2. 使用@Configuration註解,將服務以bean的方式啓動
  3. 實現springboot中各類接口,如:實現InitializingBean接口,*Aware接口類等;

一句話總結起來就是netty的啓動是以注入的方式啓動,而不是以new的方式;固然也能夠以new的方式啓動,只是這樣的話就沒法直接以注入的方式調用其餘類了;web

新建ChatServer類

該類主要是啓動服務; 我這裏使用了實現InitializingBean接口啓動服務,類上注意聲明@Component,固然也能夠用其餘的方式;只要springboot在啓動時能掃描到這個類就好了;spring

@Component
public class PushServer implements InitializingBean {
    @Resource
    private EventListenner eventListenner;

    @Value("${push.server.port}")
    private int serverPort;
    @Override
    public void afterPropertiesSet() throws Exception {
        Configuration config = new Configuration();
        config.setPort(serverPort);

        SocketConfig socketConfig = new SocketConfig();
        socketConfig.setReuseAddress(true);
        socketConfig.setTcpNoDelay(true);
        socketConfig.setSoLinger(0);
        config.setSocketConfig(socketConfig);
        config.setHostname("localhost");

        SocketIOServer server = new SocketIOServer(config);
        server.addListeners(eventListenner);
        server.start();
        System.out.println("啓動正常");
    }
}


複製代碼

新建EventListennerlei

該類主要是監聽客戶端的鏈接及斷開,而後進行處理; 在這裏,我對請求地址附帶了用戶的ID;後端

@Component
public class EventListenner {
    @Resource
    private ClientCache clientCache;
    /** * 客戶端鏈接 * @param client */
    @OnConnect
    public void onConnect(SocketIOClient client) {
        String userId = client.getHandshakeData().getSingleUrlParam("userId");
        UUID sessionId = client.getSessionId();
        clientCache.saveClient(userId,sessionId,client);
        System.out.println("創建鏈接");
    }
    /** * 客戶端斷開 * @param client */
    @OnDisconnect
    public void onDisconnect(SocketIOClient client) {
        String userId = client.getHandshakeData().getSingleUrlParam("userId");
        clientCache.deleteSessionClient(userId,client.getSessionId());
        System.out.println("關閉鏈接");
    }
    //消息接收入口,當接收到消息後,查找發送目標客戶端,而且向該客戶端發送消息,且給本身發送消息
    // 暫未使用
    @OnEvent("messageevent")
    public void onEvent(SocketIOClient client, AckRequest request) {
    }
}

複製代碼

緩存

因爲須要向指定用戶推送消息,因此須要將鏈接信息與用戶綁定,因此前端在登錄的同時須要發送用戶ID;而後在鏈接事件的監聽中將鏈接信息與用戶綁定;鏈接信息與用戶實現一對一對應,可是這樣會出現一個問題,當用戶打開多個頁面時,新開的頁面通道鏈接會將舊頁面的通道鏈接信息覆蓋,形成沒法所有頁面推送,因此將用戶與通道信息改爲一對多的關係; 如何將用戶與通道信息保存,我這裏使用了兩個map集合:緩存

  • 裏層map存儲頁面sessionID對應的通道信息,以sessionID爲key,通道鏈接信息爲value
  • 外層map存儲用戶ID對應的裏層數據,以用戶ID爲key,裏層數據爲value;

客戶端斷開時則須要用戶ID及sessid; 代碼以下:安全

@Component
public class ClientCache {

    //本地緩存
    private static Map<String, HashMap<UUID, SocketIOClient>> concurrentHashMap=new ConcurrentHashMap<>();
    /** * 存入本地緩存 * @param userId 用戶ID * @param sessionId 頁面sessionID * @param socketIOClient 頁面對應的通道鏈接信息 */
    public void saveClient(String userId, UUID sessionId,SocketIOClient socketIOClient){
        HashMap<UUID, SocketIOClient> sessionIdClientCache=concurrentHashMap.get(userId);
        if(sessionIdClientCache==null){
            sessionIdClientCache = new HashMap<>();
        }
        sessionIdClientCache.put(sessionId,socketIOClient);
        concurrentHashMap.put(userId,sessionIdClientCache);
    }
    /** * 根據用戶ID獲取全部通道信息 * @param userId * @return */
    public HashMap<UUID, SocketIOClient> getUserClient(String userId){
        return concurrentHashMap.get(userId);
    }
    /** * 根據用戶ID及頁面sessionID刪除頁面連接信息 * @param userId * @param sessionId */
    public void deleteSessionClient(String userId,UUID sessionId){
        concurrentHashMap.get(userId).remove(sessionId);
    }
}

複製代碼

推送接口PushController類

該類主要是提供給別人調用,向用戶推送消息,這裏就貼直接推送的代碼了,具體的推送業務就不貼出來了;這裏須要注意,推送事件的命名須要與web端監聽命名一致;

@RestController
@RequestMapping("/push")
public class PushController {
    @Resource
    private ClientCache clientCache;

    @GetMapping("/user/{userId}")
    public String pushTuUser(@PathVariable("userId") String userId){
        HashMap<UUID, SocketIOClient> userClient = clientCache.getUserClient(userId);
        userClient.forEach((uuid, socketIOClient) -> {
            //向客戶端推送消息
            socketIOClient.sendEvent("chatevent","服務端推送消息");
        });
        return "success";
    }
}


複製代碼

客戶端(web端)

因爲web端代碼及依賴較多;就不提供代碼;能夠去官網下載:github.com/mrniko/nett… 下載下來後,打開client文件下的index.html;我修改的地方:

注意事件的監聽中的名稱必定要與後端相對應,否則沒法監聽;

運行結果

  1. 客戶端 這裏同時打開三個頁面

  1. 服務端

  1. 推送消息

調用推送接口,三個頁面同時收到推送消息

GitHub地址

總結

  1. 在剛開始設計時,我設計用戶與通道的關係就是一對一的關係,最後在提交流程圖的時候,項目組的各位大佬都給我提了許多建議,果真姜仍是老的辣啊,在這裏十分感謝項目組的各位大佬提出的寶貴的建議;
  2. 這是新年的第一篇博客;也是新的一個起點,但願本身能堅持把博客寫下去;
相關文章
相關標籤/搜索