官網地址:http://activemq.apache.org/java
參考文章:http://my.oschina.net/nk2011/blog/366395linux
JMS支持兩種消息發送和接收模型。一種稱爲P2P(Ponit to Point)模型,即採用點對點的方式發送消息。P2P模型是基於隊列的,消息生產者發送消息到隊列,消息消費者從隊列中接收消息,隊列的存在使得消息的異步傳輸稱爲可能,P2P模型在點對點的狀況下進行消息傳遞時採用。web
另外一種稱爲Pub/Sub(Publish/Subscribe,即發佈-訂閱)模型,發佈-訂閱模型定義瞭如何向一個內容節點發布和訂閱消息,這個內容節點稱爲topic(主題)。主題能夠認爲是消息傳遞的中介,消息發佈這將消息發佈到某個主題,而消息訂閱者則從主題訂閱消息。主題使得消息的訂閱者與消息的發佈者互相保持獨立,不須要進行接觸便可保證消息的傳遞,發佈-訂閱模型在消息的一對多廣播時採用。apache
ActiveMQ的安裝瀏覽器
下載最新的安裝包apache-activemq-5.13.2-bin.tar.gz(此包linux下的,案例也是針對linux系統進行闡述,固然ActiveMQ也有win版的,這裏就不贅述了),能夠去官網下載,也能夠在下方留言區留下你的郵箱,博主會發給你的~服務器
下載以後解壓: tar -zvxf apache-activemq-5.13.2-bin.tar.gzsession
ActiveMQ目錄內容有:app
bin目錄包含ActiveMQ的啓動腳本less
conf目錄包含ActiveMQ的全部配置文件webapp
data目錄包含日誌文件和持久性消息數據
example: ActiveMQ的示例
lib: ActiveMQ運行所須要的lib
webapps: ActiveMQ的web控制檯和一些相關的demo
運行命令:activemq start(在activemq/bin下運行)
INFO: Loading '/users/shr/apache-activemq-5.13.2//bin/env' INFO: Using java '/users/shr/util/JavaDir/jdk/bin/java' INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details INFO: pidfile created : '/users/shr/apache-activemq-5.13.2//data/activemq.pid' (pid '986')
查看activemq是否運行命令:ps -aux | grep activemq
shr 986 1.2 9.7 1281720 201936 pts/5 Sl 19:43 0:17 /users/shr/util/JavaDir/jdk/bin/java -Xms64M -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/users/shr/apache-activemq-5.13.2//conf/login.config -Dcom.sun.management.jmxremote -Djava.awt.headless=true -Djava.io.tmpdir=/users/shr/apache-activemq-5.13.2//tmp -Dactivemq.classpath=/users/shr/apache-activemq-5.13.2//conf:/users/shr/apache-activemq-5.13.2//../lib/: -Dactivemq.home=/users/shr/apache-activemq-5.13.2/ -Dactivemq.base=/users/shr/apache-activemq-5.13.2/ -Dactivemq.conf=/users/shr/apache-activemq-5.13.2//conf -Dactivemq.data=/users/shr/apache-activemq-5.13.2//data -jar /users/shr/apache-activemq-5.13.2//bin/activemq.jar start shr 1501 0.0 0.0 5176 724 pts/5 S+ 20:06 0:00 grep activemq
關閉命令: activemq stop
INFO: Loading '/users/shr/apache-activemq-5.13.2//bin/env' INFO: Using java '/users/shr/util/JavaDir/jdk/bin/java' INFO: Waiting at least 30 seconds for regular process termination of pid '986' : Java Runtime: Oracle Corporation 1.7.0_79 /users/shr/util/JavaDir/jdk1.7.0_79/jre Heap sizes: current=63232k free=62218k max=932096k JVM args: -Xms64M -Xmx1G -Djava.util.logging.config.file=logging.properties -Djava.security.auth.login.config=/users/shr/apache-activemq-5.13.2//conf/login.config -Dactivemq.classpath=/users/shr/apache-activemq-5.13.2//conf:/users/shr/apache-activemq-5.13.2//../lib/: -Dactivemq.home=/users/shr/apache-activemq-5.13.2/ -Dactivemq.base=/users/shr/apache-activemq-5.13.2/ -Dactivemq.conf=/users/shr/apache-activemq-5.13.2//conf -Dactivemq.data=/users/shr/apache-activemq-5.13.2//data Extensions classpath: [/users/shr/apache-activemq-5.13.2/lib,/users/shr/apache-activemq-5.13.2/lib/camel,/users/shr/apache-activemq-5.13.2/lib/optional,/users/shr/apache-activemq-5.13.2/lib/web,/users/shr/apache-activemq-5.13.2/lib/extra] ACTIVEMQ_HOME: /users/shr/apache-activemq-5.13.2 ACTIVEMQ_BASE: /users/shr/apache-activemq-5.13.2 ACTIVEMQ_CONF: /users/shr/apache-activemq-5.13.2/conf ACTIVEMQ_DATA: /users/shr/apache-activemq-5.13.2/data Connecting to pid: 986 ..Stopping broker: localhost .. TERMINATED
ActiveMQ的默認服務端口爲61616,這個能夠在conf/activemq.xml配置文件中修改:
<transportConnectors> <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> </transportConnectors>
案例
在下載的apache-activemq-5.13.2-bin.tar.gz包中解壓有一個jar包:activemq-all-5.13.2.jar,引入這個jar到你的項目中便可開始編寫案例代碼。
博主的activemq服務器地址爲10.10.195.187,這個在下面代碼中會有體現。
按照JMS的規範,咱們首先須要得到一個JMS connection factory.,經過這個connection factory來建立connection.在這個基礎之上咱們再建立session, destination, producer和consumer。所以主要的幾個步驟以下:
得到JMS connection factory. 經過咱們提供特定環境的鏈接信息來構造factory。
利用factory構造JMS connection
啓動connection
經過connection建立JMS session.
指定JMS destination.
建立JMS producer或者建立JMS message並提供destination.
建立JMS consumer或註冊JMS message listener.
發送和接收JMS message.
關閉全部JMS資源,包括connection, session, producer, consumer等。
下面來看代碼舉例(P2P式)。
經過Java實現的基於ActiveMQ的請求提交:
package com.zzh.activemq; import java.io.Serializable; import java.util.HashMap; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class RequestSubmit { //消息發送者 private MessageProducer producer; //一個發送或者接受消息的線程 private Session session; public void init() throws Exception { //ConnectionFactory鏈接工廠,JMS用它建立鏈接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://10.10.195.187:61616"); //Connection:JMS客戶端到JMS Provider的鏈接,從構造工廠中獲得鏈接對象 Connection connection = connectionFactory.createConnection(); //啓動 connection.start(); //獲取鏈接操做 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destinatin = session.createQueue("RequestQueue"); //獲得消息生成(發送)者 producer = session.createProducer(destinatin); //設置不持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } public void submit(HashMap<Serializable,Serializable> requestParam) throws Exception { ObjectMessage message = session.createObjectMessage(requestParam); producer.send(message); session.commit(); } public static void main(String[] args) throws Exception{ RequestSubmit submit = new RequestSubmit(); submit.init(); HashMap<Serializable,Serializable> requestParam = new HashMap<Serializable,Serializable>(); requestParam.put("郟高陽", "zzh"); submit.submit(requestParam); } }
建立Session時有兩個很是重要的參數,第一個boolean類型的參數用來表示是否採用事務消息。若是是事務消息,對於的參數設置爲true,此時消息的提交自動有comit處理,消息的回滾則自動由rollback處理。加入消息不是事務的,則對應的該參數設置爲false,此時分爲三種狀況:
Session.AUTO_ACKNOWLEDGE表示Session會自動確認所接收到的消息。
Session.CLIENT_ACKNOWLEDGE表示由客戶端程序經過調用消息的確認方法來確認所接收到的消息。
Session.DUPS_OK_ACKNOWLEDGE使得Session將「懶惰」地確認消息,即不會當即確認消息,這樣有可能致使消息重複投遞。
提供Java實現的基於ActiveMQ的請求處理:
package com.zzh.activemq; import java.io.Serializable; import java.util.HashMap; import java.util.Map; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.ObjectMessage; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class RequestProcessor { public void requestHandler(HashMap<Serializable,Serializable> requestParam) throws Exception { System.out.println("requestHandler....."+requestParam.toString()); for(Map.Entry<Serializable, Serializable> entry : requestParam.entrySet()) { System.out.println(entry.getKey()+":"+entry.getValue()); } } public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://10.10.195.187:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("RequestQueue"); //消息消費(接收)者 MessageConsumer consumer = session.createConsumer(destination); RequestProcessor processor = new RequestProcessor(); while(true) { ObjectMessage message = (ObjectMessage) consumer.receive(1000); if(null != message) { System.out.println(message); HashMap<Serializable,Serializable> requestParam = (HashMap<Serializable,Serializable>) message.getObject(); processor.requestHandler(requestParam); } else { break; } } } }
輸出結果:
ActiveMQObjectMessage {commandId = 6, responseRequired = false, messageId = ID:hidden-PC-58748-1460550507055-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:hidden-PC-58748-1460550507055-1:1:1:1, destination = queue://RequestQueue, transactionId = TX:ID:hidden-PC-58748-1460550507055-1:1:1, expiration = 0, timestamp = 1460550507333, arrival = 0, brokerInTime = 1460550505969, brokerOutTime = 1460550509143, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@74a456bb, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false} requestHandler.....{郟高陽=zzh} 郟高陽:zzh
能夠經過頁面查看隊列的使用狀況,在瀏覽器中輸入http://10.10.195.187:8161/admin/queues.jsp,用戶名和密碼都是:admin,看到如下頁面:
這個是在jetty服務器下跑的,能夠修改conf/jetty.xml來修改相關jetty配置。
上面的例子是關於P2P模式的,不過有個不妥之處,就是沒有資源的釋放。下面舉一個Pub/Sub模式的。
經過JMS建立ActiveMQ的topic,並給topic發送消息:
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.camel.Produce; public class TopicRequest { //消息發送者 private MessageProducer producer; //一個發送或者接受消息的線程 private Session session; //Connection:JMS客戶端到JMS Provider的鏈接 private Connection connection; public void init() throws Exception { //ConnectionFactory鏈接工廠,JMS用它建立鏈接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://10.10.195.187:61616"); //從構造工廠中獲得鏈接對象 connection = connectionFactory.createConnection(); //啓動 connection.start(); //獲取鏈接操做 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("MessageTopic"); producer = session.createProducer(topic); //設置不持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } public void submit(String mess) throws Exception { TextMessage message = session.createTextMessage(); message.setText(mess); producer.send(message); } public void close() { try { if(session != null) session.close(); if(producer != null) producer.close(); if(connection !=null ) connection.close(); } catch (JMSException e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { TopicRequest topicRequest = new TopicRequest(); topicRequest.init(); topicRequest.submit("I'm first"); topicRequest.close(); } }
消息發送到對應的topic後,須要將listener註冊到須要訂閱的topic上,以便可以接收該topic的消息:
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class TopicReceive { private MessageConsumer consumer; private Session session; public void init() throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://10.10.195.187:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("MessageTopic"); consumer = session.createConsumer(topic); consumer.setMessageListener(new MessageListener(){ @Override public void onMessage(Message message) { TextMessage tm = (TextMessage) message; System.out.println(tm); try { System.out.println(tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } public static void main(String[] args) throws Exception { TopicReceive receive = new TopicReceive(); receive.init(); } }
輸出結果:
ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId = ID:hidden-PC-50073-1460597487065-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:hidden-PC-50073-1460597487065-1:1:1:1, destination = topic://MessageTopic, transactionId = null, expiration = 0, timestamp = 1460597487308, arrival = 0, brokerInTime = 1460597487297, brokerOutTime = 1460597487298, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@2e4d3abf, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = I'm first} I'm first