ActiveMQ消息中間件

最近學習到ActiveMQ,以前也沒有用過相關或者相似的工具,所以特意寫個文章進行相關的學習記錄。html

相關參考博文:http://www.javashuo.com/article/p-mgrbykqq-eh.htmlhttps://blog.csdn.net/qq_26641781/article/details/80408987https://blog.csdn.net/qinweili751/article/details/80620104spring

 

1.安裝ActiveMQapache

(1)進入官網http://activemq.apache.org/,選擇最新的版本下載windows

(2)再選擇對應的系統環境(我這裏選擇的是windows版本)瀏覽器

(3)下載完成後將其解壓(我這裏將它存放在D盤根目錄下),目錄結構以下springboot

(4)進入bin/win64/目錄,啓動activemq.bat文件(注意:MQ與jdk版本必需要匹配。我這裏下載的MQ是5.15版本,對應的jdk最低要求是1.8)。session

(5)啓動完成後,輸入瀏覽器地址http://localhost:8161/admin,會彈出用戶名/密碼輸入框異步

咱們的帳號密碼是存放在ActiveMQ根目錄的conf/jetty-realm.properties文件中。tcp

打開能夠看到最下面已經有兩個建立好了的用戶了。若是咱們須要添加本身的用戶,或是修改它們的角色,均可以按照上面所寫的格式"用戶名:密碼 [,角色]"來進行配置(角色被定義在~/conf/jetty.xml中)。spring-boot

這裏咱們使用默認賬號,admin/admin

(6)至此,咱們的ActiveMQ已經安裝完成。

 

2.使用ActiveMQ

  • ActiveMQ的使用通常分爲如下幾個步驟:
  • connectionFactory:建立鏈接工廠;
  • connection:從鏈接工廠中獲得鏈接;
  • session:從鏈接中得到一個會話;
  • destination:從會話中獲取一個destination。能夠是Queue(P2P)或Topic(Pub/Sub)
  • Producer:根據session和destination建立服務生產者。
  •   Message:根據session建立消息。
  •   send:消息生產者將message發送給MQ
  • Consumer:根據session和destination建立服務消費者。
  •   receive:接收MQ中的消息。能夠是同步接收,也能夠是建立監聽器異步接收。
  • 關閉資源。

因爲我這裏是Springboot的項目,所以有部分步驟已經在自動配置中處理好了。

(1)引入依賴

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.4</version>
        </dependency>

(2)添加相關配置

#默認端口是61616,而不是咱們訪問網站的8161端口
spring.activemq.broker-url=tcp://localhost:61616 spring.activemq.user=admin spring.activemq.password=admin spring.activemq.pool.enabled=false

(3)添加配置類(PS:若是不配置該類,默認只會使用P2P,即設置Queue爲destination。若是要使用Topic,則必需要配置下面的類)。

@Configuration
public class JmsConfig {

    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(false);  //Queue是P2P,所以Pub/Sub設置爲false。默認是false。
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }

    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(true);  //Topic是Pub/Sub,須要顯示聲明。
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }

}

(4)編寫服務消費者

這裏爲了能區別P2P和Pub/Sub,建立了兩個服務消費者。

消費者1

@Service
public class ConsumerService {

    @JmsListener(destination = "springboot.queue.test", containerFactory = "jmsListenerContainerQueue")
    public void receiveQueue(String msg) {
        System.out.println(LocalDateTime.now().toString() + " consumer接收到Queue消息:" + msg);
    }

    @JmsListener(destination = "springboot.topic.test", containerFactory = "jmsListenerContainerTopic")
    public void receiveTopic(String msg) {
        System.out.println(LocalDateTime.now().toString() + " consumer接收到Topic消息:" + msg);
    }

}

消費者2

@Service
public class Consumer2Service {

    @JmsListener(destination = "springboot.queue.test", containerFactory = "jmsListenerContainerQueue")
    public void receiveQueue(String msg) {
        System.out.println(LocalDateTime.now().toString() + " consumer2接收Queue消息:" + msg);
    }

    @JmsListener(destination = "springboot.topic.test", containerFactory = "jmsListenerContainerTopic")
    public void receiveTopic(String msg) {
        System.out.println(LocalDateTime.now().toString() + " consumer2接收到Topic消息:" + msg);
    }
}

(5)編寫服務生產者

@Service
public class ProducerService {

    @Autowired
    private JmsTemplate jmsTemplate;

    public void sendMessage(Destination destination, String msg) {
        System.out.println(LocalDateTime.now().toString() + " productor發送消息:" + msg);
        jmsTemplate.convertAndSend(destination, msg);
    }

}

(6)測試類

先測試P2P下的消息:

    @Test
    public void testMQQueue() {
        Destination destination = new ActiveMQQueue("springboot.queue.test");
        for (int i = 0; i < 3; i++) {
            producerService.sendMessage(destination, "hellow world " + i);
        }
    }

輸出結果

2019-05-22T17:29:39.324 productor發送消息:hellow world 0
2019-05-22T17:29:39.373 consumer2接收Queue消息:hellow world 0
2019-05-22T17:29:39.379 productor發送消息:hellow world 1
2019-05-22T17:29:39.385 productor發送消息:hellow world 2
2019-05-22T17:29:39.388 consumer接收到Queue消息:hellow world 1
2019-05-22T17:29:39.391 consumer2接收Queue消息:hellow world 2

能夠看到生產者每發出一個消息,都只會有一個消費者對消息進行處理。而且這裏採用的是輪詢的方式,即此次是消費者1接收了消息,下次就是消費者2接收,再下次又是消費者1。以此類推。

 

而後咱們再測試下Pub/Sub的消息:

    @Test
    public void testMQTopic() {
        Destination destination = new ActiveMQTopic("springboot.topic.test");
        for (int i = 0; i < 2; i++) {
            producerService.sendMessage(destination, "hellow world " + i);
        }
    }

輸出結果:

2019-05-22T17:35:58.535 productor發送消息:hellow world 0
2019-05-22T17:35:58.576 productor發送消息:hellow world 1
2019-05-22T17:35:58.581 consumer接收到Topic消息:hellow world 0
2019-05-22T17:35:58.582 consumer接收到Topic消息:hellow world 1
2019-05-22T17:35:58.582 consumer2接收到Topic消息:hellow world 0
2019-05-22T17:35:58.584 consumer2接收到Topic消息:hellow world 1

這裏能夠看到,每個消息被髮出來後,會被全部的服務消費者接收並處理。

相關文章
相關標籤/搜索