Apache ActiveMQ是Apache軟件基金會所研發的開放源代碼消息中間件,說白了就是個服務器,主要用來存放請求消息的java
這篇博客圖文並茂,通俗易懂 ActiveMQ做用總結ajax
筆者將其精煉了一下,主要有4大應用場景:異步處理,應用解耦,流量削鋒,消息通信spring
其核心思想都是把用戶的請求先存放在MQ中,而後返回用戶響應,後臺再慢慢去處理MQ中的消息,不須要一條龍業務所有跑完再返回響應,這樣的話單位時間內請求數能夠更多,響應速度也更快,至關於提升了吞吐量。其實前3種場景都差很少,筆者看來沒有絕對的邊界,只不過異步處理強調非同時性,應用解耦強調子系統掛掉後MQ體現的做用,流量削鋒強調MQ在高併發中體現的做用。消息通信的業務模式舉例子:1.用微信和微信好友聊天 2.微信羣聊天apache
源碼地址:json
安裝好activeMQ,如何安裝自行百度tomcat
項目適用jdk1.8,採用idea多模塊架構,涉及技術有spring, activemq, tomcatbash
client是模擬消費者,domain是公共工具包,被maven打成jar供其它項目適用,service是模擬消息生產者服務器
啓動activemq服務器微信
雙擊activemq.bat啓動session
登錄 http://localhost:8161/admin/queues.jsp ,發現Queues是空的
看一下service的配置activemq_config.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
<!-- 這裏暴露內部統一使用的MQ地址 -->
<bean id="internalTargetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<bean id="internalConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory" ref="internalTargetConnectionFactory" />
<property name="maxConnections" value="20" />
</bean>
<!-- Spring提供的JMS工具類,它能夠進行消息發送、接收等 -->
<bean id="internalJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="internalConnectionFactory" />
</bean>
<!-- 推送給用戶信息 建立一個Queue-->
<bean id="userServiceQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>user.service.queue</value>
</constructor-arg>
</bean>
<!-- 推送給新聞信息 建立一個Queue-->
<bean id="newsServiceQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>news.service.queue</value>
</constructor-arg>
</bean>
<!-- 推送給客戶信息 建立一個Queue-->
<bean id="clientServiceQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>client.service.queue</value>
</constructor-arg>
</bean>
</beans>複製代碼
一共3種推送,每種推送對應1個隊列名
PushService是1個通用接口,而後3種推送各對應1個實現,使用tomcat啓動service服務
登錄localhost:8080
按照如上填寫後,咱們點擊推送用戶信息,出現以下提示框
登錄 http://localhost:8161/admin/queues.jsp ,
發現新增了一個隊列,待處理消息數量1,消費者數量0,消息排隊1,消息已出列0
咱們看看後臺執行過程
js經過ajax請求到後臺
@RequestMapping(value="/user",method=RequestMethod.POST)
@ResponseBody
public ResultRespone userPush(User info){
ResultRespone respone = new ResultRespone();
try {
userPushService.push(info);
respone.setData(info);
} catch (Exception e) {
e.printStackTrace();
respone = new ResultRespone(false, e.getMessage());
}
return respone;
}複製代碼
調用push()方法
@Autowired
@Qualifier("userServiceQueue")
private Destination destination;
@Override
public void push(final Object info) {
pushExecutor.execute(new Runnable() {
@Override
public void run() {
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
User p = (User) info;
return session.createTextMessage(JSON.toJSONString(p));
}
});
}
});
}複製代碼
這個過程實際上將用戶屬性值組成的字符串發送到了activemq服務器,到此,生產者的任務就完成了
主要經過3個listener來接收activemq發送過來的消息
看其中一個UserPushListener.java
@Component("userPushListener")
public class UserPushListener implements MessageListener {
protected static final Logger logger = Logger.getLogger(UserPushListener.class);
@Override
public void onMessage(Message message) {
logger.info("[UserPushListener.onMessage]:begin onMessage.");
TextMessage textMessage = (TextMessage) message;
try {
String jsonStr = textMessage.getText();
logger.info("[UserPushListener.onMessage]:receive message is,"+ jsonStr);
if (jsonStr != null) {
User info = JSON.parseObject(jsonStr, User.class);
System.out.println("==============================接受到的用戶信息 開始====================================");
System.out.println(info.toString());
System.out.println("==============================接受到的用戶信息 結束====================================");
WebsocketController.broadcast("user", jsonStr);
}
} catch (JMSException e) {
logger.error("[UserPushListener.onMessage]:receive message occured an exception",e);
}
logger.info("[UserPushListener.onMessage]:end onMessage.");
}
}複製代碼
看一下消費端的配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
<!-- 內部統一使用的MQ地址 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory" ref="targetConnectionFactory"/>
<property name="maxConnections" value="50"/>
</bean>
<!-- Spring提供的JMS工具類,它能夠進行消息發送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<!-- 推送給用戶信息 -->
<bean id="userPushListenerMQ" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>user.service.queue</value>
</constructor-arg>
</bean>
<!-- 推送給新聞信息 -->
<bean id="newsPushListenerMQ" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>news.service.queue</value>
</constructor-arg>
</bean>
<!-- 推送給客戶信息 -->
<bean id="clientPushListenerMQ" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>client.service.queue</value>
</constructor-arg>
</bean>
<!-- 用戶接受推送 -->
<bean id="userPushListenerConsumer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="userPushListenerMQ" />
<property name="messageListener" ref="userPushListener" />
</bean>
<!-- 新聞接受推送 -->
<bean id="newsPushListenerConsumer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="newsPushListenerMQ" />
<property name="messageListener" ref="newsPushListener" />
</bean>
<!-- 客戶接受推送 -->
<bean id="clientPushListenerConsumer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="clientPushListenerMQ" />
<property name="messageListener" ref="clientPushListener" />
</bean>
</beans>複製代碼
消費端監聽了3個隊列,因此隊列一旦有消息,消費端就會監聽到,並且activemq能夠確認哪些消息被推送成功了
關閉service服務,啓動client服務,觀察日誌
成功接收到消息,再次查看 http://localhost:8161/admin/queues.jsp
發現user.service.queue這個隊列的消息是待處理消息數量0,消費者數量1,消息排隊1,消息已出列1,代表消息推送完畢,另外兩個新增的隊列是客戶端監聽形成的,能夠看出待處理消息的數量都是0