沒有標準定義,通常認爲,採用消息傳送機制/消息隊列 的中間件技術,進行數據交流,用在分佈式系統的集成javascript
解決分佈式系統之間消息的傳遞。
電商場景:css
用戶下單減庫存,調用物流系統。隨着業務量的增大,須要對系統進行拆分(服務化和業務拆分),拆分後的系統之間的交互通常用RPC(遠程過程調用)。若是系統擴充到有幾十個接口,就須要用消息中間件來解決問題。html
在架構上,RPC和Message的差別點:Message有一箇中間結點Message Queue,能夠把消息存儲。前端
Message Queue把請求的壓力保存一下,逐漸釋放出來,讓處理者按照本身的節奏來處理。
Message Queue引入一下新的結點,讓系統的可靠性會受Message Queue結點的影響。
Message Queue是異步單向的消息。發送消息設計成是不須要等待消息處理的完成。
因此對於有同步返回需求,用Message Queue則變得麻煩了。java
同步調用,對於要等待返回結果/處理結果的場景,RPC是能夠很是天然直覺的使用方式。
# RPC也能夠是異步調用。
因爲等待結果,Consumer(Client)會有線程消耗。
若是以異步RPC的方式使用,Consumer(Client)線程消耗能夠去掉。但不能作到像消息同樣暫存消息/請求,壓力會直接傳導到服務Provider。jquery
但願同步獲得結果的場合,RPC合適。
但願使用簡單,則RPC;RPC操做基於接口,使用簡單,使用方式模擬本地調用。異步的方式編程比較複雜。
不但願發送端(RPC Consumer、Message Sender)受限於處理端(RPC Provider、Message Receiver)的速度時,使用Message Queue。
隨着業務增加,有的處理端處理量會成爲瓶頸,會進行同步調用到異步消息的改造。git
這樣的改造實際上有調整業務的使用方式。github
好比原來一個操做頁面提交後就下一個頁面會看處處理結果;改造後異步消息後,下一個頁面就會變成「操做已提交,完成後會獲得通知」。web
用戶註冊(50ms),還需發送郵件(50ms)和短信(50ms)ajax
串行:(150ms)用戶註冊—》發送郵件----》發送短信
並行(100ms):用戶註冊—》發送郵件----》發送短信
消息中間件(56ms):
用戶註冊(50ms)—》(6ms)消息中間件《-----發送郵件《-----發送短信
說明:
用戶註冊時,可能還須要同時發送郵件和短信,使用串行的方式進行處理時花費的時間比較久;這時就會考慮並行處理,即用戶註冊完之後,同時啓動兩個兩個線程去發送郵件和短信,這樣時間就會花費得更少。
若是引入消息中間件的話就會比並行處理更快,即用戶註冊時,把註冊信息放到消息中間件裏面,而後發送郵件和短信的程序本身去消息中間件裏面那用戶註冊的消息來消費
訂單系統---》庫存系統(強耦合)
消息中間件:訂單系統---》消息中間件《----庫存系統(解耦)
緣由:下訂單之後還要同步去修改庫存,沒有必要耦合在一塊兒
說明:
用戶下訂單時,可能還須要去更新庫存,這個時候下訂單的後,還須要同步去更新庫存,這樣兩個系統之間就會有很強的耦合。
因此這個時候引入消息中間件,當用戶下訂單後,直接把訂單信息放入消息中間件裏面,接着就不用管了,庫存系統直接去消息中間件裏面那訂單信息來消費就好了,這樣訂單系統和庫存系統之間就解耦了
用戶請求-----》秒殺應用
應用的前端加入消息隊列
用戶請求-----》消息隊列《----秒殺應用
緣由:用戶訪問太多,服務器承受不了
說明:
在作秒殺的時候會有許多訪問,可能致使系統承受不住。這個時候就須要在應用的前端加入消息隊列,而後秒殺系統就能夠直接去消息隊列裏面拿消息來消費就好了,秒殺系統是能夠選擇性的拿消息過來消費的,若是消息太多就會選擇性的丟棄一些消息
錯誤日誌---》消息隊列《----日誌處理
用戶行爲日誌--》消息隊列(kafka)《-----日誌的存儲或流式處理
緣由:機器太多,日誌查看不方便
說明:
當系統太多的時候,部署了不少機器,每臺機器上面都有日誌,定位問題的時候不可能去每一臺機器去看日誌。
這個時候就須要引入消息中間件,把全部的日誌放到消息中間件裏面,而後在經過一個應用去讀取日誌存庫或者展現
點對點通訊
說明:
kafka和RabbitMQ的比較
1)RabbitMq比kafka成熟,在可用性上,穩定性上,可靠性上,RabbitMq超過kafka
2)Kafka設計的初衷就是處理日誌的,能夠看作是一個日誌系統,針對性很強,因此它並無具有一個成熟MQ應該具有的特性
3)Kafka的性能(吞吐量、tps)比RabbitMq要強
JMS(Java Messaging Service)規範,本質是API,Java平臺消息中間件的規範,java應用程序之間進行消息交換。而且經過提供標準的產生、發送、接收消息的接口簡化企業應用的開發。對應的實現ActiveMQ
1)鏈接工廠:建立一個JMS鏈接
2)JMS鏈接:客戶端和服務器之間的一個鏈接。
3)JMS會話:客戶端和服務器會話的狀態,創建在鏈接之上的
4)JMS目的:消息隊列
5)JMS生產者:消息的生成
6)JMS消費者:接收和消費消息
7)Broker:消息中間件的實例(ActiveMQ)
隊列,一個消息只有一個消費者(即便有多個接受者監聽隊列),消費者是要向隊列應答成功
發佈到Topic的消息會被當前主題全部的訂閱者消費
TextMessage,MapMessage,ObjectMessage,BytesMessage,StreamMessage
編碼過程參考JMS對象模型的幾個要素:
1)鏈接工廠:建立一個JMS鏈接
2)JMS鏈接:客戶端和服務器之間的一個鏈接。
3)JMS會話:客戶端和服務器會話的狀態,創建在鏈接之上的
4)JMS目的:消息隊列
5)JMS生產者:消息的生成
6)JMS消費者:接收和消費消息
7)Broker:消息中間件的實例(ActiveMQ)
1)在ActiveMQ官網下載windows版的ActiveMQ,下載地址:http://activemq.apache.org/activemq-580-release.html
下載後啓動,
在瀏覽器中輸入地址http://127.0.0.1:8161/admin訪問,用戶名和密碼爲admin/admin
2)新建一名爲OriginalActiveMQ的maven工程,結構以下:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.8.0</version> </dependency>
package com.study.demo; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * * @Description: java原生ActiveMQ的API的使用-JMS消息生產者 * @author leeSmall * @date 2018年9月13日 * */ public class JmsProducer { //默認鏈接用戶名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //默認鏈接密碼 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //默認鏈接地址 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; //發送的消息數量 private static final int SENDNUM = 10; //編碼過程參考JMS對象模型的幾個要素 public static void main(String[] args) { //1.鏈接工廠:建立一個JMS鏈接 ConnectionFactory connectionFactory; //2.JMS鏈接:客戶端和服務器之間的一個鏈接 Connection connection = null; //3.JMS會話:客戶端和服務器會話的狀態,創建在JMS鏈接之上的 Session session; //4.JMS目的:消息隊列 Destination destination; //5.JMS生產者:消息的生成 MessageProducer messageProducer; //建立一個ActiveMQ的鏈接工廠 connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL); try { //經過鏈接工廠建立一個JMS鏈接 connection = connectionFactory.createConnection(); //開啓JMS鏈接 connection.start(); /* * 經過JMS鏈接建立一個JMS會話 * * createSession參數取值說明: * 第一個參數:爲true表示啓用事務 * 第二個參數:消息的確認模式: * AUTO_ACKNOWLEDGE 自動簽收 * CLIENT_ACKNOWLEDGE 客戶端自行調用 * ACKNOWLEDGE 方法簽收 * DUPS_OK_ACKNOWLEDGE 不是必須簽收 * 消息可能會重複發送 在第二次從新傳送消息的時候,消息頭的JmsDelivered會被置爲true標示當前消息已經傳送過一次, * 客戶端須要進行消息的重複處理控制。 */ session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE); //經過JMS會話建立一個JMS目的,即消息隊列 destination = session.createQueue("firstMSG"); //經過JMS會話和JMS目的建立一個JMS生產者,即消息生產者 messageProducer = session.createProducer(destination); //發送10條消息 for(int i=0;i<SENDNUM;i++){ //生成消息 String msg = "發送消息"+i+" "+System.currentTimeMillis(); TextMessage message = session.createTextMessage(msg); System.out.println("發送消息:"+msg); //發送消息 messageProducer.send(message); } //提交JMS會話 session.commit(); } catch (JMSException e) { e.printStackTrace(); }finally { if(connection!=null){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
package com.study.demo; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * * @Description: java原生ActiveMQ的API的使用-JMS消息消費者 * @author leeSmall * @date 2018年9月13日 * */ public class JmsConsumer { //默認鏈接用戶名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //默認鏈接密碼 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //默認鏈接地址 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; //編碼過程參考JMS對象模型的幾個要素 public static void main(String[] args) { //1.鏈接工廠:建立一個JMS鏈接 ConnectionFactory connectionFactory; //2.JMS鏈接:客戶端和服務器之間的一個鏈接 Connection connection = null; //3.JMS會話:客戶端和服務器會話的狀態,創建在JMS鏈接之上的 Session session; //4.JMS目的:消息隊列 Destination destination; //5.JMS消費者:接收和消費消息 MessageConsumer messageConsumer; //建立一個ActiveMQ的鏈接工廠 connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL); try { //經過鏈接工廠建立一個JMS鏈接 connection = connectionFactory.createConnection(); //開啓JMS鏈接 connection.start(); /* * 經過JMS鏈接建立一個JMS會話 * * createSession參數取值說明: * 第一個參數:爲true表示啓用事務 * 第二個參數:消息的確認模式: * AUTO_ACKNOWLEDGE 自動簽收 * CLIENT_ACKNOWLEDGE 客戶端自行調用 * ACKNOWLEDGE 方法簽收 * DUPS_OK_ACKNOWLEDGE 不是必須簽收 * 消息可能會重複發送 在第二次從新傳送消息的時候,消息頭的JmsDelivered會被置爲true標示當前消息已經傳送過一次, * 客戶端須要進行消息的重複處理控制。 */ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //經過JMS會話建立一個JMS目的,即消息隊列 destination = session.createQueue("firstMSG"); //經過JMS會話和JMS目的建立一個JMS消費者,即消息消費者 messageConsumer = session.createConsumer(destination); //讀取消息 while(true){ //使用receive方法消費一個消息,若是超過10s沒有獲得消息就跳過 TextMessage textMessage = (TextMessage)messageConsumer.receive(10000); if(textMessage != null){ System.out.println("Accept msg : "+textMessage.getText()); }else{ break; } } } catch (JMSException e) { e.printStackTrace(); } } }
說明:
能夠看到有10條消息待消費,0個消費者,10條消息入隊,0條消息出隊
說明:
能夠看到,此時有0條消息待消費,1個消費者,10條消息入隊,10條消息出隊
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.study.demo</groupId> <artifactId>ActiveMQProducer</artifactId> <packaging>war</packaging> <version>0.0.1-SNAPSHOT</version> <name>ActiveMQProducer Maven Webapp</name> <url>http://maven.apache.org</url> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>javax</groupId> <artifactId>javaee-web-api</artifactId> <version>7.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>4.3.11.RELEASE</version> </dependency> <dependency> <groupId>javax.servlet</groupId> <artifactId>jstl</artifactId> <version>1.2</version> </dependency> <!--日誌 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.5</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.16</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>jcl-over-slf4j</artifactId> <version>1.7.5</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.0.13</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>1.0.13</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-access</artifactId> <version>1.0.13</version> </dependency> <!--JSON --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.9.6</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.6</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> <version>2.9.6</version> </dependency> <!-- xbean --> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>3.16</version> </dependency> <dependency> <groupId>com.thoughtworks.xstream</groupId> <artifactId>xstream</artifactId> <version>1.3.1</version> </dependency> <!--ActiveMq --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.8.0</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.3.11.RELEASE</version> </dependency> </dependencies> <build> <finalName>ActiveMQProducer</finalName> <resources> <resource> <directory>${basedir}/src/main/java</directory> <includes> <include>**/*.xml</include> </includes> </resource> </resources> </build> </project>
<?xml version="1.0" encoding="UTF-8"?> <!-- 查找最新的schemaLocation 訪問 http://www.springframework.org/schema/ --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd"> <!-- 配置掃描路徑 --> <context:component-scan base-package="com.study.demo"> <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/> </context:component-scan> <!-- ActiveMQ 鏈接工廠 --> <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" /> <!-- Spring Caching鏈接工廠 --> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> <property name="sessionCacheSize" value="100"></property> </bean> <!-- Spring JmsTemplate 的消息生產者 start--> <!-- 定義JmsTemplate的Queue類型 --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="connectionFactory"></constructor-arg> <!-- 隊列模式--> <property name="pubSubDomain" value="false"></property> </bean> <!-- 定義JmsTemplate的Topic類型 --> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="connectionFactory"></constructor-arg> <!-- 發佈訂閱模式--> <property name="pubSubDomain" value="true"></property> </bean> <!--Spring JmsTemplate 的消息生產者 end--> </beans>
說明:
要在Spring的配置文件中增長命名空間
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms=http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd"
package com.study.demo.mq.producer.queue; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Component; /** * * @Description: 隊列消息生產者,發送消息到隊列 * @author leeSmall * @date 2018年9月13日 * */ @Component("queueSender") public class QueueSender { @Autowired @Qualifier("jmsQueueTemplate") private JmsTemplate jmsTemplate; /*@Autowired private GetResponse getResponse;*/ public void send(String queueName,final String message){ jmsTemplate.send(queueName, new MessageCreator() { public Message createMessage(Session session) throws JMSException { Message msg = session.createTextMessage(message); return msg; } }); } }
package com.study.demo.mq.producer.topic; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; /** * * @Description: Topic生產者發送消息到Topic * @author leeSmall * @date 2018年9月13日 * */ @Component("topicSender") public class TopicSender { @Autowired @Qualifier("jmsTopicTemplate") private JmsTemplate jmsTemplate; public void send(String queueName,final String message){ jmsTemplate.send(queueName, new MessageCreator() { public Message createMessage(Session session) throws JMSException { Message msg = session.createTextMessage(message); return msg; } }); } }
package com.study.demo.controller; import com.study.demo.mq.producer.queue.QueueSender; import com.study.demo.mq.producer.topic.TopicSender; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import javax.annotation.Resource; /** * * @Description: controller測試 * @author leeSmall * @date 2018年9月13日 * */ @Controller public class ActivemqController { @Resource QueueSender queueSender; @Resource TopicSender topicSender; /** * 發送消息到隊列 * Queue隊列:僅有一個訂閱者會收到消息,消息一旦被處理就不會存在隊列中 * @param message * @return String */ @ResponseBody @RequestMapping("queueSender") public String queueSender(@RequestParam("message")String message){ String opt=""; try { queueSender.send("test.queue",message); opt = "suc"; } catch (Exception e) { opt = e.getCause().toString(); } return opt; } /** * 發送消息到主題 * Topic主題 :放入一個消息,全部訂閱者都會收到 * 這個是主題目的地是一對多的 * @param message * @return String */ @ResponseBody @RequestMapping("topicSender") public String topicSender(@RequestParam("message")String message){ String opt = ""; try { topicSender.send("test.topic",message); opt = "suc"; } catch (Exception e) { opt = e.getCause().toString(); } return opt; } }
<?xml version="1.0" encoding="UTF-8"?> <!-- 查找最新的schemaLocation 訪問 http://www.springframework.org/schema/ --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"> <!-- <mvc:default-servlet-handler />--> <mvc:resources mapping="/js/**" location="/js/"/> <mvc:annotation-driven content-negotiation-manager="contentNegotiationManager" /> <context:component-scan base-package="com.study.demo"> <context:include-filter type="annotation" expression="org.springframework.stereotype.Controller" /> </context:component-scan> <bean id="stringHttpMessageConverter" class="org.springframework.http.converter.StringHttpMessageConverter"> <property name="supportedMediaTypes"> <list> <bean class="org.springframework.http.MediaType"> <constructor-arg index="0" value="text" /> <constructor-arg index="1" value="plain" /> <constructor-arg index="2" value="UTF-8" /> </bean> </list> </property> </bean> <bean id="mappingJackson2HttpMessageConverter" class="org.springframework.http.converter.json.MappingJackson2HttpMessageConverter" /> <bean class="org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter"> <property name="messageConverters"> <list> <ref bean="stringHttpMessageConverter" /> <ref bean="mappingJackson2HttpMessageConverter" /> </list> </property> </bean> <bean id="contentNegotiationManager" class="org.springframework.web.accept.ContentNegotiationManagerFactoryBean"> <property name="mediaTypes"> <map> <entry key="html" value="text/html" /> <entry key="pdf" value="application/pdf" /> <entry key="xsl" value="application/vnd.ms-excel" /> <entry key="xml" value="application/xml" /> <entry key="json" value="application/json" /> </map> </property> <property name="defaultContentType" value="text/html" /> </bean> <bean id="viewResolver" class="org.springframework.web.servlet.view.ContentNegotiatingViewResolver"> <property name="order" value="0" /> <property name="contentNegotiationManager" ref="contentNegotiationManager" /> <property name="viewResolvers"> <list> <bean class="org.springframework.web.servlet.view.BeanNameViewResolver" /> <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"> <property name="viewClass" value="org.springframework.web.servlet.view.JstlView" /> <property name="prefix" value="/WEB-INF/pages/" /> <property name="suffix" value=".jsp"></property> </bean> </list> </property> <property name="defaultViews"> <list> <bean class="org.springframework.web.servlet.view.json.MappingJackson2JsonView"> <property name="extractValueFromSingleKeyModel" value="true" /> </bean> </list> </property> </bean> </beans>
<%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%> <% String path = request.getContextPath(); System.out.println(path); String basePath = request.getScheme() + "://" + request.getServerName() + ":" + request.getServerPort() + path + "/"; System.out.println(basePath); %> <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"> <html> <head> <base href="<%=basePath%>"> <title>ActiveMQ Demo程序</title> <meta http-equiv="pragma" content="no-cache"> <meta http-equiv="cache-control" content="no-cache"> <meta http-equiv="expires" content="0"> <script type="text/javascript" src="<%=basePath%>js/jquery-1.11.0.min.js"></script> <style type="text/css"> .h1 { margin: 0 auto; } #producer{ width: 48%; border: 1px solid blue; height: 80%; align:center; margin:0 auto; } body{ text-align :center; } div { text-align :center; } textarea{ width:80%; height:100px; border:1px solid gray; } button{ background-color: rgb(62, 156, 66); border: none; font-weight: bold; color: white; height:30px; } </style> <script type="text/javascript"> function send(controller){ if($("#message").val()==""){ $("#message").css("border","1px solid red"); return; }else{ $("#message").css("border","1px solid gray"); } $.ajax({ type: 'post', url:'<%=basePath%>/'+controller, dataType:'text', data:{"message":$("#message").val()}, success:function(data){ if(data=="suc"){ $("#status").html("<font color=green>發送成功</font>"); setTimeout(clear,1000); }else{ $("#status").html("<font color=red>"+data+"</font>"); setTimeout(clear,5000); } }, error:function(data){ $("#status").html("<font color=red>ERROR:"+data["status"]+","+data["statusText"]+"</font>"); setTimeout(clear,5000); } }); } function clear(){ $("#status").html(""); } </script> </head> <body> <h1>Hello ActiveMQ</h1> <div id="producer"> <h2>Producer</h2> <textarea id="message"></textarea> <br> <button onclick="send('queueSender')">發送Queue消息</button> <button onclick="send('topicSender')">發送Topic消息</button> <br> <span id="status"></span> </div> </body> </html>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.study.demo</groupId> <artifactId>ActiveMQConsumer</artifactId> <packaging>war</packaging> <version>0.0.1-SNAPSHOT</version> <name>ActiveMQConsumer Maven Webapp</name> <url>http://maven.apache.org</url> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>javax</groupId> <artifactId>javaee-web-api</artifactId> <version>7.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>4.3.11.RELEASE</version> </dependency> <dependency> <groupId>javax.servlet</groupId> <artifactId>jstl</artifactId> <version>1.2</version> </dependency> <!--日誌 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.5</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.16</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>jcl-over-slf4j</artifactId> <version>1.7.5</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.0.13</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>1.0.13</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-access</artifactId> <version>1.0.13</version> </dependency> <!--JSON --> <dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-mapper-asl</artifactId> <version>1.9.9</version><!-- 1.9.13 --> </dependency> <dependency> <groupId>org.codehaus.jackson</groupId> <artifactId>jackson-core-asl</artifactId> <version>1.9.9</version><!-- 1.9.13 --> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.9.6</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.6</version> </dependency> <!-- xbean --> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>3.16</version> </dependency> <dependency> <groupId>com.thoughtworks.xstream</groupId> <artifactId>xstream</artifactId> <version>1.3.1</version> </dependency> <!--ActiveMq --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.8.0</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.3.11.RELEASE</version> </dependency> </dependencies> <build> <finalName>ActiveMQConsumer</finalName> <resources> <resource> <directory>${basedir}/src/main/java</directory> <includes> <include>**/*.xml</include> </includes> </resource> </resources> </build> </project>
2.12 新建一個/ActiveMQConsumer/src/main/java/applicationContext.xml文件,並在裏面添加Spring的ActiveMQ相關配置
<?xml version="1.0" encoding="UTF-8"?> <!-- 查找最新的schemaLocation 訪問 http://www.springframework.org/schema/ --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd"> <!-- 配置掃描路徑 --> <context:component-scan base-package="com.study.demo"> <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/> </context:component-scan> <!-- ActiveMQ 鏈接工廠 --> <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin" /> <!-- Spring Caching鏈接工廠 --> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> <property name="sessionCacheSize" value="100" /> </bean> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="connectionFactory"></constructor-arg> <!-- 隊列模式--> <property name="pubSubDomain" value="false"></property> </bean> </beans>
隊列消息監聽器1:
package com.study.demo.mq.consumer.queue; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import org.springframework.stereotype.Component; /** * * @Description: 隊列消息監聽器 * @author leeSmall * @date 2018年9月13日 * */ @Component public class QueueReceiver1 implements MessageListener { public void onMessage(Message message) { try { String textMsg = ((TextMessage)message).getText(); System.out.println("QueueReceiver1 accept msg : "+textMsg); } catch (JMSException e) { e.printStackTrace(); } } }
隊列消息監聽器2:
package com.study.demo.mq.consumer.queue; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * * @Description: 隊列消息監聽器 * @author leeSmall * @date 2018年9月13日 * */ @Component public class QueueReceiver2 implements MessageListener { public void onMessage(Message message) { try { String textMsg = ((TextMessage)message).getText(); System.out.println("QueueReceiver2 accept msg : "+textMsg); } catch (JMSException e) { e.printStackTrace(); } } }
applicationContext.xml配置:
<!-- 定義Queue監聽器 --> <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> <jms:listener destination="test.queue" ref="queueReceiver1"></jms:listener> <jms:listener destination="test.queue" ref="queueReceiver2"></jms:listener> </jms:listener-container>
Topic消息監聽器1:
package com.study.demo.mq.consumer.topic; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * * @Description: Topic消息監聽器 * @author leeSmall * @date 2018年9月13日 * */ @Component public class TopicReceiver1 implements MessageListener { public void onMessage(Message message) { try { String textMsg = ((TextMessage)message).getText(); System.out.println("TopicReceiver1 accept msg : "+textMsg); } catch (JMSException e) { e.printStackTrace(); } } }
Topic消息監聽器2:
package com.study.demo.mq.consumer.topic; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * * @Description: Topic消息監聽器 * @author leeSmall * @date 2018年9月13日 * */ @Component public class TopicReceiver2 implements MessageListener { public void onMessage(Message message) { try { String textMsg = ((TextMessage)message).getText(); System.out.println("TopicReceiver2 accept msg : "+textMsg); } catch (JMSException e) { e.printStackTrace(); } } }
applicationContext.xml配置:
<!-- 定義Topic監聽器 --> <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto"> <jms:listener destination="test.topic" ref="topicReceiver1"></jms:listener> <jms:listener destination="test.topic" ref="topicReceiver2"></jms:listener> </jms:listener-container>
<?xml version="1.0" encoding="UTF-8"?> <!-- 查找最新的schemaLocation 訪問 http://www.springframework.org/schema/ --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"> <!-- <mvc:default-servlet-handler />--> <mvc:resources mapping="/js/**" location="/js/"/> <mvc:annotation-driven content-negotiation-manager="contentNegotiationManager" /> <context:component-scan base-package="com.study.demo"> <context:include-filter type="annotation" expression="org.springframework.stereotype.Controller" /> </context:component-scan> <bean id="stringHttpMessageConverter" class="org.springframework.http.converter.StringHttpMessageConverter"> <property name="supportedMediaTypes"> <list> <bean class="org.springframework.http.MediaType"> <constructor-arg index="0" value="text" /> <constructor-arg index="1" value="plain" /> <constructor-arg index="2" value="UTF-8" /> </bean> </list> </property> </bean> <bean id="mappingJacksonHttpMessageConverter" class="org.springframework.http.converter.json.MappingJackson2HttpMessageConverter" /> <bean class="org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter"> <property name="messageConverters"> <list> <ref bean="stringHttpMessageConverter" /> <ref bean="mappingJacksonHttpMessageConverter" /> </list> </property> </bean> <bean id="contentNegotiationManager" class="org.springframework.web.accept.ContentNegotiationManagerFactoryBean"> <property name="mediaTypes"> <map> <entry key="html" value="text/html" /> <entry key="pdf" value="application/pdf" /> <entry key="xsl" value="application/vnd.ms-excel" /> <entry key="xml" value="application/xml" /> <entry key="json" value="application/json" /> </map> </property> <property name="defaultContentType" value="text/html" /> </bean> <bean id="viewResolver" class="org.springframework.web.servlet.view.ContentNegotiatingViewResolver"> <property name="order" value="0" /> <property name="contentNegotiationManager" ref="contentNegotiationManager" /> <property name="viewResolvers"> <list> <bean class="org.springframework.web.servlet.view.BeanNameViewResolver" /> <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"> <property name="viewClass" value="org.springframework.web.servlet.view.JstlView" /> <property name="prefix" value="/WEB-INF/pages/" /> <property name="suffix" value=".jsp"></property> </bean> </list> </property> <property name="defaultViews"> <list> <bean class="org.springframework.web.servlet.view.json.MappingJackson2JsonView"> <property name="extractValueFromSingleKeyModel" value="true" /> </bean> </list> </property> </bean> </beans>
在生產者的消息發送頁面分別發送隊列消息和topic消息查看效果
發送隊列消息:
查看消費者的狀態:
發送topic消息
查看消費者的狀態:
參考文章: