經過上一篇文章 《消息隊列深刻解析》,咱們已經消息隊列是什麼、使用消息隊列的好處以及常見消息隊列的簡單介紹。java
這一篇文章,主要帶你們詳細瞭解一下消息隊列ActiveMQ的使用。面試
學習消息隊列ActiveMQ的使用以前,咱們先來搞清JMS。spring
JMS(JAVA Message Service,java消息服務)是java的消息服務,JMS的客戶端之間能夠經過JMS服務進行異步的消息傳輸。JMS(JAVA Message Service,java消息服務)API是一個消息服務的標準或者說是規範,容許應用程序組件基於JavaEE平臺建立、發送、接收和讀取消息。它使分佈式通訊耦合度更低,消息服務更加可靠以及異步性。apache
JMS定義了五種不一樣的消息正文格式,以及調用的消息類型,容許你發送並接收以一些不一樣形式的數據,提供現有消息格式的一些級別的兼容性。服務器
使用隊列(Queue)做爲消息通訊載體;知足生產者與消費者模式,一條消息只能被一個消費者使用,未被消費的消息在隊列中保留直到被消費或超時。好比:咱們生產者發送100條消息的話,兩個消費者來消費通常狀況下兩個消費者會按照消息發送的順序各自消費一半(也就是你一個我一個的消費。)後面咱們會經過代碼演示來驗證。微信
發佈訂閱模型(Pub/Sub) 使用主題(Topic)做爲消息通訊載體,相似於廣播模式;發佈者發佈一條消息,該消息經過主題傳遞給全部的訂閱者,在一條消息廣播以後才訂閱的用戶則是收不到該條消息的。session
參考:https://blog.csdn.net/shaobin...app
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已是好久的事情了,可是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。
安裝過程很簡單這裏就不貼安裝過程了,能夠自行google.異步
添加Maven依賴socket
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.3</version> </dependency>
生產者發送消息測試方法:
@Test public void testQueueProducer() throws Exception { // 一、建立一個鏈接工廠對象,須要指定服務的ip及端口。 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.155:61616"); // 二、使用工廠對象建立一個Connection對象。 Connection connection = connectionFactory.createConnection(); // 三、開啓鏈接,調用Connection對象的start方法。 connection.start(); // 四、建立一個Session對象。 // 第一個參數:是否開啓事務。若是true開啓事務,第二個參數無心義。通常不開啓事務false。 // 第二個參數:應答模式。自動應答或者手動應答。通常自動應答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 五、使用Session對象建立一個Destination對象。兩種形式queue、topic,如今應該使用queue Queue queue = session.createQueue("test-queue"); // 六、使用Session對象建立一個Producer對象。 MessageProducer producer = session.createProducer(queue); // 七、建立一個Message對象,可使用TextMessage。 for (int i = 0; i < 50; i++) { TextMessage textMessage = session.createTextMessage("第"+i+ "一個ActiveMQ隊列目的地的消息"); // 八、發送消息 producer.send(textMessage); } // 九、關閉資源 producer.close(); session.close(); connection.close(); }
消費者消費消息測試方法
@Test public void testQueueConsumer() throws Exception { // 建立一個ConnectionFactory對象鏈接MQ服務器 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.155:61616"); // 建立一個鏈接對象 Connection connection = connectionFactory.createConnection(); // 開啓鏈接 connection.start(); // 使用Connection對象建立一個Session對象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 建立一個Destination對象。queue對象 Queue queue = session.createQueue("test-queue"); // 使用Session對象建立一個消費者對象。 MessageConsumer consumer = session.createConsumer(queue); // 接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { // 打印結果 TextMessage textMessage = (TextMessage) message; String text; try { text = textMessage.getText(); System.out.println("這是接收到的消息:" + text); } catch (JMSException e) { e.printStackTrace(); } } }); // 等待接收消息 System.in.read(); // 關閉資源 consumer.close(); session.close(); connection.close(); }
咱們開啓兩個消費者進程來監聽(運行兩次testQueueConsumer()方法)。
而後咱們運行運行生產者測試方法發送消息.先發送消息仍是先監聽消息通常不會不影響。
效果以下:
兩個消費者各自消費一半消息,並且仍是按照消息發送到消息隊列的順序,這也驗證了咱們上面的說法。
第一個消費者
第二個消費者
生產者發送消息測試方法:
@Test public void testTopicProducer() throws Exception { // 一、建立一個鏈接工廠對象,須要指定服務的ip及端口。 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.155:61616"); // 二、使用工廠對象建立一個Connection對象。 Connection connection = connectionFactory.createConnection(); // 三、開啓鏈接,調用Connection對象的start方法。 connection.start(); // 四、建立一個Session對象。 // 第一個參數:是否開啓事務。若是true開啓事務,第二個參數無心義。通常不開啓事務false。 // 第二個參數:應答模式。自動應答或者手動應答。通常自動應答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 五、使用Session對象建立一個Destination對象。兩種形式queue、topic,如今應該使用topic Topic topic = session.createTopic("test-topic"); // 六、使用Session對象建立一個Producer對象。 MessageProducer producer = session.createProducer(topic); // 七、建立一個Message對象,可使用TextMessage。 for (int i = 0; i < 50; i++) { TextMessage textMessage = session.createTextMessage("第"+i+ "一個ActiveMQ隊列目的地的消息"); // 八、發送消息 producer.send(textMessage); } // 九、關閉資源 producer.close(); session.close(); connection.close(); }
消費者消費消息測試方法:
@Test public void testTopicConsumer() throws Exception { // 建立一個ConnectionFactory對象鏈接MQ服務器 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.155:61616"); // 建立一個鏈接對象 Connection connection = connectionFactory.createConnection(); // 開啓鏈接 connection.start(); // 使用Connection對象建立一個Session對象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 建立一個Destination對象。topic對象 Topic topic = session.createTopic("test-topic"); // 使用Session對象建立一個消費者對象。 MessageConsumer consumer = session.createConsumer(topic); // 接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { // 打印結果 TextMessage textMessage = (TextMessage) message; String text; try { text = textMessage.getText(); System.out.println("這是接收到的消息:" + text); } catch (JMSException e) { e.printStackTrace(); } } }); System.out.println("topic消費者啓動。。。。"); // 等待接收消息 System.in.read(); // 關閉資源 consumer.close(); session.close(); connection.close(); }
先運行兩個消費者進程(提早訂閱,否則收不到發送的消息),而後運行生產者測試方法發送消息。
結果是:
兩個消費者進程均可以接收到生產者發送過來的全部消息,我這裏就不貼圖片了,
這樣驗證了咱們上面的說法。
咱們從上面代碼就能夠看出,點對點通訊和發佈訂閱通訊模式的區別就是建立生產者和消費者對象時提供的Destination對象不一樣,若是是點對點通訊建立的Destination對象是Queue,發佈訂閱通訊模式通訊則是Topic。
整合spring除了咱們上面依賴的Jar包還要依賴
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.2.7.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> <version>4.2.7.RELEASE</version> </dependency>
好比咱們在咱們的系統中如今有兩個服務,第一個服務發送消息,第二個服務接收消息,咱們下面看看這是如何實現的。
發送消息的配置文件:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd"> <!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.25.155:61616" /> </bean> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> <!-- 配置生產者 --> <!-- Spring提供的JMS工具類,它能夠進行消息發送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個connectionFactory對應的是咱們定義的Spring提供的那個ConnectionFactory對象 --> <property name="connectionFactory" ref="connectionFactory" /> </bean> <!--這個是隊列目的地,點對點的 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>spring-queue</value> </constructor-arg> </bean> <!--這個是主題目的地,一對多的 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic" /> </bean> </beans>
發送消息的測試方法:
@Test public void testSpringActiveMq() throws Exception { //初始化spring容器 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml"); //從spring容器中得到JmsTemplate對象 JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class); //從spring容器中取Destination對象 Destination destination = (Destination) applicationContext.getBean("queueDestination"); //使用JmsTemplate對象發送消息。 jmsTemplate.send(destination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { //建立一個消息對象並返回 TextMessage textMessage = session.createTextMessage("spring activemq queue message"); return textMessage; } }); }
咱們上面直接ApplicationContext的getBean方法獲取的對象,實際在項目使用依賴注入便可。
建立一個MessageListener的實現類。
public class MyMessageListener implements MessageListener { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; //取消息內容 String text = textMessage.getText(); System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }
接收消息的配置文件:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd"> <!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.25.168:61616" /> </bean> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> <!--這個是隊列目的地,點對點的 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>spring-queue</value> </constructor-arg> </bean> <!--這個是主題目的地,一對多的 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic" /> </bean> <!-- 接收消息 --> <!-- 配置監聽器 --> <bean id="myMessageListener" class="cn.e3mall.search.listener.MyMessageListener" /> <!-- 消息監聽容器 --> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="myMessageListener" /> </bean> </beans>
測試接收消息的代碼
@Test public void testQueueConsumer() throws Exception { //初始化spring容器 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml"); //等待 System.in.read(); }
歡迎關注個人微信公衆號:"Java面試通關手冊"(一個有溫度的微信公衆號,期待與你共同進步~~~堅持原創,分享美文,分享各類Java學習資源):。