本人博客開始遷移,博客整個架構本身搭建及編碼 http://www.cookqq.com/listBlog.action java
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已是好久的事情了,可是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。下面詳細的解釋經常使用類的做用 apache
ConnectionFactory 接口(鏈接工廠) 用戶用來建立到JMS提供者的鏈接的被管對象。JMS客戶經過可移植的接口訪問鏈接,這樣當下層的實現改變時,代碼不須要進行修改。 管理員在JNDI名字空間中配置鏈接工廠,這樣,JMS客戶纔可以查找到它們。根據消息類型的不一樣,用戶將使用隊列鏈接工廠,或者主題鏈接工廠。
Connection 接口(鏈接) 鏈接表明了應用程序和消息服務器之間的通訊鏈路。在得到了鏈接工廠後,就能夠建立一個與JMS提供者的鏈接。根據不一樣的鏈接類型,鏈接容許用戶建立會話,以發送和接收隊列和主題到目標。
Destination 接口(目標) 目標是一個包裝了消息目標標識符的被管對象,消息目標是指消息發佈和接收的地點,或者是隊列,或者是主題。JMS管理員建立這些對象,而後用戶經過JNDI發現它們。和鏈接工廠同樣,管理員能夠建立兩種類型的目標,點對點模型的隊列,以及發佈者/訂閱者模型的主題。
MessageConsumer 接口(消息消費者) 由會話建立的對象,用於接收發送到目標的消息。消費者能夠同步地(阻塞模式),或異步(非阻塞)接收隊列和主題類型的消息。
MessageProducer 接口(消息生產者) 由會話建立的對象,用於發送消息到目標。用戶能夠建立某個目標的發送者,也能夠建立一個通用的發送者,在發送消息時指定目標。
Message 接口(消息) 是在消費者和生產者之間傳送的對象,也就是說從一個應用程序傳送到另外一個應用程序。一個消息有三個主要部分: 消息頭(必須):包含用於識別和爲消息尋找路由的操做設置。 一組消息屬性(可選):包含額外的屬性,支持其餘提供者和用戶的兼容。能夠建立定製的字段和過濾器(消息選擇器)。 一個消息體(可選):容許用戶建立五種類型的消息(文本消息,映射消息,字節消息,流消息和對象消息)。 消息接口很是靈活,並提供了許多方式來定製消息的內容。
Session 接口(會話) 表示一個單線程的上下文,用於發送和接收消息。因爲會話是單線程的,因此消息是連續的,就是說消息是按照發送的順序一個一個接收的。會話的好處是它支持事務。若是用戶選擇了事務支持,會話上下文將保存一組消息,直到事務被提交才發送這些消息。在提交事務以前,用戶能夠使用回滾操做取消這些消息。一個會話容許用戶建立消息生產者來發送消息,建立消息消費者來接收消息。
JMS的消息模式有1.點對點的消息模式(Point to Point Messaging) 服務器
2.發佈訂閱模式(publish – subscribe Mode) session
這裏基於點對點的消息模式進行ActiveMQ發消息和收消息過程的分析,請看模型圖: 架構
點對點的消息發送方式主要創建在 Message Queue,Sender,reciever上,Message Queue 存貯消息,Sneder(客戶端A) 發送消息,receive(客戶端B)接收消息。具體點就是客戶端A發送Message Queue ,而 客戶端B從Queue中接收消息和"發送消息已接受"到Quere,確認消息接收。消息發送客戶端與接收客戶端沒有時間上的依賴,發送客戶端能夠在 任什麼時候刻發送信息到Queue,而不須要知道接收客戶端是否是在運行 。異步
請看下面發消息和收消息的例子jsp
package com.activemq.queue; import java.util.Date; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class ActiveMqTest { private static String queueName = "activemq_queue_"; public static void main(String[] args) { Receiver receiver=new Receiver(); Sender sender =new Sender(); try { sender.send(); receiver.receive(); } catch (Exception e) { e.printStackTrace(); } } static class Receiver { public static void receive() throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(queueName); MessageConsumer consumer = session.createConsumer(destination); //第一種狀況 int i = 0; while (i < 3) { i++; TextMessage message = (TextMessage) consumer.receive(); session.commit(); // TODO something.... System.out .println("收到消息:" +message.getText()); } session.close(); connection.close(); //----------------第一種狀況結束---------------------- //第二種方式 // consumer.setMessageListener(new MessageListener() { // public void onMessage(Message arg0) { // if(arg0 instanceof TextMessage){ // try { // System.out.println("arg0="+((TextMessage)arg0).getText()); // } catch (JMSException e) { // e.printStackTrace(); // } // } // } // }); //第三種狀況 // while (true) { // Message msg = consumer.receive(1000); // TextMessage message = (TextMessage) msg; // if (null != message) { // System.out.println("收到消息:" + message.getText()); // } // } } } static class Sender { public static void send() throws Exception { ConnectionFactory connectionFactory = null; connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, //null ActiveMQConnection.DEFAULT_PASSWORD, //null "tcp://localhost:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(queueName); MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 3; i++) { TextMessage message = session.createTextMessage("count"+new Date().getTime()); Thread.sleep(1000); // 經過消息生產者發出消息 System.out.println("發送消息"+i+new Date()); producer.send(message); } session.commit(); session.close(); connection.close(); } } }
Sender主要的做用是發送消息,Receiver主要的做用是接受消息,而且顯示一下接收消息的內容,這裏詳細的解釋接受消息的方法:tcp
(1)第一種方法使用consumer.receive() 或 consumer.receive(int timeout)接受消息,消息的接收者會一直等待下去,直到有消息到達,或者超時。 ide
其實第一種方法和第三種方法接受原理同樣,區別是第一種知道要接受消息的條數,接受完消息,手動關係鏈接。而第三種不知道要接受多少條數據,因此使用while (true) 死循環直接在接受消息。 編碼
(2)第二種方法:消息消費者註冊一個MessageListener,當有消息到達的時候,會回調它的onMessage()方法。
這裏須要注意的是,你註冊完成MessageListener,千萬不要關閉鏈接session.close();和connection.close();由於你剛剛註冊完成監聽器,就把鏈接關閉,就不會受到消息,因此監聽器中也不會有處理。(這個問題可把我整哭了,搞了半天,才弄明白)
請看ActiveMQ 頁面上顯示隊列的信息
name是隊列名稱
Number Of Pending Messages 是隊列中有多少個消息等待出隊列
Number Of Consumers 是隊列中有多少個消費者
Messages Enqueued 是隊列共有多少個信息
Messages Dequeued 是隊列中已經出列多少個消息
開發中遇到的異常:
(1)javax.jms.JMSException: Could not connect to broker URL: tcp://localhost:61616. Reason: java.net.ConnectException: Connection refused: connect
拒絕鏈接,緣由是activemq服務器沒有開啓。
解決辦方法:開啓activemq服務器,請參照《activemq跑起來》