## ActiveMQ Config activemq.brokerURL=tcp\://127.0.0.1\:61616 activemq.userName=admin activemq.password=password activemq.pool.maxConnection=10 activemq.queue=mailqueue activemq.topic=mailtopic
<!-- https://mvnrepository.com/artifact/org.springframework/spring-jms --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.3.7.RELEASE</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-pool --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.3</version> </dependency>
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:amq="http://activemq.apache.org/schema/core" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd"> <!-- ActiveMQConnectionFactory就是JMS中負責建立到ActiveMQ鏈接的工廠類 --> <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" > <property name="brokerURL" value="${activemq.brokerURL}"/> <property name="userName" value="${activemq.userName}"/> <property name="password" value="${activemq.password}"/> </bean> <!-- 建立鏈接池 --> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory" ref="connectionFactory"/> <property name="maxConnections" value="${activemq.pool.maxConnection}"/> </bean> <!-- Spring 爲咱們提供了多個 ConnectionFactory,有 SingleConnectionFactory 和 CachingConnectionFactory --> <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="pooledConnectionFactory"/> </bean> <!-- Spring 提供的 JMS 工具類,它能夠進行消息發送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個 connectionFactory 對應的是咱們定義的 Spring 提供的那個 ConnectionFactory 對象 --> <property name="connectionFactory" ref="cachingConnectionFactory"/> <property name="defaultDestination" ref="queueDestination"/> </bean> <!--定義消息隊列(Queue)--> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 設置消息隊列的名字 --> <constructor-arg index="0" value="${activemq.queue}"/> </bean> </beans>
ConnectionFactory 是用於產生到 JMS 服務器的連接的,Spring 爲咱們提供了多個 ConnectionFactory,有 SingleConnectionFactory 和 CachingConnectionFactory。java
import com.alibaba.fastjson.JSONObject; import com.github.binarylei.jms.spring.core.Mail; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Service; /** * @author: leigang * @version: 2018-04-03 */ @Service("mqProducer") public class MQProducer { @Autowired private JmsTemplate jmsTemplate; public void sendMail(Mail mail) { jmsTemplate.send((session) -> { return session.createTextMessage(JSONObject.toJSONString(mail)); }); } }
import com.alibaba.fastjson.JSONObject; import com.github.binarylei.jms.spring.core.Mail; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Service; import javax.jms.Destination; /** * @author: leigang * @version: 2018-04-03 */ @Service("mqCustumer") public class MQCustumer { @Autowired private JmsTemplate jmsTemplate; @Autowired private Destination destination; public void sendMail() { String json = (String) jmsTemplate.receiveAndConvert(destination); Mail mail = (Mail) JSONObject.parseObject(json, Mail.class); System.out.println(mail); } }
咱們在 spring 中直接配置異步接收消息的監聽器,這樣就至關於在 spring 中配置了消費者,在接受消息的時候就沒必要要啓動消費者了。git
spring-jms.xml:github
<!--消費者,監聽--> <bean id="messageListener" class="com.github.binarylei.jms.spring.demo1.MessageListener"/> <bean id="messageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="queueDestination"/> <property name="messageListener" ref="messageListener"/> </bean>
生產者往指定目的地 Destination 發送消息後,接下來就是消費者對指定目的地的消息進行消費了。那麼消費者是如何知道有生產者發送消息到指定目的地 Destination了呢?這是經過 Spring 爲咱們封裝的消息監聽容器 MessageListenerContainer 實現的,它負責接收信息,並把接收到的信息分發給真正的 MessageListener 進行處理。spring
每一個消費者對應每一個目的地都須要有對應的 MessageListenerContainer。對於消息監聽容器而言,除了要知道監聽哪一個目的地以外,還須要知道到哪裏去監聽,也就是說它還須要知道去監聽哪一個 JMS 服務器,這是經過在配置 MessageConnectionFactory 的時候往裏面注入一個 ConnectionFactory 來實現的。因此咱們在配置一個 MessageListenerContainer 的時候有三個屬性必須指定,一個是表示從哪裏監聽的 ConnectionFactory;一個是表示監聽什麼的 Destination;一個是接收到消息之後進行消息處理的 MessageListener。apache
Spring 一共爲咱們提供了兩種類型的 MessageListenerContainer:json
消息監聽器:緩存
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.TextMessage; /** * @author: leigang * @version: 2018-04-03 */ public class MessageListener implements javax.jms.MessageListener { @Override public void onMessage(Message message) { TextMessage msg = (TextMessage) message; try { System.out.println("消息:" + msg.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
在 spring-jms.xml 中將 ActiveMQTopic 生成 Topic,其它沒什麼變化:服務器
<!--這個是隊列目的地,發佈訂閱--> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg index="0" value="spring-Topic"/> </bean>
天天用心記錄一點點。內容也許不重要,但習慣很重要!session