spring 整合 actionMQ(一)

前段時間配置了rabbitMQ 主要是爲了公司發送大批量郵件而服務,結果錦鯉說要用actionMQ,還能說什麼,確定是actionMq的走起了。html

鑑於第一次配置、使用ActionMQ,因此進行詳細的配置,記錄。若是有最新的用法,或者跟詳細、更深刻的使用,我會不按期的更新,java

期待和小夥伴們共同進步。spring

添加MAVEN依賴apache

<!--添加activemq的依賴-->
<!--ActiveMQ-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<version>5.14.5</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.14.5</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-spring</artifactId>
<version>5.14.5</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-client</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${org.springframework.version}</version>
</dependency>

  這裏有的小夥伴可能依賴了服務器

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${org.springframework.version}</version>
</dependency>
 

畢竟一個頂的上上面的一大堆了,可出問題的時候也是來的猝不及防的。slf4j 衝突。大部分的程序中都會有slf4j,若是你報錯了,就乖乖用上面的一大堆把。或者能夠考慮把衝突的包從Maven程序中排除。具體怎麼排除,能夠百度了。session

 

配置文件  (對於我我的來講,第一步,就是搞定配置文件,類什麼的在配置的過程當中再去書寫)

 

 1 <?xml version="1.0" encoding="UTF-8" ?>
 2 
 3 <beans xmlns="http://www.springframework.org/schema/beans"
 4        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 5        xmlns:context="http://www.springframework.org/schema/context"
 6        xsi:schemaLocation="http://www.springframework.org/schema/beans
 7        http://www.springframework.org/schema/beans/spring-beans.xsd
 8        http://www.springframework.org/schema/context
 9        http://www.springframework.org/schema/context/spring-context.xsd ">
10 
11     <context:annotation-config />
12 
13     <!--Activemq的鏈接工廠-->
14     <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
15         <property name="brokerURL" value="tcp://127.0.0.1:61616" />
16         <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" />  <!-- 引用重發機制 -->
17     </bean>
18     <!--spring jms爲咱們提供的鏈接池 獲取一個鏈接工廠-->
19     <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
20         <property name="targetConnectionFactory" ref="targetConnectionFactory" />
21     </bean>
22 
23     <!-- 消息目的地  點對點的模式-->
24     <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
25         <constructor-arg value="SpringActiveMQMsg"/>
26     </bean>
27     <!-- 消息目的地  點對點的模式-->
28     <bean id="queueDestinationTwo" class="org.apache.activemq.command.ActiveMQQueue">
29         <constructor-arg value="EmailMQMsg"/>
30     </bean>
31     <!-- jms模板  用於進行消息發送-->
32     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
33         <property name="connectionFactory" ref="connectionFactory"/>
34     </bean>
35     <!--注入咱們的生產者-->
36     <bean class="mq.ProduceServiceImpl"/>
37 
38 
39     <!-- 配置消息監聽器-->
40     <bean id="SimpleMsgListener" class="mq.SimpleMsgListener"/>
41     <!--配置消息容器-->
42     <bean id ="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
43         <!--配置鏈接工廠-->
44         <property name="connectionFactory" ref="connectionFactory"/>
45         <!--配置監聽的隊列-->
46         <property name="destination" ref="queueDestination"/>
47         <!--配置消息監聽器-->
48         <property name="messageListener" ref="SimpleMsgListener"/>
49         <!--應答模式-->
50         <property name="sessionAcknowledgeMode" value="4"></property>
51     </bean>
52 
53 
54     <!-- 定義ReDelivery(重發機制)機制 ,重發時間間隔是100毫秒,最大重發次數是3次 http://www.kuqin.com/shuoit/20140419/339344.html -->
55     <bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
56         <!--是否在每次嘗試從新發送失敗後,增加這個等待時間 -->
57         <property name="useExponentialBackOff" value="true"></property>
58         <!--重發次數,默認爲6次   這裏設置爲1次 -->
59         <property name="maximumRedeliveries" value="2"></property>
60         <!--重發時間間隔,默認爲1秒 -->
61         <property name="initialRedeliveryDelay" value="1000"></property>
62         <!--第一次失敗後從新發送以前等待500毫秒,第二次失敗再等待500 * 2毫秒,這裏的2就是value -->
63         <property name="backOffMultiplier" value="2"></property>
64         <!--最大傳送延遲,只在useExponentialBackOff爲true時有效(V5.5),假設首次重連間隔爲10ms,倍數爲2,那麼第
65             二次重連時間間隔爲 20ms,第三次重連時間間隔爲40ms,當重連時間間隔大的最大重連時間間隔時,之後每次重連時間間隔都爲最大重連時間間隔。 -->
66         <property name="maximumRedeliveryDelay" value="1000"></property>
67     </bean>
68 </beans>

  配置的第一步確定是連接到MQ服務了。(ActionMQ 連接工廠能夠添加的屬性還有不少,例如 username、password ...)tcp

  第二步:配置spring jms爲咱們提供的鏈接池ide

  第三步:配置隊列(個人是隊列模式,也就是 1對1消費模式(一條消息消費一次),還有一個發佈訂閱的模式(一條消息能夠被訂閱的多個客戶端消費))spa

  第四步:配置模板code

  第五步:聲明注入消息發佈者(這個不必,正常可用的類就能夠,在其中引入 @Autowired private JmsTemplate jmsTemplate; 模板類,就能夠發送消息

發送消息的類

 1 package mq;
 2 import org.springframework.beans.factory.annotation.Autowired;
 3 import org.springframework.jms.core.JmsTemplate;
 4 import org.springframework.jms.core.MessageCreator;
 5 import javax.annotation.Resource;
 6 import javax.jms.*;
 7 
 8 /**
 9  * @ Author     :tian
10  * @ Date       :Created in 下午 2:21 2019/6/13 0015
11  * @ Description:生產者的實現類
12  */
13 public class ProduceServiceImpl{
14     @Autowired
15     private JmsTemplate jmsTemplate;
16     @Resource(name = "queueDestination")
17     private Destination destination;
18 
19     /**
20      * 發送消息
21      * @param msg
22      */
23     public void sendMessage(final String msg) {
24         jmsTemplate.convertAndSend("SpringActiveMQMsg",msg);
25 //        jmsTemplate.send(destination , new MessageCreator() {
26 //            @Override
27 //            public Message createMessage(Session session) throws JMSException {
28 //                TextMessage textMessage = session.createTextMessage(msg);
29 //                return textMessage;
30 //            }
31 //        });
32         System.out.println("如今發送的消息爲: " + msg);
33     }
34 
35     /**
36      * 接收消息
37      */
38     public TextMessage receive() {
39         TextMessage tm = (TextMessage) jmsTemplate.receive("SpringActiveMQMsg");
40         try {
41             System.out.println("從隊列SpringActiveMQMsg收到了消息:\t"
42                     + tm.getText());
43         } catch (JMSException e) {
44             e.printStackTrace();
45         }
46 
47         return tm;
48 
49     }
50 }

發送消息能夠用以上兩種方式(不僅這兩種,但這兩個最簡單。)

方式1:

jmsTemplate.convertAndSend("SpringActiveMQMsg",msg) // 參數一:你聲明的隊列的名稱 (隊列能夠不聲明,直接到ActionMQ 的控制檯建立也能夠,簡化配置,只不過要換服務器的時候還得建立一次。) 參數二:發送的消息

方式2:

上面程序中註釋的部分

 

 jmsTemplate.send(destination , new MessageCreator() {  @Override  public Message createMessage(Session session) throws JMSException {  TextMessage textMessage = session.createTextMessage(msg);  return textMessage;  }  });
它是能夠正確發送消息的 這裏能夠看到 發送方法也有兩個參數 (參數一:聲明隊列的beanID 參數二:發送的消息建立者)
在這裏能夠看到有一個 TextMessage 類, 這個是消息的一種類型,actionMQ中消息有多種類型
JMS規範中的消息類型包括TextMessage、MapMessage、ObjectMessage、BytesMessage、和StreamMessage等五種。ActiveMQ也有對應的實現
詳細的狀況參見博文:http://www.javashuo.com/article/p-uuamnkrr-db.html

 

順便介紹一下 消息的手動接收 TextMessage tm = (TextMessage) jmsTemplate.receive("SpringActiveMQMsg"); 使用jmsTemplate 模板 的receive("隊列名稱") ,能夠一次消費一條消息。(確定不止這一個方法了,具體的小夥伴們能夠自行摸索)

消息的消費類型對應發送類型。若是想要消息自動接收消費,那就得配置消息監聽器了。下面貼上消費者的代碼

 

package mq;
import javax.jms.*;

/**
 * Created by Martin Huang on 2018/4/20.
 */
//bean id
public class SimpleMsgListener implements MessageListener {

    //收到信息時的動做
    @Override
    public void onMessage(Message message ) {
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("收到的信息:" + textMessage.getText());
            textMessage.acknowledge();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

 

這個類必定要實現 MessageListener 的(固然你能夠實現其餘的,千萬別被我誤導了)重載 方法 onMessage(Message message ) 就是消費的方法,這裏我進行了消息的手動確認(默認是自動確認的,那樣消息不成功也會出隊,)消息不成功就不會出隊。

其實這樣作也很差。趕時間作ActionMQ,只能先把問題留下,之後再解決。若是有解決確認問題的好方法,歡迎留言評論,感激涕零...

相關文章
相關標籤/搜索