ActiveMQ發消息和收消息

本人博客開始遷移,博客整個架構本身搭建及編碼 http://www.cookqq.com/listBlog.action java

ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已是好久的事情了,可是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。下面詳細的解釋經常使用類的做用 apache

ConnectionFactory 接口(鏈接工廠) 用戶用來建立到JMS提供者的鏈接的被管對象。JMS客戶經過可移植的接口訪問鏈接,這樣當下層的實現改變時,代碼不須要進行修改。 管理員在JNDI名字空間中配置鏈接工廠,這樣,JMS客戶纔可以查找到它們。根據消息類型的不一樣,用戶將使用隊列鏈接工廠,或者主題鏈接工廠。
Connection 接口(鏈接) 鏈接表明了應用程序和消息服務器之間的通訊鏈路。在得到了鏈接工廠後,就能夠建立一個與JMS提供者的鏈接。根據不一樣的鏈接類型,鏈接容許用戶建立會話,以發送和接收隊列和主題到目標。
Destination 接口(目標) 目標是一個包裝了消息目標標識符的被管對象,消息目標是指消息發佈和接收的地點,或者是隊列,或者是主題。JMS管理員建立這些對象,而後用戶經過JNDI發現它們。和鏈接工廠同樣,管理員能夠建立兩種類型的目標,點對點模型的隊列,以及發佈者/訂閱者模型的主題。
MessageConsumer 接口(消息消費者) 由會話建立的對象,用於接收發送到目標的消息。消費者能夠同步地(阻塞模式),或異步(非阻塞)接收隊列和主題類型的消息。
MessageProducer 接口(消息生產者) 由會話建立的對象,用於發送消息到目標。用戶能夠建立某個目標的發送者,也能夠建立一個通用的發送者,在發送消息時指定目標。
Message 接口(消息) 是在消費者和生產者之間傳送的對象,也就是說從一個應用程序傳送到另外一個應用程序。一個消息有三個主要部分: 消息頭(必須):包含用於識別和爲消息尋找路由的操做設置。 一組消息屬性(可選):包含額外的屬性,支持其餘提供者和用戶的兼容。能夠建立定製的字段和過濾器(消息選擇器)。 一個消息體(可選):容許用戶建立五種類型的消息(文本消息,映射消息,字節消息,流消息和對象消息)。 消息接口很是靈活,並提供了許多方式來定製消息的內容。
Session 接口(會話) 表示一個單線程的上下文,用於發送和接收消息。因爲會話是單線程的,因此消息是連續的,就是說消息是按照發送的順序一個一個接收的。會話的好處是它支持事務。若是用戶選擇了事務支持,會話上下文將保存一組消息,直到事務被提交才發送這些消息。在提交事務以前,用戶能夠使用回滾操做取消這些消息。一個會話容許用戶建立消息生產者來發送消息,建立消息消費者來接收消息。

JMS的消息模式有1.點對點的消息模式(Point to Point Messaging) 服務器

2.發佈訂閱模式(publish – subscribe Mode) session

這裏基於點對點的消息模式進行ActiveMQ發消息和收消息過程的分析,請看模型圖: 架構

點對點的消息發送方式主要創建在 Message Queue,Sender,reciever上,Message Queue 存貯消息,Sneder(客戶端A) 發送消息,receive(客戶端B)接收消息。具體點就是客戶端A發送Message Queue ,而 客戶端B從Queue中接收消息和"發送消息已接受"到Quere,確認消息接收。消息發送客戶端與接收客戶端沒有時間上的依賴,發送客戶端能夠在 任什麼時候刻發送信息到Queue,而不須要知道接收客戶端是否是在運行 。異步

請看下面發消息和收消息的例子jsp

package com.activemq.queue;


import java.util.Date;


import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;


import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


public class ActiveMqTest {
 
 private static String queueName = "activemq_queue_";


 public static void main(String[] args) {
 Receiver receiver=new Receiver();
 Sender sender =new Sender();
 try {
 sender.send();
 receiver.receive();
 } catch (Exception e) {
 e.printStackTrace();
 }
 }


 static class Receiver {
 public static void receive() throws Exception {
 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
 Connection connection = connectionFactory.createConnection();
 connection.start();
 Session session = connection.createSession(Boolean.TRUE,
 Session.AUTO_ACKNOWLEDGE);
 Destination destination = session.createQueue(queueName);
 MessageConsumer consumer = session.createConsumer(destination);
 //第一種狀況
 int i = 0;
 while (i < 3) {
 i++;
 TextMessage message = (TextMessage) consumer.receive();
 session.commit();
 // TODO something....
 System.out
 .println("收到消息:" +message.getText());
 }
 session.close();
 connection.close();
 //----------------第一種狀況結束----------------------
 //第二種方式
//			consumer.setMessageListener(new MessageListener() {
//				public void onMessage(Message arg0) {
//					if(arg0 instanceof TextMessage){
//						try {
//							System.out.println("arg0="+((TextMessage)arg0).getText());
//						} catch (JMSException e) {
//							e.printStackTrace();
//						}
//					}
//				}
//			});
 //第三種狀況
//			 while (true) {
//            Message msg = consumer.receive(1000);
//            TextMessage message = (TextMessage) msg;
//            if (null != message) { 
//           	 System.out.println("收到消息:" + message.getText());
//            } 
//        }
 }
 }


 static class Sender {
 public static void send() throws Exception {
 ConnectionFactory connectionFactory = null;
 connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER, //null
                ActiveMQConnection.DEFAULT_PASSWORD, //null
                "tcp://localhost:61616");


 Connection connection = connectionFactory.createConnection();
 connection.start();


 Session session = connection.createSession(Boolean.TRUE,
 Session.AUTO_ACKNOWLEDGE);
 Destination destination = session.createQueue(queueName);


 MessageProducer producer = session.createProducer(destination);
 for (int i = 0; i < 3; i++) {
 TextMessage message = session.createTextMessage("count"+new Date().getTime());
 Thread.sleep(1000);
 // 經過消息生產者發出消息
 System.out.println("發送消息"+i+new Date());
 producer.send(message);
 }
 session.commit();
 session.close();
 connection.close();
 }
 }
}

Sender主要的做用是發送消息,Receiver主要的做用是接受消息,而且顯示一下接收消息的內容,這裏詳細的解釋接受消息的方法:tcp

(1)第一種方法使用consumer.receive() 或 consumer.receive(int timeout)接受消息,消息的接收者會一直等待下去,直到有消息到達,或者超時。 ide

其實第一種方法和第三種方法接受原理同樣,區別是第一種知道要接受消息的條數,接受完消息,手動關係鏈接。而第三種不知道要接受多少條數據,因此使用while (true) 死循環直接在接受消息 編碼

(2)第二種方法:消息消費者註冊一個MessageListener當有消息到達的時候,會回調它的onMessage()方法。

這裏須要注意的是,你註冊完成MessageListener,千萬不要關閉鏈接session.close();和connection.close();由於你剛剛註冊完成監聽器,就把鏈接關閉,就不會受到消息,因此監聽器中也不會有處理。(這個問題可把我整哭了,搞了半天,才弄明白)

請看ActiveMQ 頁面上顯示隊列的信息

name是隊列名稱

Number Of Pending Messages  是隊列中有多少個消息等待出隊列

Number Of Consumers  是隊列中有多少個消費者

Messages Enqueued  隊列共有多少個信息

Messages Dequeued  是隊列中已經出列多少個消息

開發中遇到的異常:

(1)javax.jms.JMSException: Could not connect to broker URL: tcp://localhost:61616. Reason: java.net.ConnectException: Connection refused: connect

拒絕鏈接,緣由是activemq服務器沒有開啓。

解決辦方法:開啓activemq服務器,請參照《activemq跑起來

相關文章
相關標籤/搜索