Spring整合ActiveMQ項目實戰

引言

   Apache ActiveMQ是Apache軟件基金會所研發的開放源代碼消息中間件,說白了就是個服務器,主要用來存放請求消息的java

原理

   這篇博客圖文並茂,通俗易懂  ActiveMQ做用總結ajax

   筆者將其精煉了一下,主要有4大應用場景:異步處理,應用解耦,流量削鋒,消息通信spring

   其核心思想都是把用戶的請求先存放在MQ中,而後返回用戶響應,後臺再慢慢去處理MQ中的消息,不須要一條龍業務所有跑完再返回響應,這樣的話單位時間內請求數能夠更多,響應速度也更快,至關於提升了吞吐量。其實前3種場景都差很少,筆者看來沒有絕對的邊界,只不過異步處理強調非同時性,應用解耦強調子系統掛掉後MQ體現的做用,流量削鋒強調MQ在高併發中體現的做用。消息通信的業務模式舉例子:1.用微信和微信好友聊天 2.微信羣聊天apache

準備工做

   源碼地址:json

   安裝好activeMQ,如何安裝自行百度tomcat

項目說明

   項目適用jdk1.8,採用idea多模塊架構,涉及技術有spring, activemq, tomcatbash

項目結構

   

   client是模擬消費者,domain是公共工具包,被maven打成jar供其它項目適用,service是模擬消息生產者服務器

啓動activemq服務器微信


雙擊activemq.bat啓動session

登錄 http://localhost:8161/admin/queues.jsp ,發現Queues是空的


看一下service的配置activemq_config.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
   xmlns:jms="http://www.springframework.org/schema/jms"
   xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

   <!-- 這裏暴露內部統一使用的MQ地址 -->
   <bean id="internalTargetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
      <property name="brokerURL" value="tcp://localhost:61616" />
   </bean>
   <bean id="internalConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
      destroy-method="stop">
      <property name="connectionFactory" ref="internalTargetConnectionFactory" />
      <property name="maxConnections" value="20" />
   </bean>
   <!-- Spring提供的JMS工具類,它能夠進行消息發送、接收等 -->
   <bean id="internalJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
      <property name="connectionFactory" ref="internalConnectionFactory" />
   </bean>

   <!-- 推送給用戶信息  建立一個Queue-->
   <bean id="userServiceQueue" class="org.apache.activemq.command.ActiveMQQueue">
      <constructor-arg>
         <value>user.service.queue</value>
      </constructor-arg>
   </bean>
   <!-- 推送給新聞信息   建立一個Queue-->
   <bean id="newsServiceQueue" class="org.apache.activemq.command.ActiveMQQueue">
      <constructor-arg>
         <value>news.service.queue</value>
      </constructor-arg>
   </bean>
   <!-- 推送給客戶信息   建立一個Queue-->
   <bean id="clientServiceQueue" class="org.apache.activemq.command.ActiveMQQueue">
      <constructor-arg>
         <value>client.service.queue</value>
      </constructor-arg>
   </bean>    
</beans>複製代碼

一共3種推送,每種推送對應1個隊列名

service結構

 PushService是1個通用接口,而後3種推送各對應1個實現,使用tomcat啓動service服務

登錄localhost:8080


按照如上填寫後,咱們點擊推送用戶信息,出現以下提示框


登錄 http://localhost:8161/admin/queues.jsp ,


發現新增了一個隊列,待處理消息數量1,消費者數量0,消息排隊1,消息已出列0

咱們看看後臺執行過程

js經過ajax請求到後臺

@RequestMapping(value="/user",method=RequestMethod.POST)
@ResponseBody
public ResultRespone userPush(User info){
 ResultRespone respone = new ResultRespone();
 try {
  userPushService.push(info);
  respone.setData(info);
 } catch (Exception e) {
  e.printStackTrace();
  respone = new ResultRespone(false, e.getMessage());
 }
 return respone;
}複製代碼

調用push()方法

@Autowired
@Qualifier("userServiceQueue")
private Destination destination;

@Override
public void push(final Object info) {
 pushExecutor.execute(new Runnable() {
  @Override
  public void run() {
   jmsTemplate.send(destination, new MessageCreator() {
    public Message createMessage(Session session) throws JMSException {
      User p = (User) info;
     return session.createTextMessage(JSON.toJSONString(p));
    }
   });
  }    
 });
}複製代碼

這個過程實際上將用戶屬性值組成的字符串發送到了activemq服務器,到此,生產者的任務就完成了

消費端結構


主要經過3個listener來接收activemq發送過來的消息

看其中一個UserPushListener.java

@Component("userPushListener")
public class UserPushListener implements MessageListener {
  protected static final Logger logger = Logger.getLogger(UserPushListener.class);
 @Override
 public void onMessage(Message message) {
   logger.info("[UserPushListener.onMessage]:begin onMessage.");
         TextMessage textMessage = (TextMessage) message;
         try {
             String jsonStr = textMessage.getText();
             logger.info("[UserPushListener.onMessage]:receive message is,"+ jsonStr);
             if (jsonStr != null) {
                 User info = JSON.parseObject(jsonStr, User.class);
                 System.out.println("==============================接受到的用戶信息 開始====================================");
                 System.out.println(info.toString());
                 System.out.println("==============================接受到的用戶信息 結束====================================");
                 WebsocketController.broadcast("user", jsonStr);
             }
         } catch (JMSException e) {
             logger.error("[UserPushListener.onMessage]:receive message occured an exception",e);
         }
         logger.info("[UserPushListener.onMessage]:end onMessage.");
     }
}複製代碼

看一下消費端的配置

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"  
    xmlns:jms="http://www.springframework.org/schema/jms"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">  
     
    <!-- 內部統一使用的MQ地址 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>
    <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">  
        <property name="connectionFactory" ref="targetConnectionFactory"/>  
        <property name="maxConnections" value="50"/>
    </bean>
    <!-- Spring提供的JMS工具類,它能夠進行消息發送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>

   <!-- 推送給用戶信息 -->
   <bean id="userPushListenerMQ" class="org.apache.activemq.command.ActiveMQQueue">
      <constructor-arg>
         <value>user.service.queue</value>
      </constructor-arg>
   </bean>
   <!-- 推送給新聞信息 -->
   <bean id="newsPushListenerMQ" class="org.apache.activemq.command.ActiveMQQueue">
      <constructor-arg>
         <value>news.service.queue</value>
      </constructor-arg>
   </bean>
   <!-- 推送給客戶信息 -->
   <bean id="clientPushListenerMQ" class="org.apache.activemq.command.ActiveMQQueue">
      <constructor-arg>
         <value>client.service.queue</value>
      </constructor-arg>
   </bean>
   
   <!-- 用戶接受推送 -->
    <bean id="userPushListenerConsumer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="userPushListenerMQ" />
        <property name="messageListener" ref="userPushListener" />
    </bean>
    
   <!-- 新聞接受推送 -->
    <bean id="newsPushListenerConsumer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="newsPushListenerMQ" />
        <property name="messageListener" ref="newsPushListener" />
    </bean>
    
   <!-- 客戶接受推送 -->
    <bean id="clientPushListenerConsumer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="clientPushListenerMQ" />
        <property name="messageListener" ref="clientPushListener" />
    </bean>
</beans>複製代碼

消費端監聽了3個隊列,因此隊列一旦有消息,消費端就會監聽到,並且activemq能夠確認哪些消息被推送成功了

關閉service服務,啓動client服務,觀察日誌


成功接收到消息,再次查看 http://localhost:8161/admin/queues.jsp


發現user.service.queue這個隊列的消息是待處理消息數量0,消費者數量1,消息排隊1,消息已出列1,代表消息推送完畢,另外兩個新增的隊列是客戶端監聽形成的,能夠看出待處理消息的數量都是0

相關文章
相關標籤/搜索