ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已是好久的事情了,可是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。java
主要特色:spring
1. 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQPapache
2. 徹底支持JMS1.1和J2EE 1.4規範 (持久化,XA消息,事務)服務器
3. 對Spring的支持,ActiveMQ能夠很容易內嵌到使用Spring的系統裏面去,並且也支持Spring2.0的特性session
4. 經過了常見J2EE服務器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中經過JCA 1.5 resource adaptors的配置,可讓ActiveMQ能夠自動的部署到任何兼容J2EE 1.4 商業服務器上app
5. 支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA異步
6. 支持經過JDBC和journal提供高速的消息持久化maven
7. 從設計上保證了高性能的集羣,客戶端-服務器,點對點tcp
8. 支持Ajax分佈式
9. 支持與Axis的整合
10. 能夠很容易得調用內嵌JMS provider,進行測試
JMS的全稱是Java Message Service,即Java消息服務。用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。
它主要用於在生產者和消費者之間進行消息傳遞,生產者負責產生消息,而消費者負責接收消息。把它應用到實際的業務需求中的話咱們能夠在特定的時候利用生產者生成一消息,並進行發送,對應的消費者在接收到對應的消息後去完成對應的業務邏輯。
對於消息的傳遞有兩種類型:
一種是點對點的,即一個生產者和一個消費者一一對應;
另外一種是發佈/訂閱模式,即一個生產者產生消息並進行發送後,能夠由多個消費者進行接收。
JMS定義了五種不一樣的消息正文格式,以及調用的消息類型,容許你發送並接收以一些不一樣形式的數據,提供現有消息格式的一些級別的兼容性。
· StreamMessage -- Java原始值的數據流
· MapMessage--一套名稱-值對
· TextMessage--一個字符串對象
· ObjectMessage--一個序列化的 Java對象
· BytesMessage--一個字節的數據流
進入http://activemq.apache.org/下載ActiveMQ
安裝步驟:
第一步:安裝jdk,須要jdk1.7以上版本
第二步:解壓縮activeMQ的壓縮包。
第三步:進入bin目錄。
啓動:[root@localhost bin]# ./activemq start
中止:[root@localhost bin]# ./activemq stop
第四步:訪問後臺管理。
http://192.168.45.188:8161/admin
用戶名:admin
密碼:admin
在點對點或隊列模型下,一個生產者向一個特定的隊列發佈消息,一個消費者從該隊列中讀取消息。這裏,生產者知道消費者的隊列,並直接將消息發送到消費者的隊列。這種模式被歸納爲:只有一個消費者將得到消息。生產者不須要在接收者消費該消息期間處於運行狀態,接收者也一樣不須要在消息發送時處於運行狀態。每個成功處理的消息都由接收者簽收。
發佈者/訂閱者模型支持向一個特定的消息主題發佈消息。0或多個訂閱者可能對接收來自特定消息主題的消息感興趣。在這種模型下,發佈者和訂閱者彼此不知道對方。這種模式比如是匿名公告板。這種模式被歸納爲:多個消費者能夠得到消息.在發佈者和訂閱者之間存在時間依賴性。發佈者須要創建一個訂閱(subscription),以便客戶可以購訂閱。訂閱者必須保持持續的活動狀態以接收消息,除非訂閱者創建了持久的訂閱。在那種狀況下,在訂閱者未鏈接時發佈的消息將在訂閱者從新鏈接時從新發布。
ConnectionFactory 接口(鏈接工廠)
用戶用來建立到JMS提供者的鏈接的被管對象。JMS客戶經過可移植的接口訪問鏈接,這樣當下層的實現改變時,代碼不須要進行修改。 管理員在JNDI名字空間中配置鏈接工廠,這樣,JMS客戶纔可以查找到它們。根據消息類型的不一樣,用戶將使用隊列鏈接工廠,或者主題鏈接工廠。
Connection 接口(鏈接)
鏈接表明了應用程序和消息服務器之間的通訊鏈路。在得到了鏈接工廠後,就能夠建立一個與JMS提供者的鏈接。根據不一樣的鏈接類型,鏈接容許用戶建立會話,以發送和接收隊列和主題到目標。
Destination 接口(目標)
目標是一個包裝了消息目標標識符的被管對象,消息目標是指消息發佈和接收的地點,或者是隊列,或者是主題。JMS管理員建立這些對象,而後用戶經過JNDI發現它們。和鏈接工廠同樣,管理員能夠建立兩種類型的目標,點對點模型的隊列,以及發佈者/訂閱者模型的主題。
MessageConsumer 接口(消息消費者)
由會話建立的對象,用於接收發送到目標的消息。消費者能夠同步地(阻塞模式),或異步(非阻塞)接收隊列和主題類型的消息。
MessageProducer 接口(消息生產者)
由會話建立的對象,用於發送消息到目標。用戶能夠建立某個目標的發送者,也能夠建立一個通用的發送者,在發送消息時指定目標。
Message 接口(消息)
是在消費者和生產者之間傳送的對象,也就是說從一個應用程序創送到另外一個應用程序。一個消息有三個主要部分:
消息頭(必須):包含用於識別和爲消息尋找路由的操做設置。
一組消息屬性(可選):包含額外的屬性,支持其餘提供者和用戶的兼容。能夠建立定製的字段和過濾器(消息選擇器)。
一個消息體(可選):容許用戶建立五種類型的消息(文本消息,映射消息,字節消息,流消息和對象消息)。
消息接口很是靈活,並提供了許多方式來定製消息的內容。
Session 接口(會話)
表示一個單線程的上下文,用於發送和接收消息。因爲會話是單線程的,因此消息是連續的,就是說消息是按照發送的順序一個一個接收的。會話的好處是它支持事務。若是用戶選擇了事務支持,會話上下文將保存一組消息,直到事務被提交才發送這些消息。在提交事務以前,用戶可使用回滾操做取消這些消息。一個會話容許用戶建立消息生產者來發送消
把ActiveMQ依賴的jar包添加到工程中。
activemq-all-5.12.0.jar
使用maven工程,則添加jar包的依賴:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.11.2</version> </dependency> |
public class QueueSender {
public static void main(String[] args) { //建立一個鏈接工廠 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616"); try { //從工廠對象中得到鏈接 Connection connection = connectionFactory.createConnection(); //開啓鏈接 connection.start(); /* connection.createSession(paramA, paramB) A)paramA設置爲true時: paramB的值忽略, acknowledgment mode被jms服務器設置 SESSION_TRANSACTED 。 當一個事務被提交的時候,消息確認就會自動發生。 B) paramA設置爲false時: Session.AUTO_ACKNOWLEDGE爲自動確認,當客戶成功的從receive方法返回的時候,或者從 MessageListener.onMessage方法成功返回的時候,會話自動確認客戶收到的消息。 Session.CLIENT_ACKNOWLEDGE 爲客戶端確認。客戶端接收到消息後,必須調用javax.jms.Message的 acknowledge方法。jms服務器纔會刪除消息。(默認是批量確認) */ //開啓一個回話,第一個參數指定不使用事務,第二個參數指定客戶端接收消息的確認方式 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立一目的地Queue或者是Topic Queue queue = session.createQueue("mytestqueue"); //建立一個生產者 MessageProducer producer = session.createProducer(queue); //建立message TextMessage message = new ActiveMQTextMessage(); message.setText("hello"); //發送消息 producer.send(message); //關閉 producer.close(); session.close(); connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); }
} } |
消費者有兩種消費方法::
一、同步消費。經過調用消費者的receive方法從目的地中顯式提取消息。receive方法能夠一直阻塞到消息到達。
二、異步消費。客戶能夠爲消費者註冊一個消息監聽器,以定義在消息到達時所採起的動做。
實現MessageListener接口,在MessageListener()方法中實現消息的處理邏輯。
public class QueueConsumer {
public static void main(String[] args) { //建立一鏈接工廠 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616"); try { //建立一個鏈接 Connection connection = connectionFactory.createConnection(); //打開鏈接 connection.start(); //建立一個回話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立一個目的地Destination Queue queue = session.createQueue("mytestqueue"); //建立一個消費者 MessageConsumer consumer = session.createConsumer(queue); while(true) { //設置接收者接收消息的時間,爲了便於測試,這裏定爲100s Message message = consumer.receive(100000); if (message != null) { System.out.println(message); } else { //超時結束 break; }
} consumer.close(); session.close(); connection.close(); } catch (Exception e) { e.printStackTrace(); }
} } |
public class QueueConsumer {
public static void main(String[] args) { //建立一鏈接工廠 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616"); try { //建立一個鏈接 Connection connection = connectionFactory.createConnection(); //打開鏈接 connection.start(); //建立一個回話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立一個目的地Destination Queue queue = session.createQueue("mytestqueue"); //建立一個消費者 MessageConsumer consumer = session.createConsumer(queue); consumer.setMessageListener(new MessageListener() {
@Override public void onMessage(Message message) { if (message instanceof TextMessage) { String text = ""; try { text = ((TextMessage)message).getText(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(text); }
} }); System.in.read(); //關閉 consumer.close(); session.close(); connection.close(); } catch (Exception e) { e.printStackTrace(); }
} } |
public class TopicProducer {
public static void main(String[] args) { //建立鏈接工廠 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616"); try { //建立鏈接 Connection connection = connectionFactory.createConnection(); //開啓鏈接 connection.start(); //建立一個回話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立一個Destination,queue或者Topic Topic topic = session.createTopic("mytopic"); //建立一個生成者 MessageProducer producer = session.createProducer(topic); //建立一個消息 TextMessage textMessage = new ActiveMQTextMessage(); textMessage.setText("hello my topic"); //發送消息 producer.send(textMessage); //關閉 producer.close(); session.close(); connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } |
public class TopicConsumer {
public static void main(String[] args) { //建立鏈接工廠 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616"); try { //建立鏈接 Connection connection = connectionFactory.createConnection(); connection.start(); //建立一個會話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立一個目標 Destination destination = session.createTopic("mytopic"); //建立一個消費者 MessageConsumer consumer = session.createConsumer(destination); //接收消息 consumer.setMessageListener(new MessageListener() {
@Override public void onMessage(Message message) {
System.out.println(message);
} }); //暫停 System.in.read(); //關閉 consumer.close(); session.close(); connection.close();
} catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } } |
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.0.xsd">
<!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.25.168:61616" /> </bean> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目標ConnectionFactory對應真實的能夠產生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> </beans> |
<!-- 配置生產者 --> <!-- Spring提供的JMS工具類,它能夠進行消息發送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 這個connectionFactory對應的是咱們定義的Spring提供的那個ConnectionFactory對象 --> <property name="connectionFactory" ref="connectionFactory" /> </bean> <!--這個是隊列目的地,點對點的 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>queue</value> </constructor-arg> </bean> <!--這個是主題目的地,一對多的 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic" /> </bean> |
public class Producer {
public static void main(String[] args) { //建立spring容器 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml"); //得到JmsTemplate對象 JmsTemplate template = (JmsTemplate) applicationContext.getBean("jmsTemplate"); //得到Destination ActiveMQQueue queue = (ActiveMQQueue) applicationContext.getBean("queueDestination"); //發送消息 template.send(queue, new MessageCreator() {
@Override public Message createMessage(Session session) throws JMSException { return session.createTextMessage("hello"); } });
} } |
那麼消費者是經過Spring爲咱們封裝的消息監聽容器MessageListenerContainer實現的,它負責接收信息,並把接收到的信息分發給真正的MessageListener進行處理。每一個消費者對應每一個目的地都須要有對應的MessageListenerContainer。
對於消息監聽容器而言,除了要知道監聽哪一個目的地以外,還須要知道到哪裏去監聽,也就是說它還須要知道去監聽哪一個JMS服務器,這是經過在配置MessageConnectionFactory的時候往裏面注入一個ConnectionFactory來實現的。
因此在配置一個MessageListenerContainer的時候有三個屬性必須指定:
一、一個是表示從哪裏監聽的ConnectionFactory
二、一個是表示監聽什麼的Destination;
三、一個是接收到消息之後進行消息處理的MessageListener。
經常使用的MessageListenerContainer實現類是DefaultMessageListenerContainer。
public class MyMessageListener implements MessageListener { @Override public void onMessage(Message message) { System.out.println(message); } }
|
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.0.xsd">
<!-- 真正能夠產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.25.168:61616" /> </bean> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> <!--這個是隊列目的地,點對點的 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>queue</value> </constructor-arg> </bean> <!--這個是主題目的地,一對多的 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic" /> </bean> <!-- 配置監聽器 --> <bean id="myMessageListener" class="cn.itcast.mq.spring.MyMessageListener" /> <!-- 消息監聽容器 --> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="myMessageListener" /> </bean> </beans> |
public class Consumer {
public static void main(String[] args) { ApplicationContext applicationContext = new ClassPathXmlApplicationContext("applicationContext-consumer.xml"); try { System.in.read(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
} |