How to use ActiveMQ

This post describes how activemq (amq for short) is used for communication in a project.

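There are two systems serving different users, referred to here as system A and system B. To keep things simple, this post only uses system A as the producer for the queue A.CreateMessage and system B as its consumer. The message can be an object of a purpose-designed class; for simplicity, a String object is sent here.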

System A can also act as the consumer of another queue, QC, with system B as that queue's producer.
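The producer and consumer roles can be pictured with a small in-process sketch. It uses a `java.util.concurrent.BlockingQueue` as a stand-in for the queue A.CreateMessage, so it is not ActiveMQ and involves no broker. The names `QueueRolesSketch`, `produce` and `consume` are made up for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory stand-in for the queue A.CreateMessage; hypothetical names, no broker involved.
class QueueRolesSketch {

    // Plays the role of the queue that sits between system A and system B.
    private final BlockingQueue<String> createMessageQueue = new LinkedBlockingQueue<String>();

    // System A side: put a String message on the queue.
    void produce(String message) {
        createMessageQueue.add(message);
    }

    // System B side: take the next message, or return null if none arrives in time.
    String consume(long timeoutMillis) {
        try {
            return createMessageQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        QueueRolesSketch sketch = new QueueRolesSketch();
        sketch.produce("hello from system A");
        System.out.println("system B got: " + sketch.consume(1000));
    }
}
```

With a real broker the two sides run in separate processes and only share the queue name; the sketch keeps them in one class purely to show who writes and who reads.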

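1. Download apache-activemq-5.10.2 and, depending on the operating system (32-bit or 64-bit), run the activemq.bat file in the win32 or win64 directory under bin. Once it has started, open a browser and go to localhost:8161/admin/queues.jsp.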

If the page asks you to log in, enter the user name admin and the password admin.
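If the console page does not load, a quick check of whether anything is listening can narrow the problem down. The sketch below only attempts a TCP connection; port 8161 comes from the console URL above and 61616 from the brokerURL used in this post's configuration. The class name `BrokerPortCheck` and the method `isReachable` are hypothetical helpers, not part of ActiveMQ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP port accepts connections.
class BrokerPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("web console (8161): " + isReachable("localhost", 8161, 1000));
        System.out.println("broker (61616): " + isReachable("localhost", 61616, 1000));
    }
}
```

A successful connection only shows that something is listening on the port; it does not verify the credentials or that the listener is really ActiveMQ.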

          

2. With amq running, the next step is to add the amq-related content to system A.

The project directory structure is as follows:

    

 

The applicationContext-amq.xml file for system A:

<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
 http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.10.2.xsd">
    <!--
        Uses Spring's listener container; messages are stored persistently and are not lost when the server restarts
    -->
     <!-- connect to the external ActiveMQ broker -->
    <amq:connectionFactory id="jmsConnectionFactory" userName="admin" password="admin" brokerURL="tcp://localhost:61616"></amq:connectionFactory>

    <!--  Spring JmsTemplate config -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory">
            <!-- SingleConnectionFactory reuses one shared connection (it is not a pool), which avoids creating a connection per send -->
            <bean class="org.springframework.jms.connection.SingleConnectionFactory">
                <property name="targetConnectionFactory" ref="jmsConnectionFactory" />
            </bean>
        </property>
        <!-- custom MessageConverter -->
        <property name="messageConverter" ref="defaultMessageConverter" />
    </bean>
    
    <!-- converter  -->
    <bean id="defaultMessageConverter" class="com.pack.app.amq.DefaultMessageConverter" />
    
    <!--  ActiveMQ destinations  -->
    <!--  use a Queue destination -->
    <amq:queue name="QUEUE" physicalName="TESTQ" />

    <bean id="queueMessageProducer" class="tools.amq.QueueMessageProducer">
        <property name="template" ref="jmsTemplate" />
        <property name="destination" ref="QUEUE" />
    </bean>
    
  
  <!-- consumer for queue -->
    <bean id="queueConsumer" class="tools.amq.QueueConsumer" />

    <!-- Message Listener for queue -->
    <bean id="queueListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <constructor-arg ref="queueConsumer" />
        <!--  may be other method -->
        <property name="defaultListenerMethod" value="receive" />
        <!-- custom MessageConverter define -->
        <property name="messageConverter" ref="defaultMessageConverter" />
    </bean>

    <!--  listener container; the MDP (message-driven POJO) does not need to implement any interface -->
    <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsConnectionFactory" />
        <property name="destination" ref="QUEUE" />
        <property name="messageListener" ref="queueListener" />
    </bean>

    <!-- Test: send messages to MQ -->
    <amq:queue name="CreateMessage" physicalName="A.CreateMessage" />
    <!-- produces the data -->
    <bean id="createMessageProducer" class="com.pack.app.amq.producer.CreateMessageProducer">
        <property name="template" ref="jmsTemplate" />
        <property name="destination" ref="CreateMessage" />
    </bean>
</beans>

DefaultMessageConverter.java

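import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.command.ActiveMQObjectMessage;
// Log/LogFactory are assumed to come from Apache Commons Logging, judging by LogFactory.getLog(Class)
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jms.support.converter.MessageConverter;

public class DefaultMessageConverter implements MessageConverter {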
    /**
     * Logger for this class
     */
    private static final Log log = LogFactory.getLog(DefaultMessageConverter.class);

    public Message toMessage(Object obj, Session session) throws JMSException {
        if (log.isDebugEnabled()) {
            log.debug("toMessage(Object, Session) - start");
        }

        // Serialize the payload and attach the bytes to the message as an object property named "Map"
        ActiveMQObjectMessage objMsg = (ActiveMQObjectMessage) session.createObjectMessage();
        HashMap<String, byte[]> map = new HashMap<String, byte[]>();
        try {
            // the POJO must implement Serializable
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(obj);
            oos.flush(); // make sure all bytes have reached bos before copying them
            map.put("POJO", bos.toByteArray());
            objMsg.setObjectProperty("Map", map);

        } catch (IOException e) {
            e.printStackTrace();
            log.error("toMessage(Object, Session)", e);
        }
        return objMsg;
    }

    public Object fromMessage(Message msg) throws JMSException {
        if (log.isDebugEnabled()) {
            log.debug("fromMessage(Message) - start");
        }

        if (msg instanceof ObjectMessage) {
            HashMap<String, byte[]> map = (HashMap<String, byte[]>) ((ObjectMessage) msg).getObjectProperty("Map");
            try {
                // the POJO must implement Serializable
                ByteArrayInputStream bis = new ByteArrayInputStream(map.get("POJO"));
                ObjectInputStream ois = new ObjectInputStream(bis);
                Object returnObject = ois.readObject();
                return returnObject;
            } catch (IOException e) {
                e.printStackTrace();
                log.error("fromMessage(Message)", e);

            } catch (ClassNotFoundException e) {
                e.printStackTrace();
                log.error("fromMessage(Message)", e);
            }

            return null;
        } else {
            throw new JMSException("Msg:[" + msg + "] is not an ObjectMessage");
        }
    }
}
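The core of the converter is plain Java serialization: the payload is written to a byte array on the sending side and read back on the receiving side. The standalone sketch below shows only that step, without JMS or ActiveMQ. The names `SerializationRoundTrip`, `toBytes` and `fromBytes` are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical helper showing the byte-array round trip the converter relies on.
class SerializationRoundTrip {

    // Serializes the object into a byte array.
    static byte[] toBytes(Serializable obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            throw new IllegalStateException("serialization failed", e);
        }
        // The stream is closed (and therefore flushed) before the bytes are copied.
        return bos.toByteArray();
    }

    // Rebuilds the object from the byte array.
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("deserialization failed", e);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = toBytes("hello");
        System.out.println(fromBytes(bytes)); // prints "hello"
    }
}
```

Because the receiving side must be able to load the payload's class, both systems need the same class on their classpath when a custom message class is sent instead of a String.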

QueueMessageProducer.java
import org.springframework.jms.core.JmsTemplate;

import javax.jms.Queue;

/**
 * Date: 2015-7-1
 * Time: 17:14:23
 */
public class QueueMessageProducer {

    private JmsTemplate template;

    private Queue destination;

    public void setTemplate(JmsTemplate template) {
        this.template = template;
    }

    public void setDestination(Queue destination) {
        this.destination = destination;
    }

    public void send(FooMessage message) {
        template.convertAndSend(this.destination, message);
    }

}

CreateMessageProducer.java (message producer)

import javax.jms.Queue;
import org.springframework.jms.core.JmsTemplate;

public class CreateMessageProducer {

    private JmsTemplate template;

    private Queue destination;

    public void setTemplate(JmsTemplate template) {
        this.template = template;
    }

    public void setDestination(Queue destination) {
        this.destination = destination;
    }

    public void send(String str) {
        template.convertAndSend(this.destination, str);
        System.out.println("system A send message to system B~~~~~~~~~~");
    }
}

 

3. Add the amq-related content to system B.

The applicationContext-amq.xml file for system B:

<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
 http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.10.2.xsd">
    <!--
        Uses Spring's listener container; messages are stored persistently and are not lost when the server restarts
    -->
     <!-- connect to the external ActiveMQ broker -->
    <amq:connectionFactory id="jmsConnectionFactory" userName="admin" password="admin" brokerURL="tcp://localhost:61616"></amq:connectionFactory>

    <!--  ActiveMQ destinations  -->
    <!--  use a Queue destination -->
    <amq:queue name="QUEUE" physicalName="TESTQ" />

    <!--  Spring JmsTemplate config -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory">
            <!-- SingleConnectionFactory reuses one shared connection (it is not a pool), which avoids creating a connection per send -->
            <bean class="org.springframework.jms.connection.SingleConnectionFactory">
                <property name="targetConnectionFactory" ref="jmsConnectionFactory" />
            </bean>
        </property>
        <!-- custom MessageConverter -->
        <property name="messageConverter" ref="defaultMessageConverter" />
    </bean>

    <!-- converter  -->
    <bean id="defaultMessageConverter" class="com.pack.app.amq.DefaultMessageConverter" />

    <bean id="queueMessageProducer" class="tools.amq.QueueMessageProducer">
        <property name="template" ref="jmsTemplate" />
        <property name="destination" ref="QUEUE" />
    </bean>

    <!-- consumer for queue -->
    <bean id="queueConsumer" class="tools.amq.QueueConsumer" />

    <!-- Message Listener for queue -->
    <bean id="queueListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <constructor-arg ref="queueConsumer" />
        <!--  may be other method -->
        <property name="defaultListenerMethod" value="receive" />
        <!-- custom MessageConverter define -->
        <property name="messageConverter" ref="defaultMessageConverter" />
    </bean>

    <!--  listener container; the MDP (message-driven POJO) does not need to implement any interface -->
    <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsConnectionFactory" />
        <property name="destination" ref="QUEUE" />
        <property name="messageListener" ref="queueListener" />
    </bean>
    
    
    <!-- Test: receive messages -->
    <amq:queue name="CreateMessage" physicalName="A.CreateMessage" />
    <!-- receives the data -->
    <bean id="createMessageConsumer" class="com.pack.app.amq.consumer.CreateMessageConsumer" />
    <!-- listener -->
    <bean id="createMessageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <constructor-arg ref="createMessageConsumer" />
        <!--  may be other method -->
        <property name="defaultListenerMethod" value="process" />
        <!-- custom MessageConverter define -->
        <property name="messageConverter" ref="defaultMessageConverter" />
    </bean>

    <!--  listener container; the MDP (message-driven POJO) does not need to implement any interface -->
    <bean id="createMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsConnectionFactory" />
        <property name="destination" ref="CreateMessage" />
        <property name="messageListener" ref="createMessageListener" />
        
        <!-- number of concurrent consumers -->
        <!-- <property name="concurrentConsumers" value="4"></property> -->
    </bean>
</beans>
DefaultMessageConverter.java, QueueMessageProducer.java and QueueConsumer.java are the same as in system A.

CreateMessageConsumer.java
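import org.springframework.beans.factory.annotation.Autowired;
// AgentService is a class of the application itself; import it from its own package.

public class CreateMessageConsumer {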

    @Autowired
    public AgentService agentService;
    
    public void process(String str) {
        System.out.println("system B receive message from  system A ");
        
        agentService.agentPath(str);
    }
}
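In the XML for system B, `createMessageListener` names `process` as its `defaultListenerMethod`, which is why `CreateMessageConsumer` needs no JMS interface. The sketch below is a simplified illustration of dispatching to a method by name with reflection; it is not Spring's actual implementation, and `ReflectiveDispatchSketch`, `DemoConsumer` and `dispatch` are hypothetical names.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Simplified illustration of calling a handler method chosen by name.
class ReflectiveDispatchSketch {

    // Hypothetical plain consumer: no JMS interface, just a method taking the payload.
    public static class DemoConsumer {
        public String lastReceived;

        public void process(String str) {
            lastReceived = str;
        }
    }

    // Looks up a public method by name that accepts the payload's exact type and invokes it.
    static void dispatch(Object delegate, String methodName, Object payload) {
        try {
            Method method = delegate.getClass().getMethod(methodName, payload.getClass());
            method.invoke(delegate, payload);
        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException("cannot invoke " + methodName, e);
        }
    }

    public static void main(String[] args) {
        DemoConsumer consumer = new DemoConsumer();
        dispatch(consumer, "process", "hello from system A");
        System.out.println("consumer received: " + consumer.lastReceived);
    }
}
```

In the real setup the payload handed to `process` is whatever the configured message converter returns from `fromMessage`, so the method's parameter type has to match the type that was sent.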

4. Start the applications for system A and system B. Whenever system A puts a message on the queue A.CreateMessage, system B receives it automatically.
