根據地址http://activemq.apache.org/download.html 找到activeMQ的下載地址,下載相應的版本html
1.win安裝activeMQjava
打開下載文件下的bin文件找到對應電腦版本(32位或是64位)下的的啓動程序web
其中有兩個啓動程序spring
第一個是以DOS命令行的形式雙擊啓動apache
當看到瀏覽器訪問地址時即爲啓動成功,須要注意的是在整個MQ的運行中,該命令窗口是不能被關閉的.瀏覽器
若是以這種形式的啓動方式不方便的話可使用第二個啓動程序,以服務的形式啓動activeMQ,session
雙擊InstallService.bat文件,而後在服務中啓動便可app
無論以哪一種方式啓動,成功後都應該在瀏覽器中訪問127.0.0.1:8161成功訪問,其中ip位實際機器IP,8161爲activeMQ web管理端口.,在後續的使用中將會用到activeMQ的默認端口61616,這些在conf配置中均可修改maven
值得注意的是 ,若是在啓動過程當中在端口沒有沒佔用狀況下任啓動失敗 ,則啓動bat文件時右鍵以管理員身份運行.tcp
2.Liunx 安裝activeMQ
在官網上 找到Liunx的安裝版本 ,也能夠複製地址,在Liunx使用wget 在線下載,固然這是在Liunx能鏈接外網的狀況下.在這裏我使用官網提供的地址http://www.apache.org/dyn/closer.cgi?filename=/activemq/5.15.0/apache-activemq-5.15.0-bin.tar.gz&action=download
1.cd /opt 將tar.gz文件上傳到這裏
2.解壓縮 tar -zxvf apache-activemq-5.15.0-bin.tar.gz
3.啓動:
cd /opt/apache-activemq-5.15.0/bin
sudo ./activemq start
4.端口開放
須要開放8161(web管理頁面端口)、61616(activemq服務監控端口) 兩個端口
firewall-cmd –zone=public –add-port=8161/tcp –permanent
firewall-cmd –zone=public –add-port=61616/tcp –permanent
firewall-cmd –reload
5.打開web管理頁面
http://IP:8161/admin
一樣的以這種方式啓動,終端也是不能關閉的.
對於JMS提供的API在上一章節已經瞭解,在這裏以簡單的java project 來編寫JMS的HelloWord
Queue:
1.建立生產者:
package com.jms.produce; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class QueueProducer { private static final String url="tcp://localhost:61616";//activeMQ地址 private static final String queueName="QUEUE_TEST";//隊列名稱 private static ConnectionFactory connectionFactory;//鏈接工廠 private static Connection connection;//鏈接 private static Session session ;//鏈接會話 private static Destination destination;//目的地 private static MessageProducer producer;//消息提供者--生產者 public static void main(String[] args) throws JMSException{ try { //建立鏈接工廠 connectionFactory=new ActiveMQConnectionFactory(url); //建立鏈接 connection=connectionFactory.createConnection(); //啓動鏈接 session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立隊列(目的地) destination=session.createQueue(queueName); //建立生成者 producer=session.createProducer(destination); for (int i = 0; i < 10; i++) { TextMessage message=session.createTextMessage("test" + i); producer.send(message); System.out.println("發送消息:"+message.getText()); } }catch (JMSException e){ e.printStackTrace(); }finally {//關閉鏈接 if (connection != null) try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } }
2.建立消費者:
package com.jms.consumer; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class QueueConsumer { private static final String url="tcp://localhost:61616";//activeMQ地址 private static final String queueName="QUEUE_TEST";//隊列名稱 private static ConnectionFactory connectionFactory ;//鏈接工廠 private static Session session;//鏈接會話 private static Destination destination ;//目的地 private static MessageConsumer messageConsumer ;//消息消費者--消費者 public static void main(String[] args) { try { //建立鏈接工廠 connectionFactory = new ActiveMQConnectionFactory(url); //建立鏈接 Connection connection = connectionFactory.createConnection(); //啓動鏈接 connection.start(); //建立session會話 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立隊列(目的地) destination=session.createQueue(queueName); //建立一個消費者 messageConsumer=session.createConsumer(destination); //消息監聽器 messageConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接收消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); }catch (JMSException e){ e.printStackTrace(); } } }
生產者和消費者queueName應該保持一致,不然介紹不到消息提供者發送的消息.應該儘可能保證先啓動Consumer.(若是先啓動Producer,則消息會被第一個啓動的Consumer所有消費掉)
能夠看到消息已經發送到了隊列"QUEUE_TEST",能夠在web中查看隊列的.當建立多個消費者監聽同一個隊列時,多個消費者平均消費隊列中的消息.
Topic:
主題模式的代碼與隊列模式大同小異,只是在建立目的地是有所不一樣,其它的工廠對象,會話等一致.在此不贅述.
//建立主題(目的地) destination=session.createTopic(topicName);
//建立隊列(目的地) destination=session.createQueue(queueName);
使用java提供的JMS規範APi做爲了解jms的基礎知識,在此基礎上使用Spring整合Jms提升編碼效率.
針對冗長和重複的JMS代碼,Spring給出的解決方案市JmsTemplate.JmsTemplate能夠建立鏈接,得到會話以及發送和接收消息.這時的咱們能夠專一與構建要發送的消息或者處理接收到的消息. 同時,也可出對拋出的異常作分析而非統一的拋出JMSException.
1.首先導入spring,activeMQ所須要的Jar包.
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.chuyu</groupId> <artifactId>jms-test</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.14.2</version> <exclusions> <exclusion> <!-- 移除-spring-context --> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> </exclusion> </exclusions> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-spring --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-spring</artifactId> <version>5.14.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework/spring-context --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.3.11.RELEASE</version> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework/spring-jms --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.3.11.RELEASE</version> </dependency> </dependencies> </project>
2.建立xml配置文件
commApp.xml(生產者和消費者共用的配置)
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <context:annotation-config/> <!-- ActiveMq 提供的鏈接工廠 默認端口爲61616 --> <bean id="targetConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>tcp://localhost:61616</value> </property> </bean> <!--spring Jms 提供的鏈接池 --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="targetConnectionFactory"/> </bean> <!-- 目的地類型及名稱--> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="QUEUE.TEST"/> </bean> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="TOPIC.TEST"/> </bean> <!-- Jms模板--> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> </bean> </beans>
3.編寫發送消息的接口服務
SendMessageImpl
package com.jms.services.servicesImpl; import com.jms.services.SendDataServices; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Component; import javax.annotation.Resource; import javax.jms.*; public class SendMessageImpl implements SendDataServices { @Autowired private JmsTemplate jmsTemplate; @Resource(name = "topicDestination") private Destination destination; //@Resource(name = "queueDestination") //private Destination destination; @Override public void sendMessge(final String message) { jmsTemplate.send(destination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { TextMessage textMessage= session.createTextMessage(message); return textMessage; } }); } }
3.配置生產者
producer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd"> <import resource="commApp.xml"/> <bean id="sendMessageImpl" class="com.jms.services.servicesImpl.SendMessageImpl"/> </beans>
至今生產者已配置完.
配置消費者
consumer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd"> <import resource="commApp.xml"/> <bean id="receiveDataService" class="com.jms.services.servicesImpl.ReceiveDataServiceImpl"> <!--在此能夠配置spring的依賴--> </bean> <!-- 消息監聽實現方法 一 --> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="topicDestination" /> <property name="messageListener" ref="pureMDPAdapter"/> <!-- <property name="messageListener" ref="consumerListener"/>--> </bean> <!--實現MessageListener 方法的消息監聽器 --> <!--<bean id="consumerListener" class="com.jms.spring_jms.ConsumerListeer"/>--> <!-- 消息監聽器 MDB(message driver bean 消息驅動bean)--> <bean id="pureMDPAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <property name="delegate" ref="receiveDataService"/> <property name="defaultListenerMethod" value="receiveData"/> </bean> </beans>
在配置消息監聽的時候.有兩種方法,一種是實現MessageListener,而另外一種則是配置基於消息驅動的監聽器(MDB),
針對兩種不一樣實現方法
package com.jms.spring_jms; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class ConsumerListeer implements MessageListener {//實現JMS規範定義的接口 @Override public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收消息到的消息爲:"+textMessage.getText()); }catch ( JMSException e){ e.printStackTrace(); } } }
package com.jms.services.servicesImpl; import com.jms.services.ReciveDataServices; public class ReceiveDataServiceImpl implements ReciveDataServices{ @Override public void receiveData(String message) {//這裏定義的接收對象類型需與發送的對象類型相匹配 System.out.println("接收:"+message); } }
生產者,和消費者以及監聽已經配置完畢,
編寫測試類
package com.jms.spring_jms.producer; import com.jms.services.SendDataServices; import org.apache.xbean.spring.context.ClassPathXmlApplicationContext; public class QueueProducer { private static SendDataServices sendMessageImpl; public static void main(String[] args) { ClassPathXmlApplicationContext applicationContext=new ClassPathXmlApplicationContext("producer.xml"); sendMessageImpl=(SendDataServices) applicationContext.getBean("sendMessageImpl"); for (int i = 0; i <10 ; i++) { sendMessageImpl.sendMessge("發送消息:" + i); System.out.println("發送消息:"+i); } // applicationContext.close(); } }
package com.jms.spring_jms.consumer; import org.apache.xbean.spring.context.ClassPathXmlApplicationContext; import org.springframework.context.support.AbstractApplicationContext; public class QueueReceive { public static void main(String[] args) { AbstractApplicationContext applicationContext=new ClassPathXmlApplicationContext("consumer.xml"); } }
對應隊列模式的啓動順序沒有要求,主題模式須要先啓動消費者,即須要先訂閱消息才能接收到消息.能夠啓動多個消費者查看消息的消費狀況,也能夠訪問瀏覽器查看生產者,消費者,以及鏈接的狀況.
對於Sping整合JMS的簡單應用到此結束.因爲篇幅及時間有限,對於配置中的MessageListener接口的實現類沒有作解釋,能夠參考http://elim.iteye.com/blog/1893676這篇文章作進一步瞭解.
後續文章,將瞭解Spring整合Jms中,對於事務的管理,以及activeMQ的集羣與分佈式部署狀況下的分佈式事務解決方案.
---------------over.