SpringAOP+RabbitMQ+WebSocket實戰

背景

最近公司的客戶要求,分配給員工的任務除了有微信通知外,還但願PC端的網頁也能實時收到通知。管理員分配任務是在咱們的系統A,而員工接受任務是在系統B。兩個系統都是如今已投入使用的系統。html

技術選型

       根據需求咱們最終選用SpringAOP+RabbitMQ+WebSocket。前端

       SpringAOP可讓咱們不修改原有代碼,直接將原有service做爲切點,加入切面。RabbitMQ可讓A系統和B系統解耦。WebSocket則能夠達到實時通知的要求。java

SpringAOP

AOP稱爲面向切面編程,在程序開發中主要用來解決一些系統層面上的問題,好比日誌,事務,權限等待。是Spring的核心模塊,底層是經過動態代理來實現(動態代理將在以後的文章重點介紹)。web

基本概念spring

Aspect(切面):一般是一個類,裏面能夠定義切入點和通知。編程

JointPoint(鏈接點):程序執行過程當中明確的點,通常是方法的調用。瀏覽器

Advice(通知):AOP在特定的切入點上執行的加強處理,有before,after,afterReturning,afterThrowing,around。服務器

Pointcut(切入點):就是帶有通知的鏈接點,在程序中主要體現爲書寫切入點表達式。微信

通知類型websocket

Before:在目標方法被調用以前作加強處理。

@Before只須要指定切入點表達式便可

AfterReturning:在目標方法正常完成後作加強。

@AfterReturning除了指定切入點表達式後,還能夠指定一個返回值形參名returning,表明目標方法的返回值

AfterThrowing:主要用來處理程序中未處理的異常。

@AfterThrowing除了指定切入點表達式後,還能夠指定一個throwing的返回值形參名,能夠經過該形參名

來訪問目標方法中所拋出的異常對象

After:在目標方法完成以後作加強,不管目標方法時候成功完成。

@After能夠指定一個切入點表達式

Around:環繞通知,在目標方法完成先後作加強處理,環繞通知是最重要的通知類型,像事務,日誌等都是環繞通知,注意編程中核心是一個ProceedingJoinPoint。

 

RabbitMQ

(圖摘自:https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html

從圖中咱們能夠看到RabbitMQ主要的結構有:Routing、Binding、Exchange、Queue。

Queue

Queue(隊列)RabbitMQ的做用是存儲消息,隊列的特性是先進先出。

Exchange

生產者產生的消息並非直接發送給消息隊列Queue的,而是要通過Exchange(交換器),由Exchange再將消息路由到一個或多個Queue,還會將不符合路由規則的消息丟棄。

Routing

       用於標記或生產者尋找Exchange。

Binding

       用於Exchange和Queue作關聯。

Exchange Type

fanout

fanout類型的Exchange路由規則很是簡單,它會把全部發送到該Exchange的消息路由到全部與它綁定的Queue中。

direct

direct會把消息路由到那些binding key與routing key徹底匹配的Queue中。

topic

direct規則是嚴格意義上的匹配,換言之Routing Key必須與Binding Key相匹配的時候纔將消息傳送給Queue,那麼topic這個規則就是模糊匹配,能夠經過通配符知足一部分規則就能夠傳送。

headers

headers類型的Exchange不依賴於routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。

 

WebSocket

瞭解websocket必須先知道幾個經常使用的web通訊技術及其區別。

短輪詢

  短輪詢的基本思路就是瀏覽器每隔一段時間向瀏覽器發送http請求,服務器端在收到請求後,不管是否有數據更新,都直接進行響應。這種方式實現的即時通訊,本質上仍是瀏覽器發送請求,服務器接受請求的一個過程,經過讓客戶端不斷的進行請求,使得客戶端可以模擬實時地收到服務器端的數據的變化。

  這種方式的優勢是比較簡單,易於理解,實現起來也沒有什麼技術難點。缺點是顯而易見的,這種方式因爲須要不斷的創建http鏈接,嚴重浪費了服務器端和客戶端的資源。尤爲是在客戶端,距離來講,若是有數量級想對比較大的人同時位於基於短輪詢的應用中,那麼每個用戶的客戶端都會瘋狂的向服務器端發送http請求,並且不會間斷。人數越多,服務器端壓力越大,這是很不合理的。

  所以短輪詢不適用於那些同時在線用戶數量比較大,而且很注重性能的Web應用。

長輪詢/comet

  comet指的是,當服務器收到客戶端發來的請求後,不會直接進行響應,而是先將這個請求掛起,而後判斷服務器端數據是否有更新。若是有更新,則進行響應,若是一直沒有數據,則到達必定的時間限制(服務器端設置)後關閉鏈接。

  長輪詢和短輪詢比起來,明顯減小了不少沒必要要的http請求次數,相比之下節約了資源。長輪詢的缺點在於,鏈接掛起也會致使資源的浪費。

SSE

  SSE是HTML5新增的功能,全稱爲Server-Sent Events。它能夠容許服務推送數據到客戶端。SSE在本質上就與以前的長輪詢、短輪詢不一樣,雖然都是基於http協議的,可是輪詢須要客戶端先發送請求。而SSE最大的特色就是不須要客戶端發送請求,能夠實現只要服務器端數據有更新,就能夠立刻發送到客戶端。

  SSE的優點很明顯,它不須要創建或保持大量的客戶端發往服務器端的請求,節約了不少資源,提高應用性能。而且SSE的實現很是簡單,不須要依賴其餘插件。

WebSocket

  WebSocket是Html5定義的一個新協議,與傳統的http協議不一樣,該協議能夠實現服務器與客戶端之間全雙工通訊。簡單來講,首先須要在客戶端和服務器端創建起一個鏈接,這部分須要http。鏈接一旦創建,客戶端和服務器端就處於平等的地位,能夠相互發送數據,不存在請求和響應的區別。

  WebSocket的優勢是實現了雙向通訊,缺點是服務器端的邏輯很是複雜。如今針對不一樣的後臺語言有不一樣的插件可使用。

四種Web即時通訊技術比較

  從兼容性角度考慮,短輪詢>長輪詢>長鏈接SSE>WebSocket;

  從性能方面考慮,WebSocket>長鏈接SSE>長輪詢>短輪詢。

實戰

項目使用SpringBoot搭建。RabbitMQ的安裝這裏不講述。

RabbitMQ配置

兩個系統A、B都須要操做RabbitMQ,其中A生產消息,B消費消息。故都須要配置。

一、首先引入RabbitMQ的dependency:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

這個dependency中包含了RabbitMQ相關dependency。

二、在項目的配置文件裏配置爲使用rabbitmq及其參數。

application-pro.yml

#消息隊列
message.queue.type: rabbitmq
## rabbit mq properties
rabbitmq:
  host: localhost
  port: 5672
  username: guest
  password: guest

application.properties

#將要使用的隊列名
rabbitmq.websocket.msg.queue=websocket_msg_queue

三、建立配置文件。隊列的建立交給spring。

RabbitMQConfig.java

@Configuration
@EnableRabbit
public class RabbitMQConfig {

    @Value("${rabbitmq.host}")
    private String host;
    @Value("${rabbitmq.port}")
    private String port;
    @Value("${rabbitmq.username}")
    private String username;
    @Value("${rabbitmq.password}")
    private String password;
    @Value("${rabbitmq.websocket.msg.queue}")
    private String webSocketMsgQueue;

    @Bean
    public ConnectionFactory connectionFactory() throws IOException {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setUsername(username);
        factory.setPassword(password);
//        factory.setVirtualHost("test");
        factory.setHost(host);
        factory.setPort(Integer.valueOf(port));
        factory.setPublisherConfirms(true);

        //設置隊列參數,是否持久化、隊列TTL、隊列消息TTL等
        factory.createConnection().createChannel(false).queueDeclare(webSocketMsgQueue, true, false, false, null);
        return factory;
    }

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    // 必須是prototype類型
    public RabbitTemplate rabbitTemplate() throws IOException {
        return new RabbitTemplate(connectionFactory());
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() throws IOException {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }
}

 

四、系統B中建立隊列監聽,當隊列有消息時,發送websocket通知。

RabbitMQListener.java

@Component
public class RabbitMQListener {

    @Autowired
    private RabbitMQService mqService;

    /**
     * WebSocket推送監聽器
     * @param socketEntity
     * @param deliveryTag
     * @param channel
     */
    @RabbitListener(queues = "websocket_msg_queue")
    public void webSocketMsgListener(@Payload WebSocketMsgEntity socketMsgEntity, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
        mqService.handleWebSocketMsg(socketMsgEntity, deliveryTag, channel);
    }

}

RabbitMQService.java

public class RabbitMQService {
    @Autowired
    private MessageWebSocketHandler messageWebSocketHandler;

    /**
     * @param socketMsgEntity
     * @param deliveryTag
     * @param channel
     * @throws IOException
     */
    void handleWebSocketMsg(WebSocketMsgEntity socketMsgEntity, long deliveryTag, Channel channel) throws IOException {
        try {
            messageWebSocketHandler.sendMessageToUsers(socketMsgEntity.toJsonString(), socketMsgEntity.getToUserIds());
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            channel.basicNack(deliveryTag, false, false);
        }
    }
}

WebSocketMsgEntity爲MQ中傳送的實體。

public class WebSocketMsgEntity implements Serializable {
    public enum OrderType{
        repair("維修"),
        maintain("保養"),
        measure("計量");

        OrderType(String value){
            this.value = value;
        }
        String value;

        public String getValue() {
            return value;
        }
    }
    //設備名稱
    private String EquName;
    //設備編號
    private String EquId;
    //工單類型
    private OrderType orderType;
    //工單單號
    private String orderId;
    //工單狀態
    private String orderStatus;
    //建立時間
    private Date createTime;
    //消息接收人ID
    private List<String> toUserIds;

    public String getEquName() {
        return EquName;
    }

    public void setEquName(String equName) {
        EquName = equName;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getEquId() {
        return EquId;
    }

    public void setEquId(String equId) {
        EquId = equId;
    }

    public String getOrderStatus() {
        return orderStatus;
    }

    public void setOrderStatus(String orderStatus) {
        this.orderStatus = orderStatus;
    }


    public OrderType getOrderType() {
        return orderType;
    }

    public void setOrderType(OrderType orderType) {
        this.orderType = orderType;
    }

    public Date getCreateTime() {
        return createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public List<String> getToUserIds() {
        return toUserIds;
    }

    public void setToUserIds(List<String> toUserIds) {
        this.toUserIds = toUserIds;
    }

    public String toJsonString(){
        return JSON.toJSONString(this);
    }
}

SpringAOP

一、系統A中建立一個切面類DataInterceptor.java

@Aspect
@Component
public class DataInterceptor {
    @Autowired
    private MessageQueueService queueService;


    //維修工單切點
    @Pointcut("execution(* com.zhishang.hes.common.service.impl.RepairServiceImpl.executeFlow(..))")
    private void repairMsg() {
    }

    /**
     * 返回通知,方法執行正常返回時觸發
     *
     * @param joinPoint
     * @param result
     */
    @AfterReturning(value = "repairMsg()", returning = "result")
    public void afterReturning(JoinPoint joinPoint, Object result) {
        //此處能夠得到切點方法名
        //String methodName = joinPoint.getSignature().getName();
        EquipmentRepair equipmentRepair = (EquipmentRepair) result;
        WebSocketMsgEntity webSocketMsgEntity = this.generateRepairMsgEntity(equipmentRepair);
        if (webSocketMsgEntity == null) {
            return;
        }
        queueService.send(webSocketMsgEntity);
    }

    /**
     * 生成發送到MQ的維修消息
     *
     * @param equipmentRepair
     * @return
     */
    private WebSocketMsgEntity generateRepairMsgEntity(EquipmentRepair equipmentRepair) {
        WebSocketMsgEntity webSocketMsgEntity = generateRepairMsgFromTasks(equipmentRepair);
        return webSocketMsgEntity;
    }

    /**
     * 從任務中生成消息
     *
     * @param equipmentRepair
     * @return
     */
    private WebSocketMsgEntity generateRepairMsgFromTasks(EquipmentRepair equipmentRepair) {
       //業務代碼略
    }

}

二、發送消息到MQ。這裏只貼了發送的核心代碼

public class RabbitMessageQueue extends AbstractMessageQueue {

    @Value("${rabbitmq.websocket.msg.queue}")
    private String webSocketMsgQueue;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void send(WebSocketMsgEntity entity) {
        //沒有指定exchange,則使用默認名爲「」的exchange,binding名與queue名相同
        rabbitTemplate.convertAndSend(webSocketMsgQueue, entity);
    }
}

WebSocket

一、 系統B中引入websocket服務端dependency

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-websocket</artifactId>
    <version>4.3.10.RELEASE</version>
</dependency>

二、 配置websocket,添加處理類

WebSocketConfigurer.java

@Configuration
@EnableWebSocket
public class WebSocketConfig extends WebMvcConfigurerAdapter implements WebSocketConfigurer {

    private static Logger logger = LoggerFactory.getLogger(WebSocketConfig.class);

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        //配置webSocket路徑
        registry.addHandler(messageWebSocketHandler(),"/msg-websocket").addInterceptors(new MyHandshakeInterceptor()).setAllowedOrigins("*");
        //配置webSocket路徑 支持前端使用socketJs
        registry.addHandler(messageWebSocketHandler(), "/sockjs/msg-websocket").setAllowedOrigins("*").addInterceptors(new MyHandshakeInterceptor()).withSockJS();
    }

    @Bean
    public MessageWebSocketHandler messageWebSocketHandler() {
        logger.info("......建立MessageWebSocketHandler......");
        return new MessageWebSocketHandler();
    }

}

MessageWebSocketHandler.java 主要用於websocket鏈接及消息發送處理。配置中還使用了鏈接握手時的處理,主要是取用戶登錄信息,這裏很少講述。

public class MessageWebSocketHandler extends TextWebSocketHandler {
    private static Logger logger = LoggerFactory.getLogger(SystemWebSocketHandler.class);
    private static ConcurrentHashMap<String, CopyOnWriteArraySet<WebSocketSession>> users = new ConcurrentHashMap<>();

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        String userId = session.getAttributes().get("WEBSOCKET_USERID").toString();
        logger.info("......AfterConnectionEstablished......");
        logger.info("session.getId:" + session.getId());
        logger.info("session.getLocalAddress:" + session.getLocalAddress().toString());
        logger.info("userId:" + userId);
        //websocket鏈接後記錄鏈接信息
        if (users.keySet().contains(userId)) {
            CopyOnWriteArraySet<WebSocketSession> webSocketSessions = users.get(userId);
            webSocketSessions.add(session);
        } else {
            CopyOnWriteArraySet<WebSocketSession> webSocketSessions = new CopyOnWriteArraySet<>();
            webSocketSessions.add(session);
            users.put(userId, webSocketSessions);
        }
    }

    @Override
    public void handleTransportError(WebSocketSession session, Throwable throwable) throws Exception {
        removeUserSession(session);
        if (session.isOpen()) {
            session.close();
        }
        logger.info("異常出現handleTransportError" + throwable.getMessage());
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        removeUserSession(session);
        logger.info("關閉afterConnectionClosed" + closeStatus.getReason());
    }

    @Override
    public boolean supportsPartialMessages() {
        return false;
    }

    /**
     * 給符合要求的在線用戶發送消息
     *
     * @param message
     */
    public void sendMessageToUsers(String message, List<String> userIds) throws IOException{
        if (StringUtils.isEmpty(message) || CollectionUtils.isEmpty(userIds)) {
            return;
        }
        if (users.isEmpty()) {
            return;
        }
        for (String userId : userIds) {
            if (!users.keySet().contains(userId)) {
                continue;
            }
            CopyOnWriteArraySet<WebSocketSession> webSocketSessions = users.get(userId);
            if (webSocketSessions == null) {
                continue;
            }
            for (WebSocketSession webSocketSession : webSocketSessions) {
                if (webSocketSession.isOpen()) {
                    try {
                        webSocketSession.sendMessage(new TextMessage(message));
                    } catch (IOException e) {
                        logger.error(" WebSocket server send message ERROR " + e.getMessage());
                        try {
                            throw e;
                        } catch (IOException e1) {
                            e1.printStackTrace();
                        }
                    }
                }
            }
        }
    }

    /**
     * websocket清除鏈接信息
     *
     * @param session
     */
    private void removeUserSession(WebSocketSession session) {
        String userId = session.getAttributes().get("WEBSOCKET_USERID").toString();
        if (users.keySet().contains(userId)) {
            CopyOnWriteArraySet<WebSocketSession> webSocketSessions = users.get(userId);
            webSocketSessions.remove(session);
            if (webSocketSessions.isEmpty()) {
                users.remove(userId);
            }
        }
    }
}

整個功能完成後,A系統分配任務時,系統B登錄用戶收到的消息如圖:

整體流程:

一、對於系統B,每一個登錄的用戶都會和服務器創建websocket長鏈接。

二、系統A生成任務,AOP作出響應,將封裝的消息發送給MQ。

三、系統B中的MQ監聽發現隊列有消息到達,消費消息。

四、系統B經過websocket長鏈接將消息發給指定的登錄用戶。

 

 

 

參考:

https://docs.spring.io/spring/docs/4.3.12.RELEASE/spring-framework-reference/htmlsingle/#websocket

http://www.javashuo.com/article/p-nmcskqka-be.html

https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html

https://blog.csdn.net/Holmofy/article/details/78111715

相關文章
相關標籤/搜索