初識ActiveMQ及整合springboot

消息中間件的初步認識

什麼是消息中間件?java

  消息中間件是利用高效可靠的消息傳遞機制進行平臺無關的數據交流,並基於數據通訊來進行分佈式系統的集成。經過提供消息傳遞和消息排隊模型,能夠在分佈式架構下擴展進程之間的通訊。web

消息中間件能作什麼?spring

  消息中間件主要解決的就是分佈式系統之間消息傳遞的問題,它可以屏蔽各類平臺以及協議之間的特性,實現應用程序之間的協同。舉個很是簡單的例子,就拿一個電商平臺的註冊功能來簡單分析下,用戶註冊這一個服務,不僅僅只是 insert 一條數據到數據庫裏面就完事了,還須要發送激活郵件、發送新人紅包或者積分、發送營銷短信等一系列操做。假如說這裏面的每個操做,都須要消耗 1s,那麼整個註冊過程就須要耗時 4s 才能響應給用戶。數據庫

ActiveMQ 簡介

ActiveMQ 是徹底基於 JMS 規範實現的一個消息中間件產品。是 Apache 開源基金會研發的消息中間件。ActiveMQ主要應用在分佈式系統架構中,幫助構建高可用、高性能、可伸縮的企業級面向消息服務的系統ActiveMQ 特性apache

1. 多語言和協議編寫客戶端json

  語言:java/C/C++/C#/Ruby/Perl/Python/PHPspringboot

  應用協議 :session

  openwire/stomp/REST/ws/notification/XMPP/AMQP架構

2. 徹底支持 jms1.1 和 J2ee1.4 規範app

3. 對 spring 的支持,ActiveMQ 能夠很容易內嵌到 spring模塊中

ActiveMQ 安裝

1. 登陸到 http://activemq.apache.org/components/classic/download/,找到 ActiveMQ 的下載地址 

 我這裏用的是apache-activemq-5.15.10-bin.tar.gz ,jdk是1.8.0_161

2. 直 接 copy 到 服 務 器 上 通 過 tar -zxvf apache-activeMQ.tar.gz
3. 啓動運行
  a) 普通啓動:到 bin 目錄下, sh activemq start
  b) 啓 動 並 指 定 日 志 文 件 sh activemq start > /tmp/activemqlog
4. 檢查是否已啓動
  ActiveMQ默認採用 61616 端口提供 JMS服務,使用 8161端口提供管理控制檯服務,執行如下命令能夠檢查是否成功啓動 ActiveMQ 服務
  netstat -an|grep 61616

  能夠經過./activemq console來查看日誌。
5. 經過 http://192.168.11.156:8161 訪問 activeMQ 管理頁面 ,默認賬號密碼 admin/admin
6. 關閉 ActiveMQ; sh activemq stop

下面來看一下ActiveMQ的簡單應用:

  消息的發佈:

public static void main(String[] args) {
		
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.254.135:61616");
		Connection connection = null;
		try {

			connection = connectionFactory.createConnection();
			connection.start();
			// 延遲確認
			Session session = connection.createSession(Boolean.TRUE, Session.DUPS_OK_ACKNOWLEDGE);
			// 建立目的地
			Destination destination = session.createQueue("myQueue");
			// 建立消費者
			MessageProducer producer = session.createProducer(destination);
			TextMessage message = session.createTextMessage("Hello World");
			producer.send(message);
			// 表示消息被自動確認
			session.commit();
			session.close();
		} catch (JMSException e) {
			e.printStackTrace();
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}

  對應的客戶端消費:

public static void main(String[] args) {
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.254.135:61616");
		Connection connection = null;
		try {

			connection = connectionFactory.createConnection();
			connection.start();
			// 延遲確認
			Session session = connection.createSession(Boolean.TRUE, Session.DUPS_OK_ACKNOWLEDGE);
			// 建立目的地
			Destination destination = session.createQueue("myQueue");
			// 建立消費者
			MessageConsumer consumer = session.createConsumer(destination);
			TextMessage textMessage = (TextMessage) consumer.receive();
			System.out.println(textMessage.getText());
			// 表示消息被自動確認
			session.commit();
			session.close();
		} catch (JMSException e) {
			e.printStackTrace();
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}

  若是須要作到消息監聽的話:

public static void main(String[] args) { ConnectionFactory connectionFactory=
                new ActiveMQConnectionFactory ("tcp://192.168.1.101:61616"); Connection connection=null; try { connection=connectionFactory.createConnection(); connection.start(); Session session=connection.createSession (Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); //建立目的地
            Destination destination=session.createQueue("myQueue"); //建立發送者
            MessageConsumer consumer=session.createConsumer(destination); MessageListener messageListener=new MessageListener() { @Override public void onMessage(Message message) { try { System.out.println(((TextMessage)message).getText()); session.commit(); } catch (JMSException e) { e.printStackTrace(); } } }; consumer.setMessageListener(messageListener); System.in.read(); } catch (JMSException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { if(connection!=null){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } }

  基於訂閱發佈的消息發送:

public static void main(String[] args) {
        ConnectionFactory connectionFactory=
                new ActiveMQConnectionFactory
                        ("tcp://192.168.254.135:61616");
        Connection connection=null;
        try {
            connection=connectionFactory.createConnection();
            connection.start();
            Session session=connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
            //建立目的地
            Destination destination=session.createTopic("myTopic");
            //建立發送者
            MessageProducer producer=session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            //建立須要發送的消息
            TextMessage message=session.createTextMessage("topic -message");
            //Text   Map  Bytes  Stream  Object
            producer.send(message);
            session.commit();
//            session.rollback();
            session.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }finally {
            if(connection!=null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

  基於訂閱發佈的消息消費:這裏須要先啓動消費者

public static void main(String[] args) {
        ConnectionFactory connectionFactory=
                new ActiveMQConnectionFactory
                        ("tcp://192.168.254.135:61616");
        Connection connection=null;
        try {
            connection=connectionFactory.createConnection();
            connection.setClientID("wuzz");
            connection.start();
            Session session=connection.createSession
                    (Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
            //建立目的地
            Topic destination=session.createTopic("myTopic");
            //建立發送者
            MessageConsumer consumer=session.createDurableSubscriber(destination,"wuzz");
            TextMessage textMessage=(TextMessage) consumer.receive();
            System.out.println(textMessage.getText());
            session.commit(); //消息被確認
            session.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }finally {
            if(connection!=null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

  明白了ActiveMQ的基本使用,下面從源碼的層面去學習一下ActIiveMQ的原理

springboot整合ActiveMQ:

1.pom.xml

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions><!-- 去掉springboot默認配置 -->
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency> <!-- 引入log4j2依賴 -->
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.0</version>
        </dependency>
    </dependencies>

2.application.yml:

server: port: 8881 spring: activemq: broker-url: tcp://192.168.1.101:61616
 user: admin password: admin pool: enabled: true packages: trust-all: true # 若是使用ObjectMessage傳輸對象,必需要加上這個信任包,不然會報ClassNotFound異常 jms: pub-sub-domain: true  # 啓動主題消息

3.ActiveMqConfig 配置類:

/** * User: Wuzhenzhao * Date: 2019/12/9 * Time: 17:05 * Description: * ClassPath:com.wuzz.demo.integratedway1.config.ActiveMqConfig */ @Configuration public class ActiveMqConfig { // queue模式的ListenerContainer
 @Bean public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setConnectionFactory(activeMQConnectionFactory); return bean; } // topic模式的ListenerContainer
 @Bean public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setPubSubDomain(true); bean.setConnectionFactory(activeMQConnectionFactory); return bean; } }

4. MqProducer 生產者:

@Service public class MqProducer { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; /** * 發送字符串消息隊列 * * @param queueName 隊列名稱 * @param message 字符串 */
    public void sendStringQueue(String queueName, String message) { this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), message); } /** * 發送字符串集合消息隊列 * * @param queueName 隊列名稱 * @param list 字符串集合 */
    public void sendStringListQueue(String queueName, List<String> list) { this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), list); } /** * 發送對象消息隊列 * * @param queueName 隊列名稱 * @param obj 對象 */
    public void sendObjQueue(String queueName, Serializable obj) { this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), obj); } /** * 發送對象集合消息隊列 * * @param queueName 隊列名稱 * @param objList 對象集合 */
    public void sendObjListQueue(String queueName, List<Serializable> objList) { this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), objList); } /** * 發送字符串消息主題 * * @param topicName 主題名稱 * @param message 字符串 */
    public void sendStringTopic(String topicName, String message) { this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), message); } /** * 發送字符串集合消息主題 * * @param topicName 主題名稱 * @param list 字符串集合 */
    public void sendStringListTopic(String topicName, List<String> list) { this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), list); } /** * 發送對象消息主題 * * @param topicName 主題名稱 * @param obj 對象 */
    public void sendObjTopic(String topicName, Serializable obj) { this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), obj); } /** * 發送對象集合消息主題 * * @param topicName 主題名稱 * @param objList 對象集合 */
    public void sendObjListTopic(String topicName, List<Serializable> objList) { this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), objList); } }

4.隊列消費者 QueueConsumer:

@Component public class QueueConsumer { @JmsListener(destination = "stringQueue", containerFactory = "jmsListenerContainerQueue") public void receiveStringQueue(String msg) { System.out.println("接收到消息...." + msg); } // @JmsListener(destination = "stringListQueue", containerFactory = "jmsListenerContainerQueue") // public void receiveStringListQueue(List<String> list) { // System.out.println("接收到集合隊列消息...." + list); // } //
//
// @JmsListener(destination = "objQueue", containerFactory = "jmsListenerContainerQueue") // public void receiveObjQueue(ObjectMessage objectMessage) throws Exception { // System.out.println("接收到對象隊列消息...." + objectMessage.getObject()); // } //
//
// @JmsListener(destination = "objListQueue", containerFactory = "jmsListenerContainerQueue") // public void receiveObjListQueue(ObjectMessage objectMessage) throws Exception { // System.out.println("接收到的對象隊列消息..." + objectMessage.getObject()); // }
 }

5.主題消費者A ,這裏爲了測試topic消息,咱們使用兩個消費者去訂閱。ATopicConsumer:

@Component public class ATopicConsumer { @JmsListener(destination = "stringTopic", containerFactory = "jmsListenerContainerTopic") public void receiveStringTopic(String msg) { System.out.println("ATopicConsumer接收到消息...." + msg); } // @JmsListener(destination = "stringListTopic", containerFactory = "jmsListenerContainerTopic") // public void receiveStringListTopic(List<String> list) { // System.out.println("ATopicConsumer接收到集合主題消息...." + list); // } //
//
// @JmsListener(destination = "objTopic", containerFactory = "jmsListenerContainerTopic") // public void receiveObjTopic(ObjectMessage objectMessage) throws Exception { // System.out.println("ATopicConsumer接收到對象主題消息...." + objectMessage.getObject()); // } //
//
// @JmsListener(destination = "objListTopic", containerFactory = "jmsListenerContainerTopic") // public void receiveObjListTopic(ObjectMessage objectMessage) throws Exception { // System.out.println("ATopicConsumer接收到的對象集合主題消息..." + objectMessage.getObject()); // }
 }

  BTopicConsumer:

@Component public class BTopicConsumer { @JmsListener(destination = "stringTopic", containerFactory = "jmsListenerContainerTopic") public void receiveStringTopic(String msg) { System.out.println("BTopicConsumer接收到消息...." + msg); } // @JmsListener(destination = "stringListTopic", containerFactory = "jmsListenerContainerTopic") // public void receiveStringListTopic(List<String> list) { // System.out.println("BTopicConsumer接收到集合主題消息...." + list); // } //
//
// @JmsListener(destination = "objTopic", containerFactory = "jmsListenerContainerTopic") // public void receiveObjTopic(ObjectMessage objectMessage) throws Exception { // System.out.println("BTopicConsumer接收到對象主題消息...." + objectMessage.getObject()); // } //
//
// @JmsListener(destination = "objListTopic", containerFactory = "jmsListenerContainerTopic") // public void receiveObjListTopic(ObjectMessage objectMessage) throws Exception { // System.out.println("BTopicConsumer接收到的對象集合主題消息..." + objectMessage.getObject()); // } }

6.實體類 User:

public class User implements Serializable { private String id; private String name; private Integer age; public User() { } public User(String id, String name, Integer age) { this.id = id; this.name = name; this.age = age; }   //省略get set 跟 toString }

7.測試類:

@RestController public class TestController { @Autowired private MqProducer mqProducer; @RequestMapping(value = "/testStringQueue.json", method = {RequestMethod.GET}) public void testStringQueue() { for (int i = 1; i <= 100; i++) { System.out.println("" + i + "次發送字符串隊列消息"); mqProducer.sendStringQueue("stringQueue", "消息:" + i); } } // @RequestMapping(value = "/testStringListQueue.json", method = {RequestMethod.GET}) // public void testStringListQueue() { //
// List<String> idList = new ArrayList<>(); // idList.add("id1"); // idList.add("id2"); // idList.add("id3"); //
// System.out.println("正在發送集合隊列消息ing......"); // mqProducer.sendStringListQueue("stringListQueue", idList); // } //
//
// @RequestMapping(value = "/testObjQueue.json", method = {RequestMethod.GET}) // public void testObjQueue() { //
// System.out.println("正在發送對象隊列消息......"); // mqProducer.sendObjQueue("objQueue", new User("1", "小明", 20)); // } //
//
// @RequestMapping(value = "/testObjListQueue.json", method = {RequestMethod.GET}) // public void testObjListQueue() { //
// System.out.println("正在發送對象集合隊列消息......"); //
// List<Serializable> userList = new ArrayList<>(); // userList.add(new User("1", "小明", 21)); // userList.add(new User("2", "小雪", 22)); // userList.add(new User("3", "小花", 23)); //
// mqProducer.sendObjListQueue("objListQueue", userList); // }
 @RequestMapping(value = "/testStringTopic.json", method = {RequestMethod.GET}) public void testStringTopic() { for (int i = 1; i <= 100; i++) { System.out.println("" + i + "次發送字符串主題消息"); mqProducer.sendStringTopic("stringTopic", "消息:" + i); } } // @RequestMapping(value = "/testStringListTopic.json", method = {RequestMethod.GET}) // public void testStringListTopic() { //
// List<String> idList = new ArrayList<>(); // idList.add("id1"); // idList.add("id2"); // idList.add("id3"); //
// System.out.println("正在發送集合主題消息ing......"); // mqProducer.sendStringListTopic("stringListTopic", idList); // } //
//
// @RequestMapping(value = "/testObjTopic.json", method = {RequestMethod.GET}) // public void testObjTopic() { //
// System.out.println("正在發送對象主題消息......"); // mqProducer.sendObjTopic("objTopic", new User("1", "小明", 20)); // } //
//
// @RequestMapping(value = "/testObjListTopic.json", method = {RequestMethod.GET}) // public void testObjListTopic() { //
// System.out.println("正在發送對象集合主題消息......"); //
// List<Serializable> userList = new ArrayList<>(); // userList.add(new User("1", "小明", 21)); // userList.add(new User("2", "小雪", 22)); // userList.add(new User("3", "小花", 23)); //
// mqProducer.sendObjListTopic("objListTopic", userList); // }
}

  啓動後訪問對應接口就能夠。

相關文章
相關標籤/搜索