spring整合activemq發送MQ消息[Topic模式]實例

Topic模式消息發送實例html

一、pom引入java

<dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>4.1.4.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>4.1.4.RELEASE</version>
        </dependency>

二、生產者配置spring

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- 配置JMS鏈接工廠 -->
    <bean id="providerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="failover:(tcp://192.168.147.131:61616)" />
        <property name="useAsyncSend" value="true" />
        <property name="clientID" value="providerClienctConnect" />
    </bean>

    <!-- 定義消息Destination -->
    <bean id="topicDestination"  class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="testSpringTopic"/>
    </bean>

    <!-- 消息發送者客戶端 -->
    <bean id="providerJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="providerConnectionFactory" />
        <property name="defaultDestination" ref="topicDestination" />
        <!-- 開啓訂閱模式 -->
        <property name="pubSubDomain" value="true"/>
        <property name="receiveTimeout" value="10000" />
        <!-- deliveryMode, priority, timeToLive 的開關要生效,必須配置爲true,默認false-->
        <property name="explicitQosEnabled" value="true"/>
        <!-- 發送模式 DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久 -->
        <property name="deliveryMode" value="1"/>
    </bean>
    
</beans>

生產者程序數據庫

package com.mq.spring.topic; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import javax.annotation.Resource; import javax.jms.*; /** * created on 2015/6/4 * @author dennisit@163.com * @version 1.0 */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations={"classpath:spring-topic.xml"}) public class TopicSender { @Resource(name = "providerJmsTemplate") private JmsTemplate jmsTemplate; @Test public void send(){ sendMqMessage(null,"spring activemq topic type message[with listener] !"); } /** * 說明:發送的時候若是這裏沒有顯示的指定destination.將用spring xml中配置的destination * @param destination * @param message */
    public void sendMqMessage(Destination destination, final String message){ if(null == destination){ destination = jmsTemplate.getDefaultDestination(); } jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); System.out.println("spring send text message..."); } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } }

三、消費者配置apache

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">


    <!-- 配置JMS鏈接工廠 -->
    <bean id="consumerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="failover:(tcp://192.168.147.131:61616)" />
        <property name="useAsyncSend" value="true" />
        <property name="clientID" value="consumerClienctConnect" />
    </bean>

    <!-- 定義消息Destination -->
    <bean id="topic1Destination"  class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="testSpringTopic1"/>
    </bean>

    <!-- 配置消息消費監聽者 -->
    <bean id="consumerMessageListener" class="com.mq.spring.topic.ConsumerMessageListener" />

    <!-- 消息訂閱客戶端1 -->
    <bean id="consumerListenerClient1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="consumerConnectionFactory" />
        <!-- 開啓訂閱模式 -->
        <property name="pubSubDomain" value="true"/>
        <property name="destination" ref="topic1Destination" />
        <property name="subscriptionDurable" value="true"/>
        <!---這裏是設置接收客戶端的ID,在持久化時,但這個客戶端不在線時,消息就存在數據庫裏,直到被這個ID的客戶端消費掉-->
        <property name="clientId" value="consumerClient1"/>
        <property name="messageListener" ref="consumerMessageListener" />
        <!-- 消息應答方式 Session.AUTO_ACKNOWLEDGE 消息自動簽收 Session.CLIENT_ACKNOWLEDGE 客戶端調用acknowledge方法手動簽收 Session.DUPS_OK_ACKNOWLEDGE 沒必要必須簽收,消息可能會重複發送 -->
        <property name="sessionAcknowledgeMode" value="1"/>
    </bean>

    <!-- 消息訂閱客戶端2 -->
    <bean id="consumerListenerClient2" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="consumerConnectionFactory" />
        <!-- 開啓訂閱模式 -->
        <property name="pubSubDomain" value="true"/>
        <property name="destination" ref="topicDestination" />
        <property name="subscriptionDurable" value="true"/>
        <!---這裏是設置接收客戶端的ID,在持久化時,但這個客戶端不在線時,消息就存在數據庫裏,直到被這個ID的客戶端消費掉-->
        <property name="clientId" value="consumerClient2"/>
        <property name="messageListener" ref="consumerMessageListener" />
        <!-- 消息應答方式 Session.AUTO_ACKNOWLEDGE 消息自動簽收 Session.CLIENT_ACKNOWLEDGE 客戶端調用acknowledge方法手動簽收 Session.DUPS_OK_ACKNOWLEDGE 沒必要必須簽收,消息可能會重複發送 -->
        <property name="sessionAcknowledgeMode" value="1"/>
    </bean>

</beans>

消費者監聽代碼session

package com.mq.spring.topic; import org.apache.commons.lang.builder.ToStringBuilder; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * created on 2015/6/4 * @author dennisit@163.com * @version 1.0 */
public class ConsumerMessageListener implements MessageListener{ @Override public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("---------消息消費---------"); System.out.println("消息內容:\t" + tm.getText()); System.out.println("消息ID:\t" + tm.getJMSMessageID()); System.out.println("消息Destination:\t" + tm.getJMSDestination()); System.out.println("---------更多信息---------"); System.out.println(ToStringBuilder.reflectionToString(tm)); System.out.println("-------------------------"); } catch (JMSException e) { e.printStackTrace(); } } }

運行結果:tcp

說明:屬於學習,網上資料結合我的理解,理解有誤的地方,期待指導和建議,共同窗習.ide

轉載請註明出處:[http://www.cnblogs.com/dennisit/p/4552686.html]學習

相關文章
相關標籤/搜索