前段時間配置了rabbitMQ 主要是爲了公司發送大批量郵件而服務,結果錦鯉說要用actionMQ,還能說什麼,確定是actionMq的走起了。html
鑑於第一次配置、使用ActionMQ,因此進行詳細的配置,記錄。若是有最新的用法,或者跟詳細、更深刻的使用,我會不按期的更新,java
期待和小夥伴們共同進步。spring
添加MAVEN依賴apache
<!--添加activemq的依賴-->
<!--ActiveMQ-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<version>5.14.5</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.14.5</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-spring</artifactId>
<version>5.14.5</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-client</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${org.springframework.version}</version>
</dependency>
這裏有的小夥伴可能依賴了服務器
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${org.springframework.version}</version>
</dependency>
畢竟一個頂的上上面的一大堆了,可出問題的時候也是來的猝不及防的。slf4j 衝突。大部分的程序中都會有slf4j,若是你報錯了,就乖乖用上面的一大堆把。或者能夠考慮把衝突的包從Maven程序中排除。具體怎麼排除,能夠百度了。session
1 <?xml version="1.0" encoding="UTF-8" ?> 2 3 <beans xmlns="http://www.springframework.org/schema/beans" 4 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 5 xmlns:context="http://www.springframework.org/schema/context" 6 xsi:schemaLocation="http://www.springframework.org/schema/beans 7 http://www.springframework.org/schema/beans/spring-beans.xsd 8 http://www.springframework.org/schema/context 9 http://www.springframework.org/schema/context/spring-context.xsd "> 10 11 <context:annotation-config /> 12 13 <!--Activemq的鏈接工廠--> 14 <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 15 <property name="brokerURL" value="tcp://127.0.0.1:61616" /> 16 <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" /> <!-- 引用重發機制 --> 17 </bean> 18 <!--spring jms爲咱們提供的鏈接池 獲取一個鏈接工廠--> 19 <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> 20 <property name="targetConnectionFactory" ref="targetConnectionFactory" /> 21 </bean> 22 23 <!-- 消息目的地 點對點的模式--> 24 <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> 25 <constructor-arg value="SpringActiveMQMsg"/> 26 </bean> 27 <!-- 消息目的地 點對點的模式--> 28 <bean id="queueDestinationTwo" class="org.apache.activemq.command.ActiveMQQueue"> 29 <constructor-arg value="EmailMQMsg"/> 30 </bean> 31 <!-- jms模板 用於進行消息發送--> 32 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 33 <property name="connectionFactory" ref="connectionFactory"/> 34 </bean> 35 <!--注入咱們的生產者--> 36 <bean class="mq.ProduceServiceImpl"/> 37 38 39 <!-- 配置消息監聽器--> 40 <bean id="SimpleMsgListener" class="mq.SimpleMsgListener"/> 41 <!--配置消息容器--> 42 <bean id ="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 43 <!--配置鏈接工廠--> 44 <property name="connectionFactory" ref="connectionFactory"/> 45 <!--配置監聽的隊列--> 46 <property name="destination" ref="queueDestination"/> 47 <!--配置消息監聽器--> 48 <property name="messageListener" ref="SimpleMsgListener"/> 49 <!--應答模式--> 50 <property name="sessionAcknowledgeMode" value="4"></property> 51 </bean> 52 53 54 <!-- 定義ReDelivery(重發機制)機制 ,重發時間間隔是100毫秒,最大重發次數是3次 http://www.kuqin.com/shuoit/20140419/339344.html --> 55 <bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"> 56 <!--是否在每次嘗試從新發送失敗後,增加這個等待時間 --> 57 <property name="useExponentialBackOff" value="true"></property> 58 <!--重發次數,默認爲6次 這裏設置爲1次 --> 59 <property name="maximumRedeliveries" value="2"></property> 60 <!--重發時間間隔,默認爲1秒 --> 61 <property name="initialRedeliveryDelay" value="1000"></property> 62 <!--第一次失敗後從新發送以前等待500毫秒,第二次失敗再等待500 * 2毫秒,這裏的2就是value --> 63 <property name="backOffMultiplier" value="2"></property> 64 <!--最大傳送延遲,只在useExponentialBackOff爲true時有效(V5.5),假設首次重連間隔爲10ms,倍數爲2,那麼第 65 二次重連時間間隔爲 20ms,第三次重連時間間隔爲40ms,當重連時間間隔大的最大重連時間間隔時,之後每次重連時間間隔都爲最大重連時間間隔。 --> 66 <property name="maximumRedeliveryDelay" value="1000"></property> 67 </bean> 68 </beans>
配置的第一步確定是連接到MQ服務了。(ActionMQ 連接工廠能夠添加的屬性還有不少,例如 username、password ...)tcp
第二步:配置spring jms爲咱們提供的鏈接池ide
第三步:配置隊列(個人是隊列模式,也就是 1對1消費模式(一條消息消費一次),還有一個發佈訂閱的模式(一條消息能夠被訂閱的多個客戶端消費))spa
第四步:配置模板code
第五步:聲明注入消息發佈者(這個不必,正常可用的類就能夠,在其中引入 @Autowired private JmsTemplate jmsTemplate; 模板類,就能夠發送消息)
發送消息的類
1 package mq; 2 import org.springframework.beans.factory.annotation.Autowired; 3 import org.springframework.jms.core.JmsTemplate; 4 import org.springframework.jms.core.MessageCreator; 5 import javax.annotation.Resource; 6 import javax.jms.*; 7 8 /** 9 * @ Author :tian 10 * @ Date :Created in 下午 2:21 2019/6/13 0015 11 * @ Description:生產者的實現類 12 */ 13 public class ProduceServiceImpl{ 14 @Autowired 15 private JmsTemplate jmsTemplate; 16 @Resource(name = "queueDestination") 17 private Destination destination; 18 19 /** 20 * 發送消息 21 * @param msg 22 */ 23 public void sendMessage(final String msg) { 24 jmsTemplate.convertAndSend("SpringActiveMQMsg",msg); 25 // jmsTemplate.send(destination , new MessageCreator() { 26 // @Override 27 // public Message createMessage(Session session) throws JMSException { 28 // TextMessage textMessage = session.createTextMessage(msg); 29 // return textMessage; 30 // } 31 // }); 32 System.out.println("如今發送的消息爲: " + msg); 33 } 34 35 /** 36 * 接收消息 37 */ 38 public TextMessage receive() { 39 TextMessage tm = (TextMessage) jmsTemplate.receive("SpringActiveMQMsg"); 40 try { 41 System.out.println("從隊列SpringActiveMQMsg收到了消息:\t" 42 + tm.getText()); 43 } catch (JMSException e) { 44 e.printStackTrace(); 45 } 46 47 return tm; 48 49 } 50 }
發送消息能夠用以上兩種方式(不僅這兩種,但這兩個最簡單。)
方式1:
jmsTemplate.convertAndSend("SpringActiveMQMsg",msg) // 參數一:你聲明的隊列的名稱 (隊列能夠不聲明,直接到ActionMQ 的控制檯建立也能夠,簡化配置,只不過要換服務器的時候還得建立一次。) 參數二:發送的消息
方式2:
上面程序中註釋的部分
jmsTemplate.send(destination , new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage(msg); return textMessage; } });
它是能夠正確發送消息的 這裏能夠看到 發送方法也有兩個參數 (參數一:聲明隊列的beanID 參數二:發送的消息建立者)
在這裏能夠看到有一個 TextMessage 類, 這個是消息的一種類型,actionMQ中消息有多種類型 JMS規範中的消息類型包括TextMessage、MapMessage、ObjectMessage、BytesMessage、和StreamMessage等五種。ActiveMQ也有對應的實現
詳細的狀況參見博文:http://www.javashuo.com/article/p-uuamnkrr-db.html
順便介紹一下 消息的手動接收 TextMessage tm = (TextMessage) jmsTemplate.receive("SpringActiveMQMsg"); 使用jmsTemplate 模板 的receive("隊列名稱") ,能夠一次消費一條消息。(確定不止這一個方法了,具體的小夥伴們能夠自行摸索)
消息的消費類型對應發送類型。若是想要消息自動接收消費,那就得配置消息監聽器了。下面貼上消費者的代碼
package mq; import javax.jms.*; /** * Created by Martin Huang on 2018/4/20. */ //bean id public class SimpleMsgListener implements MessageListener { //收到信息時的動做 @Override public void onMessage(Message message ) { TextMessage textMessage = (TextMessage) message; try { System.out.println("收到的信息:" + textMessage.getText()); textMessage.acknowledge(); } catch (JMSException e) { e.printStackTrace(); } } }
這個類必定要實現 MessageListener 的(固然你能夠實現其餘的,千萬別被我誤導了)重載 方法 onMessage(Message message ) 就是消費的方法,這裏我進行了消息的手動確認(默認是自動確認的,那樣消息不成功也會出隊,)消息不成功就不會出隊。
其實這樣作也很差。趕時間作ActionMQ,只能先把問題留下,之後再解決。若是有解決確認問題的好方法,歡迎留言評論,感激涕零...