Spring Boot系列22 Spring Websocket實現websocket集羣方案的Demo

概述

上一篇文章Spring Boot系列21 Spring Websocket實現websocket集羣方案討論裏詳細介紹了WebSocket集羣的三種方案,並得出結論第三個方案是最好的,本文咱們實現第三個方案。前端

第三個方案以下圖 java

這裏寫圖片描述

在方案一的基礎進行以下修改,新的架構圖流程以下:git

  1. 服務A增長WS模塊,當websocket鏈接過來時,將此用戶的鏈接信息(主要是websocket sesionId值)存儲redis中
  2. 消息生產者發送消息到的交換機,這些服務不直接推送服務A/B
  3. 增長新的模塊dispatch,此模塊接收推送過來的信息,並從redis中讀取消息接收用戶對應的websocket sesionId值,而後根據上面的規則計算出用戶對應的路由鍵,而後將消息發送到用戶訂閱的隊列上
  4. 前端接收消息

詳細實現的代碼

工程名稱:mvc 本文在Spring Boot系列20 Spring Websocket實現向指定的用戶發送消息的基礎進行修改。github

在pom.xml中引入redis,rabbitmq相關的jar

<!--  webscoekt 集羣 須要 引入支持RabbitMQ, redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
複製代碼

rabbitmq, redis的配置

application-wscluster.propertiesweb

# websocket集羣須要配置RabbitMQ
spring.rabbitmq.host:192.168.21.3
spring.rabbitmq.virtual-host: /icc-local
spring.rabbitmq.username: icc-dev
spring.rabbitmq.password: icc-dev

# 配置redis
spring.redis.database=0
spring.redis.host=192.168.21.4
# spring.redis.password=
spring.redis.port=7001
spring.redis.pool.max-idle=8
spring.redis.pool.min-idle=0  
spring.redis.pool.max-active=8  
spring.redis.pool.max-wait=-1
複製代碼

IRedisSessionService及實現

接口IRedisSessionService定義了對redis的操做 IRedisSessionService實現類將用戶名稱和websocket sessionId的關係存儲到redis,提供添加、刪除、查詢 IRedisSessionServiceredis

public interface IRedisSessionService {
    void add(String name, String wsSessionId);
    boolean del(String name);
    String get(String name);
}
複製代碼

SimulationRedisSessionServiceImpl 將用戶名稱和websocket sessionId的關係存儲到redis,提供添加、刪除、查詢spring

@Component
public class SimulationRedisSessionServiceImpl implements IRedisSessionService {

    @Autowired
    private RedisTemplate<String, String> template;

    // key = 登陸用戶名稱, value=websocket的sessionId
    private ConcurrentHashMap<String,String> redisHashMap = new ConcurrentHashMap<>(32);

    /**
     * 在緩存中保存用戶和websocket sessionid的信息
     * @param name
     * @param wsSessionId
     */
    public void add(String name, String wsSessionId){
        BoundValueOperations<String,String> boundValueOperations = template.boundValueOps(name);
        boundValueOperations.set(wsSessionId,24 * 3600, TimeUnit.SECONDS);
    }

    /**
     * 從緩存中刪除用戶的信息
     * @param name
     */
    public boolean del(String name){
        return template.execute(new RedisCallback<Boolean>() {

            @Override
            public Boolean doInRedis(RedisConnection connection)
                    throws DataAccessException {
                byte[] rawKey = template.getStringSerializer().serialize(name);
                return connection.del(rawKey) > 0;
            }
        }, true);
    }

    /**
     * 根據用戶id獲取用戶對應的sessionId值
     * @param name
     * @return
     */
    public String get(String name){
        BoundValueOperations<String,String> boundValueOperations = template.boundValueOps(name);
        return boundValueOperations.get();
    }
}
複製代碼

AuthWebSocketHandlerDecoratorFactory

裝飾WebSocketHandlerDecorator對象,在鏈接創建時,保存websocket的session id,其中key爲賬號名稱;在鏈接斷開時,從緩存中刪除用戶的sesionId值。此websocket sessionId值用於建立消息的路由鍵。瀏覽器

@Component
public class AuthWebSocketHandlerDecoratorFactory implements WebSocketHandlerDecoratorFactory {
    private static final Logger log = LoggerFactory.getLogger(AuthWebSocketHandlerDecoratorFactory.class);

    @Autowired
    private IRedisSessionService redisSessionService;

    @Override
    public WebSocketHandler decorate(WebSocketHandler handler) {
        return new WebSocketHandlerDecorator(handler) {
            @Override
            public void afterConnectionEstablished(final WebSocketSession session) throws Exception {
                // 客戶端與服務器端創建鏈接後,此處記錄誰上線了
                Principal principal = session.getPrincipal();
                if(principal != null){
                    String username = principal.getName();
                    log.info("websocket online: " + username + " session " + session.getId());
                    redisSessionService.add(username, session.getId());
                }
                super.afterConnectionEstablished(session);
            }

            @Override
            public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
                // 客戶端與服務器端斷開鏈接後,此處記錄誰下線了
                Principal principal = session.getPrincipal();
                if(principal != null){
                    String username = session.getPrincipal().getName();
                    log.info("websocket offline: " + username);
                    redisSessionService.del(username);
                }
                super.afterConnectionClosed(session, closeStatus);
            }
        };
    }
}
複製代碼

WebSocketRabbitMQMessageBrokerConfigurer

Spring Boot系列20 Spring Websocket實現向指定的用戶發送消息的基礎上增長以下功能,將myWebSocketHandlerDecoratorFactory配置到websocket緩存

@Configuration
// 此註解開使用STOMP協議來傳輸基於消息代理的消息,此時能夠在@Controller類中使用@MessageMapping
@EnableWebSocketMessageBroker
public class WebSocketRabbitMQMessageBrokerConfigurer extends AbstractWebSocketMessageBrokerConfigurer {

    @Autowired
    private MyPrincipalHandshakeHandler myDefaultHandshakeHandler;
    @Autowired
    private AuthHandshakeInterceptor sessionAuthHandshakeInterceptor;

    @Autowired
    private AuthWebSocketHandlerDecoratorFactory myWebSocketHandlerDecoratorFactory;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        ….
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
          …       
    }

    /**
     * 這時實際spring weboscket集羣的新增的配置,用於獲取創建websocket時獲取對應的sessionid值
     * @param registration
     */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.addDecoratorFactory(myWebSocketHandlerDecoratorFactory);
        super.configureWebSocketTransport(registration);
    }
}

複製代碼

TestMQCtl:

在上文Spring Boot系列20 Spring Websocket實現向指定的用戶發送消息的基礎上,對此類進行修改bash

  • sendMq2User()方法根據用戶的賬號和websocket sessionId根據["web訂閱隊列名稱+'-user'+websocket sessionId"]組合路由鍵。而後經過AmqpTemplate 實例向amq.topic交換機發送消息,路由鍵爲["web訂閱隊列名稱+'-user'+websocket sessionId"]。方法中websocket sessionId是從根據賬號名稱從redis中獲取 其它的方法,這裏不一一列出
@Controller
@RequestMapping(value = "/ws")
public class TestMQCtl {
    private  static final Logger log = LoggerFactory.getLogger(TestMQCtl.class);

    @Autowired
    private AmqpTemplate amqpTemplate;
    @Autowired
    private IRedisSessionService redisSessionService;

     /**
     * 向執行用戶發送請求
     * @param msg
     * @param name
     * @return
     */
    @RequestMapping(value = "send2user")
    @ResponseBody
    public int sendMq2User(String msg, String name){
        // 根據用戶名稱獲取用戶對應的session id值
        String wsSessionId = redisSessionService.get(name);
        RequestMessage demoMQ = new RequestMessage();
        demoMQ.setName(msg);

        // 生成路由鍵值,生成規則以下: websocket訂閱的目的地 + "-user" + websocket的sessionId值。生成值相似:
        String routingKey = getTopicRoutingKey("demo", wsSessionId);
        // 向amq.topi交換機發送消息,路由鍵爲routingKey
        log.info("向用戶[{}]sessionId=[{}],發送消息[{}],路由鍵[{}]", name, wsSessionId, wsSessionId, routingKey);
        amqpTemplate.convertAndSend("amq.topic", routingKey,  JSON.toJSONString(demoMQ));
        return 0;
    }

    /**
     * 獲取Topic的生成的路由鍵
     *
     * @param actualDestination
     * @param sessionId
     * @return
     */
    private String getTopicRoutingKey(String actualDestination, String sessionId){
        return actualDestination + "-user" + sessionId;
    }
   ….
}
複製代碼

測試

以不一樣端口啓動兩個服務 啓動服務類:WebSocketClusterApplication 以「--spring.profiles.active=wscluster --server.port=8081」參數啓動服務A 以「--spring.profiles.active=wscluster --server.port=8082」參數啓動服務B

登陸模擬賬號:xiaoming登陸服務A,xiaoming2登陸服務B 使用xiaoming登陸服務A,並登陸websocket http://127.0.0.1:8081/ws/login 使用xiaoming登陸,並提交

這裏寫圖片描述
點擊鏈接,若是鏈接變灰色,則登陸websocket成功
這裏寫圖片描述

打開另外一個瀏覽器,使用xiaoming2登陸服務B,並登陸websocket http://127.0.0.1:8082/ws/login 使用xiaoming2登陸並提交,最後登陸websocket

登陸服務A模擬發送頁面 登陸http://127.0.0.1:8081/ws/send,發送消息

  1. 向賬號xiaoming發送消息xiaoming-receive,只能被鏈接服務A的服務websocket收到 §
  2. 向賬號xiaoming2發送消息xiaoming2-receive,只能被鏈接服務B的服務websocket收到

此時兩個頁面收到信息:

這裏寫圖片描述

xiaoming賬號只收到xiaoming-receive xiaoming2賬號只收到xiaoming2-receive

登陸服務B模擬發送頁面 登陸http://127.0.0.1:8082/ws/send,發送消息,和http://127.0.0.1:8081/ws/send 同樣發送相同消息,結果是同樣

結論 不管用戶登陸的服務A,仍是服務B,咱們經過以上的代碼,咱們均可以發送消息到指定的用戶,因此咱們已經實現websocket集羣

代碼

全部的詳細代碼見github代碼,請儘可能使用tag v0.24,不要使用master,由於master一直在變,不能保證文章中代碼和github上的代碼一直相同

相關文章
相關標籤/搜索