ActiveMQ的入門程序

package com.shi.page;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.junit.Test;

/**
 * 
 * @author: SHF
 * @date: 2018年3月16日 上午8:48:10
 * @Description:消息隊列測試類
 */
public class ActiveMQTest {

	/**
	 * 點到點形式 發送 消息 生產者
	 * @throws Exception
	 */
	@Test
	public void queueProducerTest()throws Exception{
		//1.建立一個鏈接工廠對象,須要指定服務的ip和端口
		ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.36.40:61616");
		//2.使用工廠對象建立一個Connection對象
		Connection connection = connectionFactory.createConnection();
		//3.開啓鏈接,調用Connection對象的start方法
		connection.start();
		//4.建立一個Session對象
				//第一個參數:是否開啓事物。若是開啓事物第二個參數無心義。通常不開啓事物。
		 		//第二個參數:應答模式,通常:自動應答,手動應答。
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//5.使用Session對象建立一個Destination對象,倆種形式queue,topic,如今使用queue
		Queue queue = session.createQueue("test-queue");//Queue extends Destination
		//6.使用Session對象建立一個producer對象
		MessageProducer producer = session.createProducer(queue);
		//7.建立一個Message對像,能夠使用TextMessage
		/*TextMessage textMessage=new ActiveMQTextMessage();
		textMessage.setText("你要發送的消息");*/
		TextMessage textMessage = session.createTextMessage("queue你要發送的消息");
		//8.發送消息
		producer.send(textMessage);
		//9.關閉資源
		producer.close();
		session.close();
		connection.close();
	}
	
	/**
	 * 點到點接受消息  消費者
	 * @throws Exception
	 */
	@Test
	public void queueConsumerTest()throws Exception{
		//1 建立一個ConnectionFactory對象鏈接MQ服務器
		ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.36.40:61616");
		//2 建立一個鏈接對象
		Connection connection = connectionFactory.createConnection();
		//3 開啓鏈接
		connection.start();
		//4 使用Connection對象建立一個Session對象
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//5 建立一個Destination對象 queue對象
		Queue queue = session.createQueue("test-queue");
		//6 使用Session對象建立一個消費者對象
		MessageConsumer consumer = session.createConsumer(queue);
		//7 接受消息
		consumer.setMessageListener(new MessageListener() {
			
			@Override
			public void onMessage(Message paramMessage) {
				// 接受到消息的回調函數
				TextMessage testMessage=(TextMessage) paramMessage;
				try {
					//8 打印消息
					System.out.println(testMessage.getText());
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		});
		System.in.read();//等待接受消息
		//9 關閉鏈接
		consumer.close();
		session.close();
		connection.close();
	}
	
	/**
	 * 一對多 發送消息  生產者
	 * @throws Exception
	 */
	@Test
	public void topicProducerTest()throws Exception{
		//1.建立一個鏈接工廠對象,須要指定服務的ip和端口
		ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.36.40:61616");
		//2.使用工廠對象建立一個Connection對象
		Connection connection = connectionFactory.createConnection();
		//3.開啓鏈接,調用Connection對象的start方法
		connection.start();
		//4.建立一個Session對象
				//第一個參數:是否開啓事物。若是開啓事物第二個參數無心義。通常不開啓事物。
		 		//第二個參數:應答模式,通常:自動應答,手動應答。
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//5.使用Session對象建立一個Destination對象,倆種形式queue,topic,如今使用topic
		Topic topic = session.createTopic("test-topic");//Queue extends Destination
		//6.使用Session對象建立一個producer對象
		MessageProducer producer = session.createProducer(topic);
		//7.建立一個Message對像,能夠使用TextMessage
		/*TextMessage textMessage=new ActiveMQTextMessage();
		textMessage.setText("你要發送的消息");*/
		TextMessage textMessage = session.createTextMessage("topic你要發送的消息");
		//8.發送消息
		producer.send(textMessage);
		//9.關閉資源
		producer.close();
		session.close();
		connection.close();
	}
	
	
	/**
	 * 一對多接受消息  消費者
	 * @throws Exception
	 */
	@Test
	public void topicConsumerTest()throws Exception{
		//1 建立一個ConnectionFactory對象鏈接MQ服務器
		ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.36.40:61616");
		//2 建立一個鏈接對象
		Connection connection = connectionFactory.createConnection();
		//3 開啓鏈接
		connection.start();
		//4 使用Connection對象建立一個Session對象
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//5 建立一個Destination對象 topic對象
		Topic topic = session.createTopic("test-topic");
		//6 使用Session對象建立一個消費者對象
		MessageConsumer consumer = session.createConsumer(topic);
		//7 接受消息
		consumer.setMessageListener(new MessageListener() {
			
			@Override
			public void onMessage(Message paramMessage) {
				// 接受到消息的回調函數
				TextMessage testMessage=(TextMessage) paramMessage;
				try {
					//8 打印消息
					System.out.println(testMessage.getText());
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		});
		System.out.println("topic消費者3 已經啓動...");
		System.in.read();//等待接受消息
		//9 關閉鏈接
		consumer.close();
		session.close();
		connection.close();
	}
}
相關文章
相關標籤/搜索