1、特性及優點html
一、實現JMS1.1規範,支持J2EE1.4以上
二、可運行於任何jvm和大部分web容器(ActiveMQ works great in any JVM)
三、支持多種語言客戶端(java, C, C++, AJAX, ACTIONSCRIPT等等)
四、支持多種協議(stomp,openwire,REST)
五、良好的spring支持(ActiveMQ has great Spring Support)
六、速度很快,JBossMQ的十倍(ActiveMQ is very fast; often 10x faster than JBossMQ.)
七、與OpenJMS、JbossMQ等開源jms provider相比,ActiveMQ有Apache的支持,持續發展的優點明顯。java
2、下載部署web
一、下載,spring
http://activemq.apache.org/activemq-510-release.html,下載5.1.0 Windows Distribution版本
二、安裝, 直接解壓至任意目錄(如:d:\ apache-activemq-5.1.0)
三、啓動ActiveMQ服務器
方法1:
直接運行bin\activemq.bat
方法2(在JVM中嵌套啓動):數據庫
1 2 |
cd example ant embedBroker |
四、ActiveMQ消息管理後臺系統:http://localhost:8161/admin
3、運行附帶的示例程序apache
一、Queue消息示例:
* 啓動Queue消息消費者api
1 2 |
cd example ant consumer |
* 啓動Queue消息生產者tomcat
1 2 |
cd example ant producer |
簡要說明:生產者(producer)發消息,消費者(consumer)接消息,發送/接收2000個消息後自動關閉服務器
二、Topic消息示例:
* 啓動Topic消息消費者session
1 2 |
cd example ant topic-listener |
* 啓動Topic消息生產者
1 2 |
cd example ant topic-publisher |
簡要說明:重複10輪,publisher每輪發送2000個消息,並等待獲取listener的處理結果報告,而後進入下一輪發送,最後統計全局發送時間。
4、Queue與Topic的比較
一、JMS Queue執行load balancer語義:
一條消息僅能被一個consumer收到。若是在message發送的時候沒有可用的consumer,那麼它將被保存一直到能處理該message的consumer可用。若是一個consumer收到一條message後卻不響應它,那麼這條消息將被轉到另外一個consumer那兒。一個Queue能夠有不少consumer,而且在多個可用的consumer中負載均衡。
二、Topic實現publish和subscribe語義:
一條消息被publish時,它將發到全部感興趣的訂閱者,因此零到多個subscriber將接收到消息的一個拷貝。可是在消息代理接收到消息時,只有激活訂閱的subscriber可以得到消息的一個拷貝。
三、分別對應兩種消息模式:
Point-to-Point (點對點),Publisher/Subscriber Model (發佈/訂閱者)
其中在Publicher/Subscriber 模式下又有Nondurable subscription(非持久訂閱)和durable subscription (持久化訂閱)2種消息處理方式。
5、Point-to-Point (點對點)消息模式開發流程
一、生產者(producer)開發流程(ProducerTool.java):
1.1 建立Connection:根據url,user和password建立一個jms Connection。
1.2 建立Session:在connection的基礎上建立一個session,同時設置是否支持事務和ACKNOWLEDGE標識。
1.3 建立Destination對象:需指定其對應的主題(subject)名稱,producer和consumer將根據subject來發送/接收對應的消息。
1.4 建立MessageProducer:根據Destination建立MessageProducer對象,同時設置其持久模式。
1.5 發送消息到隊列(Queue):封裝TextMessage消息,使用MessageProducer的send方法將消息發送出去。
二、消費者(consumer)開發流程(ConsumerTool.java):
2.1 實現MessageListener接口:消費者類必須實現MessageListener接口,而後在onMessage()方法中監聽消息的到達並處理。
2.2 建立Connection:根據url,user和password建立一個jms Connection,若是是durable模式,還須要給connection設置一個clientId。
2.3 建立Session和Destination:與ProducerTool.java中的流程相似,再也不贅述。
2.4建立replyProducer【可選】:能夠用來將消息處理結果發送給producer。
2.5 建立MessageConsumer:根據Destination建立MessageConsumer對象。
2.6 消費message:在onMessage()方法中接收producer發送過來的消息進行處理,並能夠經過replyProducer反饋信息給producer
1 2 3 4 |
if (message.getJMSReplyTo() != null) { replyProducer.send(message.getJMSReplyTo(), session.createTextMessage("Reply: " + message.getJMSMessageID())); } |
6、Publisher/Subscriber(發佈/訂閱者)消息模式開發流程
一、訂閱者(Subscriber)開發流程(TopicListener.java):
1.1 實現MessageListener接口:在onMessage()方法中監聽發佈者發出的消息隊列,並作相應處理。
1.2 建立Connection:根據url,user和password建立一個jms Connection。
1.3 建立Session:在connection的基礎上建立一個session,同時設置是否支持事務和ACKNOWLEDGE標識。
1.4 建立Topic:建立2個Topic, topictest.messages用於接收發布者發出的消息,topictest.control用於向發佈者發送消息,實現雙方的交互。
1.5 建立consumer和producer對象:根據topictest.messages建立consumer,根據topictest.control建立producer。
1.6 接收處理消息:在onMessage()方法中,對收到的消息進行處理,可直接簡單在本地顯示消息,或者根據消息內容不一樣處理對應的業務邏輯(好比:數據庫更新、文件操做等等),而且可使用producer對象將處理結果返回給發佈者。
二、發佈者(Publisher)開發流程(TopicPublisher.java):
2.1 實現MessageListener接口:在onMessage()方法中接收訂閱者的反饋消息。
2.2 建立Connection:根據url,user和password建立一個jms Connection。
2.3 建立Session:在connection的基礎上建立一個session,同時設置是否支持事務和ACKNOWLEDGE標識。
2.4 建立Topic:建立2個Topic,topictest.messages用於向訂閱者發佈消息,topictest.control用於接收訂閱者反饋的消息。這2個topic與訂閱者開發流程中的topic是一一對應的。
2.5 建立consumer和producer對象:根據topictest.messages建立publisher;根據topictest.control建立consumer,同時監聽訂閱者反饋的消息。
2.6 給全部訂閱者發送消息,並接收反饋消息:示例代碼中,一共重複10輪操做。每輪先向全部訂閱者發送2000個消息;
而後堵塞線程,開始等待;最後經過onMessage()方法,接收到訂閱者反饋的「REPORT」類信息後,才print反饋信息並解除線程堵塞,進入下一輪。
注:可同時運行多個訂閱者測試查看此模式效果
7、ActiveMQ與Tomcat整合
說明:Tomcat示例版本6.0.14,其它版本在配置上可能有一些差別
一、準備jar包:將ActiveMQ lib目錄下的5個jar包複製到Tomcat lib目錄下:
activemq-core-5.1.0.jar
activemq-web-5.1.0.jar
geronimo-j2ee-management_1.0_spec-1.0.jar
geronimo-jms_1.1_spec-1.1.1.jar
geronimo-jta_1.0.1B_spec-1.0.1.jar
二、修改配置文件:
2.1 修改Tomcat的conf/context.xml文件:
在節點中添加如下內容:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
<Resource name="jms/FailoverConnectionFactory" auth="Container" type="org.apache.activemq.ActiveMQConnectionFactory" description="JMS Connection Factory" factory="org.apache.activemq.jndi.JNDIReferenceFactory" brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100&maxReconnectAttempts=5" brokerName="localhost" useEmbeddedBroker="false"/> <Resource name="jms/NormalConnectionFactory" auth="Container" type="org.apache.activemq.ActiveMQConnectionFactory" description="JMS Connection Factory" factory="org.apache.activemq.jndi.JNDIReferenceFactory" brokerURL="tcp://localhost:61616" brokerName="localhost" useEmbeddedBroker="false"/> <Resource name="jms/topic/MyTopic" auth="Container" type="org.apache.activemq.command.ActiveMQTopic" factory="org.apache.activemq.jndi.JNDIReferenceFactory" physicalName="MY.TEST.FOO"/> <Resource name="jms/queue/MyQueue" auth="Container" type="org.apache.activemq.command.ActiveMQQueue" factory="org.apache.activemq.jndi.JNDIReferenceFactory" physicalName="MY.TEST.FOO.QUEUE"/> 配置說明:以JNDI的方式定義了ActiveMQ的broker鏈接url、Topic和Queue。 |
此處需加以注意的是Listener端的borkerURL使用了failover傳輸方式:
1 |
failover:(tcp://localhost:61616)?initialReconnectDelay=100&maxReconnectAttempts=5 |
客戶端使用普通傳輸方式:tcp://localhost:61616
failover transport是一種從新鏈接機制,用於創建可靠的傳輸。此處配置的是一旦ActiveMQ broker中斷,Listener端將每隔100ms自動嘗試鏈接,直至成功鏈接或重試5次鏈接失敗爲止。
failover還支持多個borker同時提供服務,實現負載均衡的同時可增長系統容錯性,格式:
failover:(uri1,…,uriN)?transportOptions
2.2 新建web應用(webapps/jms-test),修改WEB-INF/web.xml文件:
增長一個自啓動Servlet,該Servlet實現了MessageListener接口,做爲Topic消息的Listener端。
1 2 3 4 5 6 7 |
<servlet> <servlet-name>jms-listener</servlet-name> <servlet-class> com.flvcd.servlet.JMSListener </servlet-class> <load-on-startup>1</load-on-startup> </servlet> |
2.3 修改activemq.xml文件:
爲了支持持久化消息,需修改ActiveMQ的配置文件以下,使用默認的AMQ Message Store方式(索引文件方式)存儲消息,據官網介紹是快速、穩定的。數據庫存儲方式可參照官網相關文檔。
1 2 3 |
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" persistent="true" useShutdownHook="false"> <persistenceAdapter> <amqPersistenceAdapter class="MsoNormal"> |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
connection.setClientID("MyClient"); Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //普通消息訂閱者,沒法接收持久消息 //MessageConsumer consumer = jmsSession.createConsumer((Destination) envContext.lookup("jms/topic/MyTopic")); //基於Topic建立持久的消息訂閱者,前提:Connection必須指定一個惟一的clientId,當前爲MyClient TopicSubscriber consumer = jmsSession.createDurableSubscriber((Topic) envContext.lookup("jms/topic/MyTopic"), "MySub"); consumer.setMessageListener(this); connection.start(); } catch (NamingException e) { e.printStackTrace(); } catch (JMSException e) { e.printStackTrace(); } } /** 接收消息,作對應處理 */ public void onMessage(Message message) { if (checkText(message, "RefreshArticleId") != null) { String articleId = checkText(message, "RefreshArticleId"); System.out.println("接收刷新文章消息,開始刷新文章ID=" + articleId); } else if (checkText(message, "RefreshThreadId") != null) { String threadId = checkText(message, "RefreshThreadId"); System.out.println("接收刷新論壇帖子消息,開始刷新帖子ID=" + threadId); } else { System.out.println("接收普通消息,不作任何處理!"); } } private static String checkText(Message m, String s) { try { return m.getStringProperty(s); } catch (JMSException e) { e.printStackTrace(System.out); return null; } } } |
編譯JMSListener.java至classes目錄:
1 |
javac -cp .;D:\apache-tomcat-6.0.14\lib\servlet-api.jar;D:\apache-tomcat-6.0.14\lib\geronimo-jms_1.1_spec-1.1.1.jar;D:\apache-tomcat-6.0.14\lib\activemq-core-5.1.0.jar -d . JMSListener.java |
注:D:\apache-tomcat-6.0.14請替換成本地對應目錄。
四、Publisher端(publish.jsp)實現:
在jms-test目錄下新建publish.jsp文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
<%@ page language="java" import="javax.jms.*" pageEncoding="GBK"%> <%@ page language="java" import="javax.naming.*"%> <%@ page language="java" import="org.apache.activemq.ActiveMQConnectionFactory"%> <% try { InitialContext initCtx = new InitialContext(); Context envContext = (Context) initCtx.lookup("java:comp/env"); ConnectionFactory connectionFactory = (ConnectionFactory) envContext.lookup("jms/NormalConnectionFactory"); Connection connection = connectionFactory.createConnection(); Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = jmsSession.createProducer((Destination) envContext.lookup("jms/topic/MyTopic")); //設置持久方式 producer.setDeliveryMode(DeliveryMode.PERSISTENT); Message testMessage = jmsSession.createMessage(); //發佈刷新文章消息 testMessage.setStringProperty("RefreshArticleId", "2046"); producer.send(testMessage); //發佈刷新帖子消息 testMessage.clearProperties(); testMessage.setStringProperty("RefreshThreadId", "331"); producer.send(testMessage); } catch (NamingException e) { e.printStackTrace(); } catch (JMSException e) { e.printStackTrace(); } %> |
Publisher和Listner之間經過Message的setStringProperty和getStringProperty方法,實現對應的業務邏輯。
上述示例代碼中,RefreshArticleId表明刷新某篇文章,RefreshThreadId表明刷新某個帖子,property值保持對應的ID。固然用戶可根據實際需求靈活地使用。
五、運行Demo:
5.1 啓動ActiveMQ服務器
5.2 啓動Tomcat服務器:JMSListener將自動鏈接ActiveMQ broker,日誌信息:
Successfully connected to tcp://localhost:61616
5.3 訪問http://localhost:8080/jms-test/publish.jsp
Tomcat服務器日誌將提示:
接收刷新文章消息,開始刷新文章ID=2046
接收刷新論壇帖子消息,開始刷新帖子ID=331
5.4 訪問http://localhost:8161/admin/topics.jsp查看MY.TEST.FOO的消息日誌,分別發送和接收2條。
至此,已成功完成ActiveMQ與Tomcat的基本整合!
Publisher和Listener徹底能夠獨立部署到不一樣的Web服務器上,並經過ActiveMQ來進行消息傳遞,實現用戶所需的業務邏輯。
測試持久消息的具體步驟:
l 啓動Publisher所在Web服務器
l 啓動ActiveMQ
l 訪問publish.jsp發送消息,此時Listener還未啓動,消息將保存在ActiveMQ的bin\activemq-data目錄下,查看日誌能夠看到發送2條,接收0條
l 啓動Listener所在Web服務器,將自動接收到ActiveMQ的持久消息並處理,查看日誌:發送2條,接收2條,代表持久消息應用成功
http://www.cnblogs.com/shuyangdehou/archive/2010/10/07/1845239.html
activeMQ相關:
http://baike.baidu.com/view/157103.htm jms百度百科
http://www.189works.com/article-57444-1.html ActiveMQ消息收發簡單例子
http://liyebing.iteye.com/blog/1044825 jms實現簡單的聊天
http://jinguo.iteye.com/blog/234311 一個生產者消費者實例
http://blog.csdn.net/linkyou/article/details/4101152 activema文檔
http://i-coding.iteye.com/blog/1018920 activemq實現QQ聊天
http://www.cnblogs.com/shuyangdehou/archive/2010/10/07/1845239.html activeMq教程
http://www.cnblogs.com/opaljc/archive/2012/03/25/2416545.html activeMQ使用
http://www.cnblogs.com/sjzzqy/archive/2012/05/29/activemq.html 例子
通信
http://download.csdn.net/download/gaojie8273150/2290220
http://download.csdn.net/detail/gaoweipeng/1635506
http://download.csdn.net/download/qqq626/3081379