JMS進階篇--使用JMS規範鏈接MQ以及spring整合JMS

ActiveMQ的下載和安裝

  根據地址http://activemq.apache.org/download.html 找到activeMQ的下載地址,下載相應的版本html

1.win安裝activeMQjava

    打開下載文件下的bin文件找到對應電腦版本(32位或是64位)下的的啓動程序web

其中有兩個啓動程序spring

第一個是以DOS命令行的形式雙擊啓動apache

當看到瀏覽器訪問地址時即爲啓動成功,須要注意的是在整個MQ的運行中,該命令窗口是不能被關閉的.瀏覽器

若是以這種形式的啓動方式不方便的話可使用第二個啓動程序,以服務的形式啓動activeMQ,session

雙擊InstallService.bat文件,而後在服務中啓動便可app

 

無論以哪一種方式啓動,成功後都應該在瀏覽器中訪問127.0.0.1:8161成功訪問,其中ip位實際機器IP,8161爲activeMQ web管理端口.,在後續的使用中將會用到activeMQ的默認端口61616,這些在conf配置中均可修改maven

值得注意的是 ,若是在啓動過程當中在端口沒有沒佔用狀況下任啓動失敗 ,則啓動bat文件時右鍵以管理員身份運行.tcp

2.Liunx 安裝activeMQ

在官網上 找到Liunx的安裝版本 ,也能夠複製地址,在Liunx使用wget 在線下載,固然這是在Liunx能鏈接外網的狀況下.在這裏我使用官網提供的地址http://www.apache.org/dyn/closer.cgi?filename=/activemq/5.15.0/apache-activemq-5.15.0-bin.tar.gz&action=download

 1.cd /opt 將tar.gz文件上傳到這裏  
2.解壓縮 tar -zxvf apache-activemq-5.15.0-bin.tar.gz

3.啓動: 
cd /opt/apache-activemq-5.15.0/bin 
 sudo ./activemq start

4.端口開放

須要開放8161(web管理頁面端口)、61616(activemq服務監控端口) 兩個端口 
firewall-cmd –zone=public –add-port=8161/tcp –permanent 
firewall-cmd –zone=public –add-port=61616/tcp –permanent 
firewall-cmd –reload 

5.打開web管理頁面 
http://IP:8161/admin 

一樣的以這種方式啓動,終端也是不能關閉的.

使用JMS規範鏈接ActiveMQ消息中間件

對於JMS提供的API在上一章節已經瞭解,在這裏以簡單的java project 來編寫JMS的HelloWord

Queue:

1.建立生產者:

package com.jms.produce;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
public class QueueProducer {
    private static  final String url="tcp://localhost:61616";//activeMQ地址
    private static  final String queueName="QUEUE_TEST";//隊列名稱
    private static  ConnectionFactory connectionFactory;//鏈接工廠
    private static Connection connection;//鏈接
    private static Session session ;//鏈接會話
    private static Destination destination;//目的地
    private static MessageProducer producer;//消息提供者--生產者

    public static void main(String[] args) throws JMSException{

        try {
            //建立鏈接工廠
             connectionFactory=new ActiveMQConnectionFactory(url);
            //建立鏈接
            connection=connectionFactory.createConnection();
            //啓動鏈接
            session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //建立隊列(目的地)
           destination=session.createQueue(queueName);
            //建立生成者
           producer=session.createProducer(destination);
            for (int i = 0; i < 10; i++) {
                TextMessage message=session.createTextMessage("test" + i);
                producer.send(message);
                System.out.println("發送消息:"+message.getText());
            }
        }catch (JMSException e){
            e.printStackTrace();
        }finally {//關閉鏈接
            if (connection != null)
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
        }
    }
}

2.建立消費者:

package com.jms.consumer;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
 
public class QueueConsumer {
    private static  final String url="tcp://localhost:61616";//activeMQ地址
    private static  final String queueName="QUEUE_TEST";//隊列名稱
    private  static  ConnectionFactory connectionFactory ;//鏈接工廠
    private  static Session session;//鏈接會話
    private  static Destination destination ;//目的地
    private  static MessageConsumer messageConsumer ;//消息消費者--消費者
    public static void main(String[] args)  {
        try {
            //建立鏈接工廠
            connectionFactory = new ActiveMQConnectionFactory(url);
            //建立鏈接
            Connection  connection = connectionFactory.createConnection();
            //啓動鏈接
            connection.start();
            //建立session會話
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //建立隊列(目的地)
            destination=session.createQueue(queueName);
            //建立一個消費者
            messageConsumer=session.createConsumer(destination);
            //消息監聽器
            messageConsumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("接收消息:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        }catch (JMSException e){
            e.printStackTrace();
        }
    }
}

生產者和消費者queueName應該保持一致,不然介紹不到消息提供者發送的消息.應該儘可能保證先啓動Consumer.(若是先啓動Producer,則消息會被第一個啓動的Consumer所有消費掉)

能夠看到消息已經發送到了隊列"QUEUE_TEST",能夠在web中查看隊列的.當建立多個消費者監聽同一個隊列時,多個消費者平均消費隊列中的消息.

Topic:

主題模式的代碼與隊列模式大同小異,只是在建立目的地是有所不一樣,其它的工廠對象,會話等一致.在此不贅述.

//建立主題(目的地)
destination=session.createTopic(topicName);
//建立隊列(目的地)
destination=session.createQueue(queueName);

Spring整合JMS

使用java提供的JMS規範APi做爲了解jms的基礎知識,在此基礎上使用Spring整合Jms提升編碼效率.

針對冗長和重複的JMS代碼,Spring給出的解決方案市JmsTemplate.JmsTemplate能夠建立鏈接,得到會話以及發送和接收消息.這時的咱們能夠專一與構建要發送的消息或者處理接收到的消息. 同時,也可出對拋出的異常作分析而非統一的拋出JMSException.

1.首先導入spring,activeMQ所須要的Jar包.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.chuyu</groupId>
    <artifactId>jms-test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.14.2</version>
            <exclusions>
                <exclusion>
                    <!-- 移除-spring-context -->
                    <groupId>org.springframework</groupId>
                    <artifactId>spring-context</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-spring -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-spring</artifactId>
            <version>5.14.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework/spring-context -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>4.3.11.RELEASE</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework/spring-jms -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>4.3.11.RELEASE</version>
        </dependency>
    </dependencies>
</project>

2.建立xml配置文件

    commApp.xml(生產者和消費者共用的配置)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
       <context:annotation-config/>
       <!-- ActiveMq 提供的鏈接工廠 默認端口爲61616 -->
       <bean id="targetConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
              <property name="brokerURL">
                     <value>tcp://localhost:61616</value>
              </property>
       </bean>
       <!--spring Jms 提供的鏈接池        -->
       <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
              <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
       </bean>
       <!-- 目的地類型及名稱-->

       <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
              <constructor-arg value="QUEUE.TEST"/>
       </bean>
       <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
              <constructor-arg value="TOPIC.TEST"/>
       </bean>
       <!-- Jms模板-->
       <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
              <property name="connectionFactory" ref="connectionFactory"/>
       </bean>
</beans>

3.編寫發送消息的接口服務

SendMessageImpl

package com.jms.services.servicesImpl;

import com.jms.services.SendDataServices;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.jms.*;

public class SendMessageImpl implements SendDataServices {

    @Autowired
    private JmsTemplate jmsTemplate;
    @Resource(name = "topicDestination")
    private Destination destination;
    //@Resource(name = "queueDestination")
    //private Destination destination;
    @Override
    public void sendMessge(final  String message) {
        jmsTemplate.send(destination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                    TextMessage textMessage= session.createTextMessage(message);
                    return textMessage;
            }
        });
    }
}

3.配置生產者

producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
       <import resource="commApp.xml"/>
       <bean id="sendMessageImpl" class="com.jms.services.servicesImpl.SendMessageImpl"/>
</beans>

至今生產者已配置完.

配置消費者

consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">

       <import resource="commApp.xml"/>
       <bean id="receiveDataService" class="com.jms.services.servicesImpl.ReceiveDataServiceImpl">
              <!--在此能夠配置spring的依賴-->
       </bean>


       <!-- 消息監聽實現方法 一 -->
       <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
              <property name="connectionFactory" ref="connectionFactory"/>
              <property name="destination" ref="topicDestination" />
              <property name="messageListener" ref="pureMDPAdapter"/>
             <!-- <property name="messageListener" ref="consumerListener"/>-->
       </bean>

       <!--實現MessageListener 方法的消息監聽器 -->
       <!--<bean id="consumerListener" class="com.jms.spring_jms.ConsumerListeer"/>-->

       <!-- 消息監聽器 MDB(message driver bean 消息驅動bean)-->
       <bean id="pureMDPAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
              <property name="delegate" ref="receiveDataService"/>
              <property name="defaultListenerMethod"  value="receiveData"/>
       </bean>




</beans>

在配置消息監聽的時候.有兩種方法,一種是實現MessageListener,而另外一種則是配置基於消息驅動的監聽器(MDB),

針對兩種不一樣實現方法

package com.jms.spring_jms;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;


public class ConsumerListeer implements MessageListener {//實現JMS規範定義的接口
    @Override
    public void onMessage(Message message) {
        TextMessage textMessage=(TextMessage)message;
        try {
            System.out.println("接收消息到的消息爲:"+textMessage.getText());
        }catch ( JMSException e){
            e.printStackTrace();
        }
    }
}
package com.jms.services.servicesImpl;

import com.jms.services.ReciveDataServices;

public class ReceiveDataServiceImpl implements ReciveDataServices{
    @Override
    public void receiveData(String message) {//這裏定義的接收對象類型需與發送的對象類型相匹配
             System.out.println("接收:"+message);

    }

}

生產者,和消費者以及監聽已經配置完畢,

編寫測試類

package com.jms.spring_jms.producer;

import com.jms.services.SendDataServices;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;

public class QueueProducer {

    private  static SendDataServices sendMessageImpl;

    public static void main(String[] args) {

        ClassPathXmlApplicationContext applicationContext=new ClassPathXmlApplicationContext("producer.xml");
        sendMessageImpl=(SendDataServices)  applicationContext.getBean("sendMessageImpl");
        for (int i = 0; i <10 ; i++) {
            sendMessageImpl.sendMessge("發送消息:" + i);
            System.out.println("發送消息:"+i);
        }
//        applicationContext.close();
    }
}
package com.jms.spring_jms.consumer;

import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;

public class QueueReceive {


    public static void main(String[] args) {

        AbstractApplicationContext applicationContext=new ClassPathXmlApplicationContext("consumer.xml");
    }
}

對應隊列模式的啓動順序沒有要求,主題模式須要先啓動消費者,即須要先訂閱消息才能接收到消息.能夠啓動多個消費者查看消息的消費狀況,也能夠訪問瀏覽器查看生產者,消費者,以及鏈接的狀況.

對於Sping整合JMS的簡單應用到此結束.因爲篇幅及時間有限,對於配置中的MessageListener接口的實現類沒有作解釋,能夠參考http://elim.iteye.com/blog/1893676這篇文章作進一步瞭解.

後續文章,將瞭解Spring整合Jms中,對於事務的管理,以及activeMQ的集羣與分佈式部署狀況下的分佈式事務解決方案.

---------------over.

相關文章
相關標籤/搜索