MQ(消息隊列)是一種跨進程的通訊機制,用於上下游傳遞消息。html
異步處理,代碼解藕。java
1. xml配置spring
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xmlns:util="http://www.springframework.org/schema/util" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd"> <!-- windq配置start --> <!-- 生產者配置 --> <!-- JMS 鏈接工廠,必須配置destroy-method,會在停應用時,顯式地銷燬資源 --> <bean id="windqConnectionFactory" class="com.xxx.windq.jms.WindQConnectionFactory" destroy-method="destroy"> </bean> <!-- 定義隊列 --> <bean id="fundDetailRequestQueue" class="com.xxx.windq.jms.destination.WindQQueue"> <!--請求資金明細的隊列名稱--> <constructor-arg value="FUND_DETAIL_REQUEST_TTMS"/> </bean> <!-- 緩存session鏈接工廠,只可用於jmsTemplate發送消息,不可用於MessageListenerContainer --> <bean id="cacheConnectionFactory" class="com.xxx.windq.spring.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="windqConnectionFactory"/> <!-- 緩存住的會話數,若是併發峯值超出此閾值仍然會新建會話,只是這些新建的會話在idle後會被關閉。此值應填寫正常狀況下的併發量 --> <property name="sessionCacheSize" value="20"/> </bean> <bean id="windqJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="cacheConnectionFactory"/> </bean> <!-- 用來發送消息的Service實例 --> <bean id="jmsFundDetailRequestSender" class="com.xxx.fms.remit.jms.JmsFundDetailRequestSender"> <property name="jmsTemplate" ref="windqJmsTemplate"/> <!-- 此處關聯定義的隊列或主題 --> <property name="queueOrTopic" ref="fundDetailRequestQueue"/> </bean> <!-- 消費者配置 --> <!-- 用來接收消息的Listener實例 --> <bean id="jmsFundDetailListener" class="com.xxx.fms.remit.jms.JmsFundDetailListener"/> <bean id="listenerContainer" class="com.xxx.windq.spring.DefaultMessageListenerContainer"> <!-- 使用WINDQ原生的鏈接工廠,不要使用cachingConnectionFactory,由於MLC本身內部有緩存機制 --> <property name="connectionFactory" ref="windqConnectionFactory"/> <!-- 填寫上述定義中的實際要消費的隊列(該隊列由資金系統提供) --> <property name="destination" ref="myQueue"/> <!-- 業務處理類 --> <property name="messageListener" ref="jmsFundDetailListener"/> <!--單個JVM的併發consumer的數量:最小-最大。例如1-1,表示最小的和最大的併發消費者數量都是1 --> <property name="concurrency" value="1-1"/> <!-- 打開JMS會話事務(非JTA事務),session類型爲transaction --> <property name="sessionTransacted" value="true"/> </bean> <!-- windq配置end --> </beans>
2. 生產者JmsFundDetailRequestSender實現:緩存
@Component("jmsFundDetailRequestSender") class JmsFundDetailRequestSender { private static Logger LOGGER = LoggerFactory.getLogger(JmsFundDetailRequestSender.class); private Destination queueOrTopic; private JmsTemplate jmsTemplate; /** * 向指定隊列發送消息 * @param message */ public void sendMessage(final Serializable message) { LOGGER.info("發送資金明細查詢windq請求,sendMessage:{}", ToStringBuilder.reflectionToString(message)); jmsTemplate.send(queueOrTopic, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { ObjectMessage objectMessage = session.createObjectMessage(); // 若是須要設置如下任一屬性頭,就調用下clearProperties()方法,默認是不容許設置屬性的,這個語句會打開屬性變爲可設置 objectMessage.clearProperties(); // 定位本條消息的業務字段,用於消息日誌查詢。例如若是填寫訂單號,那麼經過訂單號就能查詢到這條消息。非必填字段 objectMessage.setStringProperty(MessageHeader.WINDQ_MSG_ABSTRACT_HEADER, message.toString()); // 填寫消息體 objectMessage.setObject(message); return objectMessage; } }); } public Destination getQueueOrTopic() { return queueOrTopic; } public void setQueueOrTopic(Destination queueOrTopic) { this.queueOrTopic = queueOrTopic; } public JmsTemplate getJmsTemplate() { return jmsTemplate; } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } }
注:代碼中用到了匿名內部類,有關匿名內部類的解釋,能夠查看匿名內部類詳解。session
3. 消費者JmsFundDetailListener實現:併發
@Component("jmsFundDetailListener") public class JmsFundDetailListener implements MessageListener { @Override public void onMessage(Message message) { if (message != null) { // 業務處理代碼 } } }