Apache ActiveMQ教程

1、特性及優點html

一、實現JMS1.1規範,支持J2EE1.4以上
二、可運行於任何jvm和大部分web容器(ActiveMQ works great in any JVM)
三、支持多種語言客戶端(java, C, C++, AJAX, ACTIONSCRIPT等等)
四、支持多種協議(stomp,openwire,REST)
五、良好的spring支持(ActiveMQ has great Spring Support)
六、速度很快,JBossMQ的十倍(ActiveMQ is very fast; often 10x faster than JBossMQ.)
七、與OpenJMS、JbossMQ等開源jms provider相比,ActiveMQ有Apache的支持,持續發展的優點明顯。java

2、下載部署web

一、下載,spring

http://activemq.apache.org/activemq-510-release.html,下載5.1.0 Windows Distribution版本

二、安裝, 直接解壓至任意目錄(如:d:\ apache-activemq-5.1.0)
三、啓動ActiveMQ服務器
方法1:
直接運行bin\activemq.bat
方法2(在JVM中嵌套啓動):數據庫

1
2
cd example
        ant embedBroker

四、ActiveMQ消息管理後臺系統:http://localhost:8161/admin

3、運行附帶的示例程序apache

一、Queue消息示例:
* 啓動Queue消息消費者api

1
2
cd example
ant consumer

* 啓動Queue消息生產者tomcat

1
2
cd example
ant producer

簡要說明:生產者(producer)發消息,消費者(consumer)接消息,發送/接收2000個消息後自動關閉服務器

二、Topic消息示例:
* 啓動Topic消息消費者session

1
2
cd example
ant topic-listener

* 啓動Topic消息生產者

1
2
cd example
ant topic-publisher

簡要說明:重複10輪,publisher每輪發送2000個消息,並等待獲取listener的處理結果報告,而後進入下一輪發送,最後統計全局發送時間。

4、Queue與Topic的比較

一、JMS Queue執行load balancer語義:
一條消息僅能被一個consumer收到。若是在message發送的時候沒有可用的consumer,那麼它將被保存一直到能處理該message的consumer可用。若是一個consumer收到一條message後卻不響應它,那麼這條消息將被轉到另外一個consumer那兒。一個Queue能夠有不少consumer,而且在多個可用的consumer中負載均衡。

二、Topic實現publish和subscribe語義:
一條消息被publish時,它將發到全部感興趣的訂閱者,因此零到多個subscriber將接收到消息的一個拷貝。可是在消息代理接收到消息時,只有激活訂閱的subscriber可以得到消息的一個拷貝。

三、分別對應兩種消息模式:
Point-to-Point (點對點),Publisher/Subscriber Model (發佈/訂閱者)
其中在Publicher/Subscriber 模式下又有Nondurable subscription(非持久訂閱)和durable subscription (持久化訂閱)2種消息處理方式。

5、Point-to-Point (點對點)消息模式開發流程

一、生產者(producer)開發流程(ProducerTool.java):
1.1 建立Connection:根據url,user和password建立一個jms Connection。
1.2 建立Session:在connection的基礎上建立一個session,同時設置是否支持事務和ACKNOWLEDGE標識。
1.3 建立Destination對象:需指定其對應的主題(subject)名稱,producer和consumer將根據subject來發送/接收對應的消息。
1.4 建立MessageProducer:根據Destination建立MessageProducer對象,同時設置其持久模式。
1.5 發送消息到隊列(Queue):封裝TextMessage消息,使用MessageProducer的send方法將消息發送出去。

二、消費者(consumer)開發流程(ConsumerTool.java):
2.1 實現MessageListener接口:消費者類必須實現MessageListener接口,而後在onMessage()方法中監聽消息的到達並處理。
2.2 建立Connection:根據url,user和password建立一個jms Connection,若是是durable模式,還須要給connection設置一個clientId。
2.3 建立Session和Destination:與ProducerTool.java中的流程相似,再也不贅述。
2.4建立replyProducer【可選】:能夠用來將消息處理結果發送給producer。
2.5 建立MessageConsumer:根據Destination建立MessageConsumer對象。
2.6 消費message:在onMessage()方法中接收producer發送過來的消息進行處理,並能夠經過replyProducer反饋信息給producer

1
2
3
4
if (message.getJMSReplyTo() != null) {
    replyProducer.send(message.getJMSReplyTo(),
    session.createTextMessage("Reply: " + message.getJMSMessageID()));
}

6、Publisher/Subscriber(發佈/訂閱者)消息模式開發流程

一、訂閱者(Subscriber)開發流程(TopicListener.java):
1.1 實現MessageListener接口:在onMessage()方法中監聽發佈者發出的消息隊列,並作相應處理。
1.2 建立Connection:根據url,user和password建立一個jms Connection。
1.3 建立Session:在connection的基礎上建立一個session,同時設置是否支持事務和ACKNOWLEDGE標識。
1.4 建立Topic:建立2個Topic, topictest.messages用於接收發布者發出的消息,topictest.control用於向發佈者發送消息,實現雙方的交互。
1.5 建立consumer和producer對象:根據topictest.messages建立consumer,根據topictest.control建立producer。
1.6 接收處理消息:在onMessage()方法中,對收到的消息進行處理,可直接簡單在本地顯示消息,或者根據消息內容不一樣處理對應的業務邏輯(好比:數據庫更新、文件操做等等),而且可使用producer對象將處理結果返回給發佈者。

二、發佈者(Publisher)開發流程(TopicPublisher.java):
2.1 實現MessageListener接口:在onMessage()方法中接收訂閱者的反饋消息。
2.2 建立Connection:根據url,user和password建立一個jms Connection。
2.3 建立Session:在connection的基礎上建立一個session,同時設置是否支持事務和ACKNOWLEDGE標識。
2.4 建立Topic:建立2個Topic,topictest.messages用於向訂閱者發佈消息,topictest.control用於接收訂閱者反饋的消息。這2個topic與訂閱者開發流程中的topic是一一對應的。
2.5 建立consumer和producer對象:根據topictest.messages建立publisher;根據topictest.control建立consumer,同時監聽訂閱者反饋的消息。
2.6 給全部訂閱者發送消息,並接收反饋消息:示例代碼中,一共重複10輪操做。每輪先向全部訂閱者發送2000個消息;
而後堵塞線程,開始等待;最後經過onMessage()方法,接收到訂閱者反饋的「REPORT」類信息後,才print反饋信息並解除線程堵塞,進入下一輪。
注:可同時運行多個訂閱者測試查看此模式效果

7、ActiveMQ與Tomcat整合

說明:Tomcat示例版本6.0.14,其它版本在配置上可能有一些差別

一、準備jar包:將ActiveMQ lib目錄下的5個jar包複製到Tomcat lib目錄下:

activemq-core-5.1.0.jar
activemq-web-5.1.0.jar
geronimo-j2ee-management_1.0_spec-1.0.jar
geronimo-jms_1.1_spec-1.1.1.jar
geronimo-jta_1.0.1B_spec-1.0.1.jar

二、修改配置文件:
2.1 修改Tomcat的conf/context.xml文件:
在節點中添加如下內容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<Resource
    name="jms/FailoverConnectionFactory"
    auth="Container"
    type="org.apache.activemq.ActiveMQConnectionFactory"
    description="JMS Connection Factory"
    factory="org.apache.activemq.jndi.JNDIReferenceFactory"
    brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100&maxReconnectAttempts=5"
    brokerName="localhost"
    useEmbeddedBroker="false"/>
 
<Resource
    name="jms/NormalConnectionFactory"
    auth="Container"
    type="org.apache.activemq.ActiveMQConnectionFactory"
    description="JMS Connection Factory"
    factory="org.apache.activemq.jndi.JNDIReferenceFactory"
    brokerURL="tcp://localhost:61616"
    brokerName="localhost"
    useEmbeddedBroker="false"/>
 
<Resource name="jms/topic/MyTopic"
    auth="Container"
    type="org.apache.activemq.command.ActiveMQTopic"
    factory="org.apache.activemq.jndi.JNDIReferenceFactory"
    physicalName="MY.TEST.FOO"/>
 
<Resource name="jms/queue/MyQueue"
    auth="Container"
    type="org.apache.activemq.command.ActiveMQQueue"
    factory="org.apache.activemq.jndi.JNDIReferenceFactory"
    physicalName="MY.TEST.FOO.QUEUE"/> 
配置說明:以JNDI的方式定義了ActiveMQ的broker鏈接url、Topic和Queue。

此處需加以注意的是Listener端的borkerURL使用了failover傳輸方式:

1
failover:(tcp://localhost:61616)?initialReconnectDelay=100&maxReconnectAttempts=5

客戶端使用普通傳輸方式:tcp://localhost:61616

failover transport是一種從新鏈接機制,用於創建可靠的傳輸。此處配置的是一旦ActiveMQ broker中斷,Listener端將每隔100ms自動嘗試鏈接,直至成功鏈接或重試5次鏈接失敗爲止。

failover還支持多個borker同時提供服務,實現負載均衡的同時可增長系統容錯性,格式:
failover:(uri1,…,uriN)?transportOptions

2.2 新建web應用(webapps/jms-test),修改WEB-INF/web.xml文件:
增長一個自啓動Servlet,該Servlet實現了MessageListener接口,做爲Topic消息的Listener端。

1
2
3
4
5
6
7
<servlet>
    <servlet-name>jms-listener</servlet-name>
    <servlet-class>
       com.flvcd.servlet.JMSListener
    </servlet-class>
    <load-on-startup>1</load-on-startup>
</servlet>

2.3 修改activemq.xml文件:
爲了支持持久化消息,需修改ActiveMQ的配置文件以下,使用默認的AMQ Message Store方式(索引文件方式)存儲消息,據官網介紹是快速、穩定的。數據庫存儲方式可參照官網相關文檔。

1
2
3
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" persistent="true" useShutdownHook="false">
<persistenceAdapter>
<amqPersistenceAdapter class="MsoNormal">
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
connection.setClientID("MyClient");
            Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
            //普通消息訂閱者,沒法接收持久消息
            //MessageConsumer consumer = jmsSession.createConsumer((Destination) envContext.lookup("jms/topic/MyTopic"));
            //基於Topic建立持久的消息訂閱者,前提:Connection必須指定一個惟一的clientId,當前爲MyClient
            TopicSubscriber consumer = jmsSession.createDurableSubscriber((Topic) envContext.lookup("jms/topic/MyTopic"), "MySub");
            consumer.setMessageListener(this);
            connection.start();
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
 
    /** 接收消息,作對應處理 */
    public void onMessage(Message message) {
        if (checkText(message, "RefreshArticleId") != null) {
            String articleId = checkText(message, "RefreshArticleId");
            System.out.println("接收刷新文章消息,開始刷新文章ID=" + articleId);
        }
        else if (checkText(message, "RefreshThreadId") != null) {
            String threadId = checkText(message, "RefreshThreadId");
            System.out.println("接收刷新論壇帖子消息,開始刷新帖子ID=" + threadId);
        } else {
            System.out.println("接收普通消息,不作任何處理!");
        }
    }
 
    private static String checkText(Message m, String s) {
        try {
            return m.getStringProperty(s);
        } catch (JMSException e) {
            e.printStackTrace(System.out);
            return null;
        }
    }
}

編譯JMSListener.java至classes目錄:

1
javac -cp .;D:\apache-tomcat-6.0.14\lib\servlet-api.jar;D:\apache-tomcat-6.0.14\lib\geronimo-jms_1.1_spec-1.1.1.jar;D:\apache-tomcat-6.0.14\lib\activemq-core-5.1.0.jar -d . JMSListener.java

注:D:\apache-tomcat-6.0.14請替換成本地對應目錄。

四、Publisher端(publish.jsp)實現:
在jms-test目錄下新建publish.jsp文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<%@ page language="java" import="javax.jms.*" pageEncoding="GBK"%>
<%@ page language="java" import="javax.naming.*"%>
<%@ page language="java" import="org.apache.activemq.ActiveMQConnectionFactory"%>
 <%
    try {
        InitialContext initCtx = new InitialContext();
        Context envContext = (Context) initCtx.lookup("java:comp/env");
        ConnectionFactory connectionFactory = (ConnectionFactory) envContext.lookup("jms/NormalConnectionFactory");
        Connection connection = connectionFactory.createConnection();
        Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = jmsSession.createProducer((Destination) envContext.lookup("jms/topic/MyTopic"));
        //設置持久方式
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        Message testMessage = jmsSession.createMessage();
        //發佈刷新文章消息
        testMessage.setStringProperty("RefreshArticleId", "2046");   
        producer.send(testMessage);
        //發佈刷新帖子消息
        testMessage.clearProperties();
        testMessage.setStringProperty("RefreshThreadId", "331");
        producer.send(testMessage);
    } catch (NamingException e) {
        e.printStackTrace();
    } catch (JMSException e) {
        e.printStackTrace();
    }
%>

Publisher和Listner之間經過Message的setStringProperty和getStringProperty方法,實現對應的業務邏輯。
上述示例代碼中,RefreshArticleId表明刷新某篇文章,RefreshThreadId表明刷新某個帖子,property值保持對應的ID。固然用戶可根據實際需求靈活地使用。

五、運行Demo:
5.1 啓動ActiveMQ服務器
5.2 啓動Tomcat服務器:JMSListener將自動鏈接ActiveMQ broker,日誌信息:
Successfully connected to tcp://localhost:61616
5.3 訪問http://localhost:8080/jms-test/publish.jsp
Tomcat服務器日誌將提示:
接收刷新文章消息,開始刷新文章ID=2046
接收刷新論壇帖子消息,開始刷新帖子ID=331
5.4 訪問http://localhost:8161/admin/topics.jsp查看MY.TEST.FOO的消息日誌,分別發送和接收2條。
至此,已成功完成ActiveMQ與Tomcat的基本整合!
Publisher和Listener徹底能夠獨立部署到不一樣的Web服務器上,並經過ActiveMQ來進行消息傳遞,實現用戶所需的業務邏輯。
測試持久消息的具體步驟:
l 啓動Publisher所在Web服務器
l 啓動ActiveMQ
l 訪問publish.jsp發送消息,此時Listener還未啓動,消息將保存在ActiveMQ的bin\activemq-data目錄下,查看日誌能夠看到發送2條,接收0條
l 啓動Listener所在Web服務器,將自動接收到ActiveMQ的持久消息並處理,查看日誌:發送2條,接收2條,代表持久消息應用成功

http://www.cnblogs.com/shuyangdehou/archive/2010/10/07/1845239.html

activeMQ相關:

http://baike.baidu.com/view/157103.htm jms百度百科
http://www.189works.com/article-57444-1.html ActiveMQ消息收發簡單例子
http://liyebing.iteye.com/blog/1044825 jms實現簡單的聊天
http://jinguo.iteye.com/blog/234311 一個生產者消費者實例
http://blog.csdn.net/linkyou/article/details/4101152 activema文檔
http://i-coding.iteye.com/blog/1018920 activemq實現QQ聊天
http://www.cnblogs.com/shuyangdehou/archive/2010/10/07/1845239.html activeMq教程
http://www.cnblogs.com/opaljc/archive/2012/03/25/2416545.html activeMQ使用
http://www.cnblogs.com/sjzzqy/archive/2012/05/29/activemq.html 例子

通信

http://download.csdn.net/download/gaojie8273150/2290220
http://download.csdn.net/detail/gaoweipeng/1635506 
http://download.csdn.net/download/qqq626/3081379

相關文章
相關標籤/搜索