最近被分配了一個站內信模塊,由本身單獨負責這個模塊;這個模塊主要功能就是提供一個接口給調用方,而後將傳送的消息推送至登陸的相關的用戶的客戶端;而後就是用戶對這條消息的操做了,就是寫一些curd的接口供前端調用;html
因爲以前用netty作過一個項目,並且一位大佬也寫了不少關於netty的文章,第一時間就想到去看他寫的設計一個百萬級的消息推送系統; 而後仔細對比了一下,我負責這個模塊:前端
最後選用了netty-socketio這個框架;並且網上的文章也很多;java
首先導入包,我導入的版本是1.7.11;我最開始導入的是跟前邊的一個版本,可是出現了一個問題,就是OnEvent事件沒法監聽,因此我換了更高的版本,而後就能夠了;git
<dependency>
<groupId>com.corundumstudio.socketio</groupId>
<artifactId>netty-socketio</artifactId>
<version>1.7.11</version>
</dependency>
複製代碼
這個整合和以前的springboot整合netty同樣的;github
一句話總結起來就是netty的啓動是以注入的方式啓動,而不是以new的方式;固然也能夠以new的方式啓動,只是這樣的話就沒法直接以注入的方式調用其餘類了;web
該類主要是啓動服務; 我這裏使用了實現InitializingBean接口啓動服務,類上注意聲明@Component,固然也能夠用其餘的方式;只要springboot在啓動時能掃描到這個類就好了;spring
@Component
public class PushServer implements InitializingBean {
@Resource
private EventListenner eventListenner;
@Value("${push.server.port}")
private int serverPort;
@Override
public void afterPropertiesSet() throws Exception {
Configuration config = new Configuration();
config.setPort(serverPort);
SocketConfig socketConfig = new SocketConfig();
socketConfig.setReuseAddress(true);
socketConfig.setTcpNoDelay(true);
socketConfig.setSoLinger(0);
config.setSocketConfig(socketConfig);
config.setHostname("localhost");
SocketIOServer server = new SocketIOServer(config);
server.addListeners(eventListenner);
server.start();
System.out.println("啓動正常");
}
}
複製代碼
該類主要是監聽客戶端的鏈接及斷開,而後進行處理; 在這裏,我對請求地址附帶了用戶的ID;後端
@Component
public class EventListenner {
@Resource
private ClientCache clientCache;
/** * 客戶端鏈接 * @param client */
@OnConnect
public void onConnect(SocketIOClient client) {
String userId = client.getHandshakeData().getSingleUrlParam("userId");
UUID sessionId = client.getSessionId();
clientCache.saveClient(userId,sessionId,client);
System.out.println("創建鏈接");
}
/** * 客戶端斷開 * @param client */
@OnDisconnect
public void onDisconnect(SocketIOClient client) {
String userId = client.getHandshakeData().getSingleUrlParam("userId");
clientCache.deleteSessionClient(userId,client.getSessionId());
System.out.println("關閉鏈接");
}
//消息接收入口,當接收到消息後,查找發送目標客戶端,而且向該客戶端發送消息,且給本身發送消息
// 暫未使用
@OnEvent("messageevent")
public void onEvent(SocketIOClient client, AckRequest request) {
}
}
複製代碼
因爲須要向指定用戶推送消息,因此須要將鏈接信息與用戶綁定,因此前端在登錄的同時須要發送用戶ID;而後在鏈接事件的監聽中將鏈接信息與用戶綁定;鏈接信息與用戶實現一對一對應,可是這樣會出現一個問題,當用戶打開多個頁面時,新開的頁面通道鏈接會將舊頁面的通道鏈接信息覆蓋,形成沒法所有頁面推送,因此將用戶與通道信息改爲一對多的關係; 如何將用戶與通道信息保存,我這裏使用了兩個map集合:緩存
客戶端斷開時則須要用戶ID及sessid; 代碼以下:安全
@Component
public class ClientCache {
//本地緩存
private static Map<String, HashMap<UUID, SocketIOClient>> concurrentHashMap=new ConcurrentHashMap<>();
/** * 存入本地緩存 * @param userId 用戶ID * @param sessionId 頁面sessionID * @param socketIOClient 頁面對應的通道鏈接信息 */
public void saveClient(String userId, UUID sessionId,SocketIOClient socketIOClient){
HashMap<UUID, SocketIOClient> sessionIdClientCache=concurrentHashMap.get(userId);
if(sessionIdClientCache==null){
sessionIdClientCache = new HashMap<>();
}
sessionIdClientCache.put(sessionId,socketIOClient);
concurrentHashMap.put(userId,sessionIdClientCache);
}
/** * 根據用戶ID獲取全部通道信息 * @param userId * @return */
public HashMap<UUID, SocketIOClient> getUserClient(String userId){
return concurrentHashMap.get(userId);
}
/** * 根據用戶ID及頁面sessionID刪除頁面連接信息 * @param userId * @param sessionId */
public void deleteSessionClient(String userId,UUID sessionId){
concurrentHashMap.get(userId).remove(sessionId);
}
}
複製代碼
該類主要是提供給別人調用,向用戶推送消息,這裏就貼直接推送的代碼了,具體的推送業務就不貼出來了;這裏須要注意,推送事件的命名須要與web端監聽命名一致;
@RestController
@RequestMapping("/push")
public class PushController {
@Resource
private ClientCache clientCache;
@GetMapping("/user/{userId}")
public String pushTuUser(@PathVariable("userId") String userId){
HashMap<UUID, SocketIOClient> userClient = clientCache.getUserClient(userId);
userClient.forEach((uuid, socketIOClient) -> {
//向客戶端推送消息
socketIOClient.sendEvent("chatevent","服務端推送消息");
});
return "success";
}
}
複製代碼
因爲web端代碼及依賴較多;就不提供代碼;能夠去官網下載:github.com/mrniko/nett… 下載下來後,打開client文件下的index.html;我修改的地方:
注意事件的監聽中的名稱必定要與後端相對應,否則沒法監聽;調用推送接口,三個頁面同時收到推送消息