簡介:html
Apache ActiveMQ ™ 是最流行最強大的開源消息及繼承模式服務器。ijava
Apache ActiveMQ 速度快,支持多種語言的客戶端及代理,可便捷的使用企業集成模式,完整支持JMS1.1及JEE1.4 ,符合 Apache2.0 Licence。macos
0.下載地址apache
https://activemq.apache.org/download.html服務器
1.解壓並啓動activemq服務(需根據系統的不一樣選擇不一樣的啓動文件)session
/apache-activemq-5.13.1/bin/macosx/activemq starteclipse
2.登陸activemq服務器進行查看tcp
地址:http://localhost:8161/spa
點擊[Manage ActiveMQ broker]登陸查看詳細數據,默認用戶名密碼admin/admin代理
3.ActiviteMQ消息有3種形式
JMS 公共 |
點對點域 |
發佈/訂閱域 |
ConnectionFactory |
QueueConnectionFactory |
TopicConnectionFactory |
Connection |
QueueConnection |
TopicConnection |
Destination |
Queue |
Topic |
Session |
QueueSession |
TopicSession |
MessageProducer |
QueueSender |
TopicPublisher |
MessageConsumer |
QueueReceiver |
TopicSubscriber
|
point-to-point : 點對點的消息發送方式主要創建在 Message Queue,Sender,reciever上,Message Queue 存貯消息,Sneder 發送消息,receive接收消息。Sender Client發送Message Queue ,Receiver Client從Queue中接收消息和"發送消息已接受"到Queue,確認消息接收。消息發送客戶端與接收客戶端沒有時間上的依賴,發送客戶端能夠在任什麼時候刻發送信息到Queue,而不須要知道接收客戶端是否是在運行。
publish/subscriber Messaging :發佈/訂閱方式用於多接收客戶端的方式。做爲發佈訂閱的方式,可能存在多個接收客戶端,而且接收端客戶端與發送客戶端存在時間上的依賴,一個接收端只能接收他建立之後發送客戶端發送的信息。做爲subscriber ,在接收消息時有兩種方法,destination的receive方法,和實現message listener 接口的onMessage 方法。
3.1 發送消息的基本步驟:
(1)、建立鏈接使用的工廠類JMS ConnectionFactory
(2)、使用管理對象JMS ConnectionFactory創建鏈接Connection,並啓動
(3)、使用鏈接Connection 創建會話Session
(4)、使用會話Session和管理對象Destination建立消息生產者MessageSender
(5)、使用消息生產者MessageSender發送消息
3.2 接收消息的基本步驟
(1)、建立鏈接使用的工廠類JMS ConnectionFactory
(2)、使用管理對象JMS ConnectionFactory創建鏈接Connection,並啓動
(3)、使用鏈接Connection 創建會話Session
(4)、使用會話Session和管理對象Destination建立消息接收者MessageReceiver
(5)、使用消息接收者MessageReceiver接受消息,須要用setMessageListener將MessageListener接口綁定到MessageReceiver消息接收者必須實現了MessageListener接口,須要定義onMessage事件方法。
4.代碼示例
建立eclipse項目
/apache-activemq-5.13.1/lib下倒入所需jar包
4.1 通用jms示例
public class Sender { private static final int SEND_NUMBER=5; public static void main(String[] args){ ConnectionFactory connectionFactory; Connection connection =null; Session session; Destination destination; MessageProducer producer; connectionFactory=new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try{ connection = connectionFactory.createConnection(); connection.start(); session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); destination=session.createQueue("JMeterQueue"); producer=session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); sendMessage(session,producer); session.commit(); }catch(Exception e){ e.printStackTrace(); }finally{ try{ if(null!=connection){ connection.close(); } }catch(Throwable ignore){ } } } public static void sendMessage(Session session,MessageProducer producer) throws JMSException{ for(int i=1;i<SEND_NUMBER;i++){ TextMessage message=session.createTextMessage("ActiveMq send "+i); System.out.println("ActiveMq send "+i); producer.send(message); } } }
public class Receiver { public static void main(String[] args){ ConnectionFactory connectionFactory ; Connection connection=null; Session session; Destination destination; MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try{ connection = connectionFactory.createConnection(); connection.start(); session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); destination=session.createQueue("JMeterQueue"); consumer=session.createConsumer(destination); while(true){ TextMessage message=(TextMessage)consumer.receive(10000); if(null !=message){ System.out.println("Message receive "+ message.getText()); }else{ break; } } session.commit(); //session.commit 以後,Messages Enqueued 中的消息纔會被被消費掉,Messages Dequeued 纔會增長; //若是不commit,Messages Dequeued會一直爲0,每次啓動receiver後都會受到全部未消費的消息 }catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } }
4.2 p2p示例
public class QueueSender { // 發送次數 public static final int SEND_NUM = 5; // tcp 地址 public static final String BROKER_URL = "tcp://localhost:61616"; // 目標,在ActiveMQ管理員控制檯建立 public static final String DESTINATION = "mq.p2p.queue"; public static void run() throws Exception { QueueConnection connection = null; QueueSession session = null; try { // 建立連接工廠 QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL); // 經過工廠建立一個鏈接 connection = factory.createQueueConnection(); // 啓動鏈接 connection.start(); // 建立一個session會話 session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 建立一個消息隊列 Queue queue = session.createQueue(DESTINATION); // 建立消息發送者 javax.jms.QueueSender sender = session.createSender(queue); // 設置持久化模式 sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT); sendMessage(session, sender); // 提交會話 session.commit(); } catch (Exception e) { throw e; } finally { // 關閉釋放資源 if (session != null) { session.close(); } if (connection != null) { connection.close(); } } } public static void sendMessage(QueueSession session, javax.jms.QueueSender sender) throws Exception { for (int i = 0; i < SEND_NUM; i++) { String message = "發送消息第" + (i + 1) + "條"; Message msg=session.createTextMessage(message); sender.send(msg); } } public static void main(String[] args) throws Exception { QueueSender.run(); } }
public class QueueReceiver { // tcp 地址 public static final String BROKER_URL = "tcp://localhost:61616"; // 目標,在ActiveMQ管理員控制檯建立 public static final String TARGET = "mq.p2p.queue"; public static void run() throws Exception { QueueConnection connection = null; QueueSession session = null; try { // 建立連接工廠 QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL); // 經過工廠建立一個鏈接 connection = factory.createQueueConnection(); // 啓動鏈接 connection.start(); // 建立一個session會話 session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 建立一個消息隊列 Queue queue = session.createQueue(TARGET); // 建立消息製做者 javax.jms.QueueReceiver receiver = session.createReceiver(queue); receiver.setMessageListener(new MessageListener() { public void onMessage(Message msg) { if (msg != null) { TextMessage map = (TextMessage) msg; try { System.out.println(map.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }); // 休眠100ms再關閉 Thread.sleep(1000 * 20); // 提交會話 session.commit(); } catch (Exception e) { throw e; } finally { // 關閉釋放資源 if (session != null) { session.close(); } if (connection != null) { connection.close(); } } } public static void main(String[] args) throws Exception { QueueReceiver.run(); } }
4.3 訂閱示例
public class TopicSender { // 發送次數 public static final int SEND_NUM = 5; // tcp 地址 public static final String BROKER_URL = "tcp://localhost:61616"; // 目標,在ActiveMQ管理員控制檯建立 public static final String DESTINATION = "mq.topic"; public static void run() throws Exception { TopicConnection connection = null; TopicSession session = null; try { // 建立連接工廠 TopicConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL); // 經過工廠建立一個鏈接 connection = factory.createTopicConnection(); // 啓動鏈接 connection.start(); // 建立一個session會話 session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 建立一個消息隊列 Topic topic = session.createTopic(DESTINATION); // 建立消息發送者 TopicPublisher publisher = session.createPublisher(topic); // 設置持久化模式 publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT); sendMessage(session, publisher); // 提交會話 session.commit(); } catch (Exception e) { throw e; } finally { // 關閉釋放資源 if (session != null) { session.close(); } if (connection != null) { connection.close(); } } } public static void sendMessage(TopicSession session, TopicPublisher publisher) throws Exception { for (int i = 0; i < SEND_NUM; i++) { String message = "發送消息第" + (i + 1) + "條"; TextMessage msg =session.createTextMessage(message); publisher.send(msg); } } public static void main(String[] args) throws Exception { TopicSender.run(); } }
public class TopicReceiver { // tcp 地址 public static final String BROKER_URL = "tcp://localhost:61616"; // 目標,在ActiveMQ管理員控制檯建立 public static final String TARGET = "mq.topic"; public static void run() throws Exception { TopicConnection connection = null; TopicSession session = null; try { // 建立連接工廠 TopicConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL); // 經過工廠建立一個鏈接 connection = factory.createTopicConnection(); // 啓動鏈接 connection.start(); // 建立一個session會話 session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 建立一個消息隊列 Topic topic = session.createTopic(TARGET); // 建立消息製做者 TopicSubscriber subscriber = session.createSubscriber(topic); subscriber.setMessageListener(new MessageListener() { public void onMessage(Message msg) { System.out.println(msg); } }); // 休眠100ms再關閉 Thread.sleep(1000 * 20); // 提交會話 session.commit(); } catch (Exception e) { throw e; } finally { // 關閉釋放資源 if (session != null) { session.close(); } if (connection != null) { connection.close(); } } } public static void main(String[] args) throws Exception { TopicReceiver.run(); } }