ActiveMQ (二)


 今天繼續給你們分享的是ActiveMQ,若有不足,敬請指教。java

 上次咱們說到,咱們發現消費者每次只能消費一條消息。當隊列中有多條消息的時候,咱們須要屢次運行消費者,才能消費完這些消息。很麻煩!!!!如何解決這個問題呢?web

 這就須要使用ActiveMQ監聽器來監聽隊列,持續消費消息。redis

1、ActiveMQ監聽器

1.1 配置步驟說明

  • 建立一個監聽器對象。
  • 修改消費者代碼,加載監聽器

1.2 配置步驟

1.2.1 建立監聽器MyListener類

package com.xkt.listener;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * @author lzx
 *
 */
public class MyListener implements MessageListener {

	@Override
	public void onMessage(Message message) {

		if (null != message) {
			if (message instanceof TextMessage) {
				try {
					TextMessage tMsg = (TextMessage) message;
					String content = tMsg.getText();
					System.out.println("監聽到的消息是 " + content);
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}

}

1.2.2 修改MyConsumer代碼,加載監聽器

  • 監聽器須要持續加載,所以消費程序不能結束。這裏咱們使用輸入流阻塞消費線程結束
package com.xkt.consumer;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

import com.xkt.listener.MyListener;

/**
 * @author lzx
 *
 */
public class Myconsumer {

	private ConnectionFactory factory;

	private Connection connection;

	private Session session;

	private Destination destination;

	private MessageConsumer consumer;

	public void receiveFromMq() {

		try {
			factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616");

			connection = factory.createConnection();
			connection.start();

			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

			// 建立目的地, 目的地命名即隊列命名, 消息消費者須要經過此命名訪問對應的隊列
			destination = session.createQueue("queue");

			// 5.建立消息消費者, 建立的消息消費者與某目的地對應, 即方法參數目的地
			consumer = session.createConsumer(destination);

			// 7.加載監聽器
			consumer.setMessageListener(new MyListener());
			// 監聽器須要持續加載,這裏咱們使用輸入流阻塞當前線程結束。監聽指定隊列,只要有消息進來,就消費這條消息
			System.in.read();

			// 在java項目中,能夠經過IO阻塞程序,持續加載監聽器
			// 在web項目中,能夠經過配置文件,直接加載監聽器。

		} catch (Exception e) {
			e.printStackTrace();
			System.out.println("讀取失敗");
		} finally {
			if (null != consumer) {
				try {
					consumer.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}

			if (null != session) {
				try {
					session.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}

			if (null != connection) {
				try {
					connection.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}

	}

}

1.2.3 測試

  • 屢次運行生產者,發送多條消息到隊列中
圖示
  • 運行消費者。觀察結果
圖示
  • 查看ActiveMQ管理控制界面,全部消息都被消費了!
圖示

 在以上示例中,只能向一個消費者發送消息。可是有一些場景,需求有多個消費者都能接收到消息,好比:美團APP天天的消息推送。該如何實現呢?apache

2、Topic模式實現

2.1 配置步驟說明

  1. 搭建ActiveMQ消息服務器。(略)
  2. 建立主題訂閱者。
  3. 建立主題發佈者。

2.2 配置步驟

2.2.1 建立主題訂閱者MySubscriber

  • 說明:主題訂閱模式下,能夠有多個訂閱者。咱們這裏用多線程來模擬
package com.xkt.subscriber;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * @author lzx
 *
 */
public class MySubscriber implements Runnable {

	/**
	 * 多線程的線程安全問題 解決方案:
	 * 
	 * (1)加鎖 --極不推薦 (2)不使用全局變量 ---> SpringMVC是線程安全的嗎? 答:默認不是 解決辦法:(1)使用原型模式--不推薦
	 * (2)不使用全局變量 (3)ThreadLocal (3)其它框架來代替,好比redis
	 */
	private TopicConnectionFactory factory;

	private TopicConnection connection;

	private TopicSession session;

	private Topic topic;

	private TopicSubscriber subscriber;

	private Message message;

	@Override
	public void run() {

		try {
			// 一、建立鏈接工廠
			factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616");

			// 二、建立鏈接
			connection = factory.createTopicConnection();
			connection.start();

			// 三、建立會話
			session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

			// 四、建立topic主題
			topic = session.createTopic("topic-gzsxt");

			// 五、建立訂閱者
			subscriber = session.createSubscriber(topic);

			// 六、訂閱
			while (true) {

				message = subscriber.receive();

				if (null != message) {
					if (message instanceof TextMessage) {
						TextMessage tMsg = (TextMessage) message;

						String content = tMsg.getText();

						System.out.println("訂閱者: " + Thread.currentThread().getName() + " 接收的消息是:" + content);
					}
				}
			}

		} catch (JMSException e) {

			e.printStackTrace();
		}

	}

}

2.2.2 修改測試類

package com.xkt.test;

import com.xkt.subscriber.MySubscriber;

/**
 * @author lzx
 *
 */
public class TestMQ {

	public static void main(String[] args) {

		MySubscriber sub = new MySubscriber();

		Thread t1 = new Thread(sub);

		Thread t2 = new Thread(sub);

		t1.start();

		t2.start();
	}

}

2.2.3 查看測試結果

  • 查看AcitveMQ管理界面 |圖示 | | :------------: | | |

2.2.4 建立主題發佈者MyPublisher

package com.xkt.publish;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * @author lzx
 *
 */
public class MyPublisher {

	private TopicConnectionFactory factory;

	private TopicConnection connection;

	private TopicSession session;

	private Topic topic;

	private TopicPublisher publisher;

	private Message message;

	public void publish(String msg) {

		try {

			// 一、建立鏈接工廠
			factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.109.3:61616");

			// 二、建立鏈接
			connection = factory.createTopicConnection();
			connection.start();

			// 三、建立會話
			session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

			// 四、建立topic主題
			topic = session.createTopic("topic-gzsxt");

			// 五、建立發佈者
			publisher = session.createPublisher(topic);

			// 六、建立消息對象
			message = session.createTextMessage(msg);

			// 七、發佈消息
			publisher.publish(message);

		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if (null != publisher) {
				try {
					publisher.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}

			if (null != session) {
				try {
					session.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}

			if (null != connection) {
				try {
					connection.stop();

					connection.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}

			if (null != session) {
				try {
					session.close();
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}

	}
}

2.2.5 修改測試類

package com.xkt.test;

import org.junit.Test;

import com.xkt.publish.MyPublisher;
import com.xkt.subscriber.MySubscriber;

/**
 * @author lzx
 *
 */
public class TestMQ {

	public static void main(String[] args) {

		MySubscriber sub = new MySubscriber();

		Thread t1 = new Thread(sub);

		Thread t2 = new Thread(sub);

		t1.start();

		t2.start();
	}

	@Test
	public void publish() {
		MyPublisher publisher = new MyPublisher();

		publisher.publish("hello,歡迎收聽FM 89.9頻道-交通頻道");
	}
}

2.2.6 查看測試結果安全

2.3 Topic小結

  1. Topic模式可以實現多個訂閱者同時消費消息。
  2. Topic主題模式下,消息不會保存,只有在線的訂閱者纔會接收到消息。
  3. 一般能夠用來解決公共消息推送的相關業務。

版權說明:歡迎以任何方式進行轉載,但請在轉載後註明出處!服務器

相關文章
相關標籤/搜索