ActiveMQ簡單介紹+簡單實例

 

本文出自:http://www.open-open.com/lib/view/open1388994166156.htmlhtml

1. JMS基本概念
     JMS(Java Message Service) 即Java消息服務。它提供標準的產生、發送、接收消息的接口簡化 企業 應用的開發。它支持兩種消息通訊模型:點到點(point-to-point)(P2P)模型和發佈/訂閱(Pub/Sub)模型。P2P 模型規定了一個消息只能有一個接收者;Pub/Sub 模型容許一個消息能夠有多個接收者。 
    對於點到點模型,消息生產者產生一個消息後,把這個消息發送到一個Queue(隊列)中,而後消息接收者再從這個Queue中讀取,一旦這個消息被一個接收者讀取以後,它就在這個Queue中消失了,因此一個消息只能被一個接收者消費。 java

    與點到點模型不一樣,發佈/訂閱模型中,消息生產者產生一個消息後,把這個消息發送到一個Topic中,這個Topic能夠同時有多個接收者在監聽,當一個消息到達這個Topic以後,全部消息接收者都會收到這個消息。spring

2.編程的結構apache

2.1消息產生者向JMS發送消息的步驟 
(1)建立鏈接使用的工廠類JMS ConnectionFactory 
(2)使用管理對象JMS ConnectionFactory創建鏈接Connection 
(3)使用鏈接Connection 創建會話Session 
(4)使用會話Session和管理對象Destination建立消息生產者MessageSender 
(5)使用消息生產者MessageSender發送消息 
2.2消息消費者從JMS接受消息的步驟 
(1)建立鏈接使用的工廠類JMS ConnectionFactory 
(2)使用管理對象JMS ConnectionFactory創建鏈接Connection 
(3)使用鏈接Connection 創建會話Session 
(4)使用會話Session和管理對象Destination建立消息消費者MessageReceiver 
(5)使用消息消費者MessageReceiver接受消息,須要用setMessageListener將MessageListener接口綁定到MessageReceiver 
消息消費者必須實現了MessageListener接口,須要定義onMessage事件方法。編程

3.ActiveMQ的下載服務器

下載地址:http://activemq.apache.org/download.html
解壓縮到本地,而後啓動/bin/activemq.batsession

而且有的Eclipse沒有自帶jms.jar,須要去下載異步

4.發送端代碼:tcp

package com.xkey.JMS;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsSender {
	
	private ConnectionFactory connectionFactory = null;
	private Connection connection = null;
	private Session session = null;
	private Destination destination = null;
	private MessageProducer producer = null;
	
	
	JmsSender(){
		
	}
	
	public void init(){
		connectionFactory = new ActiveMQConnectionFactory(
				ActiveMQConnection.DEFAULT_USER,
				ActiveMQConnection.DEFAULT_PASSWORD,"tcp://localhost:61616");
		
		try{
			
			connection = connectionFactory.createConnection();
			connection.start();
			session = connection.createSession(Boolean.TRUE.booleanValue(),  
                    Session.AUTO_ACKNOWLEDGE);
			//Queue
			destination = session.createQueue("xkey");
			producer = session.createProducer(destination);
			//Topic
			/**
			 * Topic topic = session.createTopic("xkey.Topic");
			 * producer = session.createProducer(topic);
			*/
			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
			sendMessage(session,producer);
			session.commit();
			
		}catch(Exception e){
			e.printStackTrace();
		}finally{
			try {
				connection.close();
			} catch (JMSException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
	
	private void sendMessage(Session session,MessageProducer producer) throws JMSException{
		for (int i = 1; i <= 5; i ++) {  
            TextMessage message = session.createTextMessage("First ActiveMQ Test:::: " + i);  
            // 發送消息
            System.out.println("Sender:" + "First ActiveMQ Test::: " + i);  
            producer.send(message);  
        }  
	}

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		// TODO Auto-generated method stub
		JmsSender jms = new JmsSender();
		jms.init();
	}

}

 

5.接收端代碼 分佈式

package com.xkey.JMS;

import javax.jms.Connection;  
import javax.jms.ConnectionFactory;  
import javax.jms.Destination;  
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;  
import javax.jms.MessageListener;
import javax.jms.Session;  
import javax.jms.TextMessage;  

import org.apache.activemq.ActiveMQConnection;  
import org.apache.activemq.ActiveMQConnectionFactory;  

public class JmsReceiver {

	private ConnectionFactory connectionFactory = null;
	private Connection connection = null;
	private Session session = null;
	private MessageConsumer consumer = null;
	private Destination destination = null;
	
	public JmsReceiver(){
		
	}
	
	public void init(){
		connectionFactory = new ActiveMQConnectionFactory(
				ActiveMQConnection.DEFAULT_USER,
				ActiveMQConnection.DEFAULT_PASSWORD,"tcp://localhost:61616");
		try{
			connection = connectionFactory.createConnection();
			connection.start();
			session = connection.createSession(Boolean.TRUE.booleanValue(),  
	                Session.AUTO_ACKNOWLEDGE); 
			destination = session.createQueue("xkey");
			consumer = session.createConsumer(destination);
			consumer.setMessageListener(new MessageListener(){

				@Override
				public void onMessage(Message msg) {
					// TODO Auto-generated method stub
					TextMessage message = (TextMessage)msg;
					try{
						System.out.println("Receiver " + message.getText());  
					}catch(Exception e){
						e.printStackTrace();
					}
				}
				
			});
			/**while (true) {  
                TextMessage message = (TextMessage) consumer.receive(1000);  
                if (null != message) {  
                    System.out.println("Receiver " + message.getText());  
                } else {  
                    break;  
                }  
            }  */
		}catch(Exception e){
			e.printStackTrace();
		}finally{
			try {
				connection.close();
			} catch (JMSException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		// TODO Auto-generated method stub
		JmsReceiver jms = new JmsReceiver();
		jms.init();
	}

}

  六、JMS+ActiveMQ+Spring+TomCat能夠用來實現企業級的消息隊列,這樣能使服務請求端和服務操做端實現低耦合。不知道怎麼實現分佈式,也就是多個請求,每一個請求用不一樣的服務器去相響應。 

Spring + JMS + ActiveMQ實現簡單的消息隊列(監聽器異步實現)

鏈接:   http://blog.csdn.net/acceptedxukai/article/details/7775746

在Tomcat 6.0下用JNDI鏈接IBM MQ 6.0的配置方法(一)

鏈接:  http://blog.sina.com.cn/s/blog_60dadc490100ecy6.html

http://www.javawind.net/help/html/spring_ref_2.0/html/jms.html

擴展閱讀

ActiveMQ接收消息+發送消息的簡單實例 
ActiveMQ持久化方式 
ActiveMQ持久化方式 
activemq存儲 
activemq特性 

爲您推薦

java發送內嵌圖片郵件
PHP圖片上傳與預覽 
CSS學習總結 
Arcgis Android API開發之離線地圖 
Struts2自定義過濾器 + 百度富文本控件UEditor + Smb上傳圖片到獨立服務器 

更多

ActiveMQ

相關文章
相關標籤/搜索