ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現html
1. 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQPjava
2. 徹底支持JMS1.1和J2EE 1.4規範 (持久化,XA消息,事務)linux
3. 對Spring的支持,ActiveMQ能夠很容易內嵌到使用Spring的系統裏面去,並且也支持Spring2.0的特性web
4. 經過了常見J2EE服務器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中經過JCA 1.5 resourceadaptors的配置,可讓ActiveMQ能夠自動的部署到任何兼容J2EE1.4商業服務器上spring
5. 支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTAapache
6. 支持經過JDBC和journal提供高速的消息持久化windows
7. 從設計上保證了高性能的集羣,客戶端-服務器,點對點服務器
8. 支持Ajaxsession
9. 支持與Axis的整合架構
10. 能夠很容易得調用內嵌JMS provider,進行測試
一、下載
ActiveMQ的最新版本是5.10.0,但因爲咱們內網下載存在問題,因此目前經過內網只能下載到5.9.0,下載地址: http://activemq.apache.org/activemq-590-release.html。
二、安裝
若是是在windows系統中運行,能夠直接解壓apache-activemq-5.9.0-bin.zip,並運行bin目錄下的activemq.bat文件,此時使用的是默認的服務端口:61616和默認的console端口:8161。
若是是在linux或unix下運行,在bin目錄下執行命令:./activemq setup
三、修改ActiveMQ的服務端口和console端口
A、修改服務端口:打開conf/activemq.xml文件,修改如下紅色字體部分
<transportConnectors>
<transportConnector name="openwire" uri="tcp://10.42.220.72:61618"discoveryUri="multicast://default"/>
</transportConnectors>
B、修改console的地址和端口:打開conf/jetty.xml文件,修改如下紅色字體部分
<bean id="jettyPort"class="org.apache.activemq.web.WebConsolePort"init-method="start">
<property name="port" value="8162"/>
</bean>
配置maven文件,導入依賴的jar包
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.jd.activemq</groupId> <artifactId>ActiveMq</artifactId> <version>0.0.1-SNAPSHOT</version> <name>ActiveMq</name> <properties> <spring-framework.version>3.2.3.RELEASE</spring-framework.version> </properties> <dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.8.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.8.0</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring-framework.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${spring-framework.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-aspects</artifactId> <version>${spring-framework.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jdbc</artifactId> <version>${spring-framework.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-tx</artifactId> <version>${spring-framework.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.6.1</version> <optional>true</optional> </dependency> </dependencies> </project>
發送端代碼:
package com.jd.mq.queue; import javax.jms.DeliveryMode; import javax.jms.MapMessage; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueSession; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class QueueSender { // 發送次數 public static final int SEND_NUM = 5; // tcp 地址 public static final String BROKER_URL = "tcp://localhost:61616"; // 目標,在ActiveMQ管理員控制檯建立 http://localhost:8161/admin/queues.jsp public static final String DESTINATION = "jd.mq.queue"; /** * <b>function:</b> 發送消息 * @author yuhailang * @param session * @param sender * @throws Exception */ public static void sendMessage(QueueSession session, javax.jms.QueueSender sender) throws Exception { for (int i = 0; i < SEND_NUM; i++) { String message = "queue發送消息第" + (i + 1) + "條"; MapMessage map = session.createMapMessage(); map.setString("text", message); map.setLong("time", System.currentTimeMillis()); System.out.println(map); sender.send(map); } } public static void run() throws Exception { QueueConnection connection = null; QueueSession session = null; try { // 建立連接工廠 QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL); // 經過工廠建立一個鏈接 connection = factory.createQueueConnection(); // 啓動鏈接 connection.start(); // 建立一個session會話 session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 建立一個消息隊列 Queue queue = session.createQueue(DESTINATION); // 建立消息發送者 javax.jms.QueueSender sender = session.createSender(queue); // 設置持久化模式 sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT); sendMessage(session, sender); // 提交會話 session.commit(); } catch (Exception e) { throw e; } finally { // 關閉釋放資源 if (session != null) { session.close(); } if (connection != null) { connection.close(); } } } public static void main(String[] args) throws Exception { QueueSender.run(); } }
接收端代碼:
package com.jd.mq.queue; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueSession; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class QueueReceiver { // tcp 地址 public static final String BROKER_URL = "tcp://localhost:61616"; // 目標,在ActiveMQ管理員控制檯建立 http://localhost:8161/admin/queues.jsp public static final String TARGET = "jd.mq.queue"; public static void run() throws Exception { QueueConnection connection = null; QueueSession session = null; try { // 建立連接工廠 QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL); // 經過工廠建立一個鏈接 connection = factory.createQueueConnection(); // 啓動鏈接 connection.start(); // 建立一個session會話 session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 建立一個消息隊列 Queue queue = session.createQueue(TARGET); // 建立消息製做者 javax.jms.QueueReceiver receiver = session.createReceiver(queue); receiver.setMessageListener(new MessageListener() { public void onMessage(Message msg) { if (msg != null) { MapMessage map = (MapMessage) msg; try { System.out.println(map.getLong("time") + "接收#" + map.getString("text")); } catch (JMSException e) { e.printStackTrace(); } } } }); // 休眠100ms再關閉 Thread.sleep(1000 * 100); // 提交會話 session.commit(); } catch (Exception e) { throw e; } finally { // 關閉釋放資源 if (session != null) { session.close(); } if (connection != null) { connection.close(); } } } public static void main(String[] args) throws Exception { QueueReceiver.run(); } }
經過監控查看消息堆棧的記錄:http://localhost:8161/admin/queues.jsp