package com.shi.page; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQTextMessage; import org.junit.Test; /** * * @author: SHF * @date: 2018年3月16日 上午8:48:10 * @Description:消息隊列測試類 */ public class ActiveMQTest { /** * 點到點形式 發送 消息 生產者 * @throws Exception */ @Test public void queueProducerTest()throws Exception{ //1.建立一個鏈接工廠對象,須要指定服務的ip和端口 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.36.40:61616"); //2.使用工廠對象建立一個Connection對象 Connection connection = connectionFactory.createConnection(); //3.開啓鏈接,調用Connection對象的start方法 connection.start(); //4.建立一個Session對象 //第一個參數:是否開啓事物。若是開啓事物第二個參數無心義。通常不開啓事物。 //第二個參數:應答模式,通常:自動應答,手動應答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.使用Session對象建立一個Destination對象,倆種形式queue,topic,如今使用queue Queue queue = session.createQueue("test-queue");//Queue extends Destination //6.使用Session對象建立一個producer對象 MessageProducer producer = session.createProducer(queue); //7.建立一個Message對像,能夠使用TextMessage /*TextMessage textMessage=new ActiveMQTextMessage(); textMessage.setText("你要發送的消息");*/ TextMessage textMessage = session.createTextMessage("queue你要發送的消息"); //8.發送消息 producer.send(textMessage); //9.關閉資源 producer.close(); session.close(); connection.close(); } /** * 點到點接受消息 消費者 * @throws Exception */ @Test public void queueConsumerTest()throws Exception{ //1 建立一個ConnectionFactory對象鏈接MQ服務器 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.36.40:61616"); //2 建立一個鏈接對象 Connection connection = connectionFactory.createConnection(); //3 開啓鏈接 connection.start(); //4 使用Connection對象建立一個Session對象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5 建立一個Destination對象 queue對象 Queue queue = session.createQueue("test-queue"); //6 使用Session對象建立一個消費者對象 MessageConsumer consumer = session.createConsumer(queue); //7 接受消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message paramMessage) { // 接受到消息的回調函數 TextMessage testMessage=(TextMessage) paramMessage; try { //8 打印消息 System.out.println(testMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); System.in.read();//等待接受消息 //9 關閉鏈接 consumer.close(); session.close(); connection.close(); } /** * 一對多 發送消息 生產者 * @throws Exception */ @Test public void topicProducerTest()throws Exception{ //1.建立一個鏈接工廠對象,須要指定服務的ip和端口 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.36.40:61616"); //2.使用工廠對象建立一個Connection對象 Connection connection = connectionFactory.createConnection(); //3.開啓鏈接,調用Connection對象的start方法 connection.start(); //4.建立一個Session對象 //第一個參數:是否開啓事物。若是開啓事物第二個參數無心義。通常不開啓事物。 //第二個參數:應答模式,通常:自動應答,手動應答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.使用Session對象建立一個Destination對象,倆種形式queue,topic,如今使用topic Topic topic = session.createTopic("test-topic");//Queue extends Destination //6.使用Session對象建立一個producer對象 MessageProducer producer = session.createProducer(topic); //7.建立一個Message對像,能夠使用TextMessage /*TextMessage textMessage=new ActiveMQTextMessage(); textMessage.setText("你要發送的消息");*/ TextMessage textMessage = session.createTextMessage("topic你要發送的消息"); //8.發送消息 producer.send(textMessage); //9.關閉資源 producer.close(); session.close(); connection.close(); } /** * 一對多接受消息 消費者 * @throws Exception */ @Test public void topicConsumerTest()throws Exception{ //1 建立一個ConnectionFactory對象鏈接MQ服務器 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.36.40:61616"); //2 建立一個鏈接對象 Connection connection = connectionFactory.createConnection(); //3 開啓鏈接 connection.start(); //4 使用Connection對象建立一個Session對象 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5 建立一個Destination對象 topic對象 Topic topic = session.createTopic("test-topic"); //6 使用Session對象建立一個消費者對象 MessageConsumer consumer = session.createConsumer(topic); //7 接受消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message paramMessage) { // 接受到消息的回調函數 TextMessage testMessage=(TextMessage) paramMessage; try { //8 打印消息 System.out.println(testMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); System.out.println("topic消費者3 已經啓動..."); System.in.read();//等待接受消息 //9 關閉鏈接 consumer.close(); session.close(); connection.close(); } }