springJMS+activeMQ實踐

運行環境:jdk1.6 ,javaEE5 , spring2.5 ,activeMQ5.4.3.html

必定要注意activeMQ的版本與jdk的兼容性,最新的activeMQ版本估計要在jdk1.7以上才能運行。java

先說一下activeMQ的安裝:web

一、下載:http://activemq.apache.org/download.html 選擇合適的Windows版本spring

二、安裝apache

(1) 首先配置JAVA環境變量api

JAVA_HOME=D:\Program Files\Java\jdk1.5.0服務器

CLASSPAHT=.;%JAVA_HOME%\lib網絡

PATH=%JAVA_HOME%\bin;session

(2)直接解壓至任意目錄(例如:D:\apache-activemq-5.3.0)app

三、啓動ActiveMQ服務器:直接運行\bin\win32\activemq.bat

當運行成功後,界面顯示: Started SelectChannelConnector@0.0.0.0:8161 即說明activemq啓動成功。

四、打開ActiveMQ消息管理後臺系統 http://localhost:8161/admin/

 

須要依賴的jar包有:spring.jar , activemq-all-5.4.3.jar , commons-logging-api-1.1.jar , commons-io-1.3.2.jar

 

好了,準備工做作完後,開始上代碼,

先看一下,咱們最終的Spring配置文件applicationContext.xml的內容,以下所示:

 <?xml version="1.0" encoding="UTF-8"?> 
<beans          
 xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx"
 xmlns:p="http://www.springframework.org/schema/p"
 xmlns:jee="http://www.springframework.org/schema/jee"
 xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
  http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
  http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd
  http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-2.5.xsd
  http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd">
 
 <!-- jms 鏈接工廠 -->
    <bean id="jmsFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <!-- 配置代理的地址,即配置activeMQ的鏈接URI,
         讓jms工廠可以鏈接到activeMQ服務器(將activeMQ暴露給客戶端使用,
         負責客戶端與activeMQ之間的鏈接通訊) -->
        <property name="brokerURL">
            <value>tcp://localhost:61616</value><!-- 一種標準URI地址,意思是說標識一個本地的端口號位61616的TCP鏈接(其中,"61616"是activeMQ默認的鏈接端口號) -->
        </property>
    </bean>
    <!-- ActiveMQ鏈接器將這種簡單等級結構的URI模式稱爲低等級的鏈接器(low-levelconnectors),
     併爲這些鏈接器實現了基本的網絡通訊協議。低等級鏈接器URIs使用主題(scheme)標識底層使用的網絡協議,
     使用路徑元素定位網絡資源服務(通常爲主機名加上端口號),使用查詢元素用來肯定鏈接器附加信息。 -->

    <!-- jms 鏈接池 -->
    
    <!--  
    <bean id="pooledJmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="connectionFactory">
            <ref local="jmsFactory" />
        </property>
    </bean>
    -->
 <!-- jms 模板 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory">
            <ref local="jmsFactory" />
        </property>
    </bean>
    
    <!-- jms Topic -->
    <bean id="myTopic" class="org.apache.activemq.command.ActiveMQTopic"
        autowire="constructor">
        <constructor-arg value="STOCKS.JAVA" />
    </bean>

    <!-- jms Consumer -->
    <bean id="javaConsumer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsFactory" />
        <property name="destination" ref="myTopic" />
        <property name="messageListener" ref="myTextListener" />
    </bean>

 <!-- 消息監聽器 -->
    <bean id="myTextListener" class="demo.TextListener">
    </bean>
    
    <!-- 消息發佈器 -->
    <bean id="springPublisher" class="demo.SpringPublisher">
        <property name="template">
            <ref local="jmsTemplate" />
        </property>
        <property name="topic">
            <ref local="myTopic" />
        </property>
    </bean>
</beans>

接下來,消息生成器代碼,實現spring的MessageCreator接口:

package demo;
import java.util.Date;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.springframework.jms.core.MessageCreator;
public class MyMessageCreator implements MessageCreator {
 /**
     * 消息序號
     */
    private int msgNo;
 
    public MyMessageCreator(int no) {
        this.msgNo = no;
    }
    
 @Override
 public Message createMessage(Session session) throws JMSException {
  TextMessage textMsg = session.createTextMessage();
        textMsg.setText(new Date() + "第" + this.msgNo + "條消息發出");
 
        return textMsg;
 }

}

接下來,消息監聽器,實現javaEE的規範MessageListener接口便可,由於要注入到spring的DefaultMessageListenerContainer中。此監聽器經過監聽來自destination(在spring中配置)的消息,一旦有消息就打印出來:

 package demo;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class TextListener implements MessageListener {
 @Override
 public void onMessage(Message message) {
  TextMessage msg = null;
   
        try {
            if (message instanceof TextMessage) {
                msg = (TextMessage) message;
                System.out.println("Reading message: " + msg.getText());
            } else {
                System.out.println("Message of wrong type: "
                        + message.getClass().getName());
            }
        } catch (JMSException e) {
            System.out.println("JMSException in onMessage(): " + e.toString());
        } catch (Throwable t) {
            System.out.println("Exception in onMessage():" + t.getMessage());
        }
 }
}

下面是消息發佈器,經過spring的jms模板,便可輕鬆的得到與activeMQ的鏈接與通訊,從而得到Connection和Destination,再經過JmsTemplate的send方法,便可發送消息到指定的destination(在spring中配置)中,以供客戶端接收:

package demo;
import javax.jms.Destination;
import org.springframework.jms.core.JmsTemplate;
public class SpringPublisher {
 /**
     * Jms模板
     */
    private JmsTemplate template;
 
    /**
     * Topic
     */
    private Destination topic;
 
    public JmsTemplate getTemplate() {
        return template;
    }
 
    public void setTemplate(JmsTemplate template) {
        this.template = template;
    }
 
    public Destination getTopic() {
        return topic;
    }
 
    public void setTopic(Destination topic) {
        this.topic = topic;
    }
 
    /**
     * Start
     * 
     * @throws InterruptedException
     */
    public void start() throws InterruptedException {
 
        int messageCount = 10;
 
        while ((--messageCount) > 0) {
            sendMessage(messageCount);
            Thread.sleep(1000);
        }
    }
 
    /**
     * 消息發送
     */
    protected void sendMessage(int msgNO) {
 
        this.template.send(this.topic, new MyMessageCreator(msgNO));
    }
}

至此,基本的jms就已經搭建好了,很簡單吧,一個spring上下文配置,一個消息生成器,一個消息發佈器,一個監聽器,搞定。接下來,編寫一個測試類,看運行結果(注意在運行測試類前,必定要先啓動activeMQ服務器):

 package test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import demo.SpringPublisher;
public class SpringJmsTestMain {
 /**
  * @param args
  */
 public static void main(String[] args) throws InterruptedException {
   
        ApplicationContext context = new ClassPathXmlApplicationContext(
                new String[] { "applicationContext.xml" });
 
        SpringPublisher publisher = (SpringPublisher) context
                .getBean("springPublisher");
        publisher.start();
    }
}

運行結果以下:

Reading message: Sun Jun 28 19:40:05 CST 2015第9條消息發出Reading message: Sun Jun 28 19:40:06 CST 2015第8條消息發出Reading message: Sun Jun 28 19:40:07 CST 2015第7條消息發出Reading message: Sun Jun 28 19:40:08 CST 2015第6條消息發出Reading message: Sun Jun 28 19:40:09 CST 2015第5條消息發出Reading message: Sun Jun 28 19:40:10 CST 2015第4條消息發出Reading message: Sun Jun 28 19:40:12 CST 2015第3條消息發出Reading message: Sun Jun 28 19:40:13 CST 2015第2條消息發出Reading message: Sun Jun 28 19:40:14 CST 2015第1條消息發出

相關文章
相關標籤/搜索