MQ實戰

MQ是什麼?

MQ(消息隊列)是一種跨進程的通訊機制,用於上下游傳遞消息。html

MQ的優勢

異步處理,代碼解藕。java

spring中集成MQ的實現

1. xml配置spring

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
      http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd
      http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
       http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd">
    <!-- windq配置start -->
    <!-- 生產者配置 -->
    <!-- JMS 鏈接工廠,必須配置destroy-method,會在停應用時,顯式地銷燬資源 -->
    <bean id="windqConnectionFactory" class="com.xxx.windq.jms.WindQConnectionFactory" destroy-method="destroy">
    </bean>
    <!-- 定義隊列 -->
    <bean id="fundDetailRequestQueue" class="com.xxx.windq.jms.destination.WindQQueue">
        <!--請求資金明細的隊列名稱-->
        <constructor-arg value="FUND_DETAIL_REQUEST_TTMS"/>
    </bean>
    <!-- 緩存session鏈接工廠,只可用於jmsTemplate發送消息,不可用於MessageListenerContainer -->
    <bean id="cacheConnectionFactory" class="com.xxx.windq.spring.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="windqConnectionFactory"/>
        <!-- 緩存住的會話數,若是併發峯值超出此閾值仍然會新建會話,只是這些新建的會話在idle後會被關閉。此值應填寫正常狀況下的併發量 -->
        <property name="sessionCacheSize" value="20"/>
    </bean>
    <bean id="windqJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="cacheConnectionFactory"/>
    </bean>
    <!-- 用來發送消息的Service實例 -->
    <bean id="jmsFundDetailRequestSender" class="com.xxx.fms.remit.jms.JmsFundDetailRequestSender">
        <property name="jmsTemplate" ref="windqJmsTemplate"/>
        <!-- 此處關聯定義的隊列或主題 -->
        <property name="queueOrTopic" ref="fundDetailRequestQueue"/>
    </bean>

    <!-- 消費者配置 -->
    <!-- 用來接收消息的Listener實例 -->
    <bean id="jmsFundDetailListener" class="com.xxx.fms.remit.jms.JmsFundDetailListener"/>
    <bean id="listenerContainer" class="com.xxx.windq.spring.DefaultMessageListenerContainer">
        <!-- 使用WINDQ原生的鏈接工廠,不要使用cachingConnectionFactory,由於MLC本身內部有緩存機制 -->
        <property name="connectionFactory" ref="windqConnectionFactory"/>
        <!-- 填寫上述定義中的實際要消費的隊列(該隊列由資金系統提供) -->
        <property name="destination" ref="myQueue"/>
        <!-- 業務處理類 -->
        <property name="messageListener" ref="jmsFundDetailListener"/>
        <!--單個JVM的併發consumer的數量:最小-最大。例如1-1,表示最小的和最大的併發消費者數量都是1 -->
        <property name="concurrency" value="1-1"/>
        <!-- 打開JMS會話事務(非JTA事務),session類型爲transaction -->
        <property name="sessionTransacted" value="true"/>
    </bean>
    <!-- windq配置end -->
</beans>

2. 生產者JmsFundDetailRequestSender實現:緩存

@Component("jmsFundDetailRequestSender")
class JmsFundDetailRequestSender {

    private static Logger LOGGER = LoggerFactory.getLogger(JmsFundDetailRequestSender.class);

    private Destination queueOrTopic;

    private JmsTemplate jmsTemplate;

    /**
     * 向指定隊列發送消息
     * @param message
     */
    public void sendMessage(final Serializable message) {
        LOGGER.info("發送資金明細查詢windq請求,sendMessage:{}", ToStringBuilder.reflectionToString(message));
        jmsTemplate.send(queueOrTopic, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                ObjectMessage objectMessage = session.createObjectMessage();
                // 若是須要設置如下任一屬性頭,就調用下clearProperties()方法,默認是不容許設置屬性的,這個語句會打開屬性變爲可設置
                objectMessage.clearProperties();
                // 定位本條消息的業務字段,用於消息日誌查詢。例如若是填寫訂單號,那麼經過訂單號就能查詢到這條消息。非必填字段
                objectMessage.setStringProperty(MessageHeader.WINDQ_MSG_ABSTRACT_HEADER, message.toString());
                // 填寫消息體
                objectMessage.setObject(message);
                return objectMessage;
            }
        });
    }

    public Destination getQueueOrTopic() {
        return queueOrTopic;
    }

    public void setQueueOrTopic(Destination queueOrTopic) {
        this.queueOrTopic = queueOrTopic;
    }

    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }
}

注:代碼中用到了匿名內部類,有關匿名內部類的解釋,能夠查看匿名內部類詳解session

3. 消費者JmsFundDetailListener實現:併發

@Component("jmsFundDetailListener")
public class JmsFundDetailListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        if (message != null) {
            // 業務處理代碼
        }
    }
}
相關文章
相關標籤/搜索