MQ學習系列:html
activeMQ啓動錯誤 BeanFactory not initializedjava
在session接口中定義的幾個常量:apache
消息消費端在建立Session對象時須要指定應答模式爲客戶端手動應答,當消費者獲取到消息併成功處理後須要調用message.acknowledge()方法進行應答,通知Broker消費成功。若是處理過程當中出現異常,須要調用session.recover()通知Broker重複消息,默認最多重複6次。windows
<dependencies> <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-client --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.15.8</version> </dependency> <!-- https://mvnrepository.com/artifact/junit/junit --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> </dependencies>
package org.newmean; import org.apache.activemq.ActiveMQConnectionFactory; import org.junit.Test; import javax.jms.*; public class ActiveMQTest { //消息發送方-producter @Test public void test1() throws JMSException { //建立鏈接工廠對象 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); //從工廠中獲取一個鏈接對象 Connection connection = connectionFactory.createConnection(); //鏈接MQ服務 connection.start(); //獲取session對象 //參數說明 b 是否使用事務 i jms消息確認機制 1 2 3 0 用常量表示 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //經過session建立Topic Topic topic = session.createTopic("TestTopic"); //經過session建立消息發送者 MessageProducer producer = session.createProducer(topic); //經過session建立消息對象 TextMessage message = session.createTextMessage("hello"); //發送消息 producer.send(message); //關閉資源 producer.close(); session.close(); connection.close(); } //消息接收方-consumer @Test public void test2() throws JMSException { //建立鏈接工廠對象 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); //從工廠中獲取一個鏈接對象 Connection connection = connectionFactory.createConnection(); //鏈接MQ服務 connection.start(); //獲取session對象 //參數說明 b 是否使用事務 i jms消息確認機制 1 2 3 0 用常量表示 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //經過session建立Topic Topic topic = session.createTopic("TestTopic"); //經過session建立消費者 MessageConsumer consumer = session.createConsumer(topic); //指定消息監聽器 consumer.setMessageListener(new MessageListener() { //當咱們監聽的topic中存在消息,onMessage這個方法就會自動運行 public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("消費者接收到了消息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); //由於要接收消息不能關閉,同時線程不能死掉 while (true){ } } }
先啓動test2方法發起訂閱「TestTopic」消息,而後啓動test1方法,這時消費者收到了消息。session
消息重發模擬maven
咱們只須要更消息接收方的代碼,改動以下:tcp
//消息接收方-consumer @Test public void test2() throws JMSException { //建立鏈接工廠對象 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); //從工廠中獲取一個鏈接對象 Connection connection = connectionFactory.createConnection(); //鏈接MQ服務 connection.start(); //獲取session對象 //參數說明 b 是否使用事務 i jms消息確認機制 1 2 3 0 用常量表示 final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); //經過session建立Topic Topic topic = session.createTopic("TestTopic"); //經過session建立消費者 MessageConsumer consumer = session.createConsumer(topic); //指定消息監聽器 consumer.setMessageListener(new MessageListener() { //當咱們監聽的topic中存在消息,onMessage這個方法就會自動運行 public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { if(textMessage.getText().equals("nihao")){ System.out.println("消費者接收到了消息:"+textMessage.getText()); message.acknowledge(); }else { System.out.println("消息處理失敗了.."); session.recover(); } } catch (JMSException e) { e.printStackTrace(); } } }); //由於要接收消息不能關閉,同時線程不能死掉 while (true){ } }
先啓動test2方法發起訂閱「TestTopic」消息,而後啓動test1方法,這時消費者就會調用session.recover()方法讓消息發佈者重發消息默認6次,咱們可以看到7條(第一次+重發六次)「消息處理失敗了..」輸出。ide