【ActiveMQ】使用學習

【ActiveMQ】使用學習java

轉載:web

一、啓動

activemq start

二、停止

activemq stop

 

管理控制檯:http://localhost:8161

帳號 / 密碼:admin / admin

 

Queue - Point-to-Point (點對點)

一條消息只能被一個消費者消費,且是持久化消息 - 當沒有可用的消費者時,該消息會保存直到被消費爲止;當消息被消費者收到但不響應時(具體等待響應時間是多久、如何設置,暫時還沒去了解),該消息會一直保留,或在有多個消費者的情況下轉給另外一個消費者。當一個 Queue 有多個可用消費者時,可以在這些消費者之間起到負載均衡的作用。

Topic - Publisher/Subscriber Model (發佈/訂閱者)

一條消息發佈時,所有的訂閱者都會收到。Topic 有 2 種模式:Nondurable subscription(非持久訂閱)和 Durable subscription(持久化訂閱 - 每一個持久訂閱者,都相當於一個持久化 Queue 的客戶端),默認是非持久訂閱。

持久化:消息產生後,會保存到文件/DB中,直到消息被消費, 如上述Queue的持久化消息。默認保存在ActiveMQ中:%ActiveMQ_Home%/data/kahadb

非持久化:消息不會保存,若當下沒有可用的消費者時,消息丟失。

Spring Boot 中使用

配置 JmsTemplate 和 DefaultJmsListenerContainerFactory

package ycx.activemq.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

import javax.jms.ConnectionFactory;

/**
 * JMS configuration for the demo: one {@link JmsTemplate} and one listener container factory
 * for the point-to-point queue, and one of each for the publish/subscribe topic.
 *
 * <p>The {@link ConnectionFactory} is injected into every bean method; it is not created here.
 */
@Configuration
@EnableJms
public class ActiveMQConfig {
    /** Name of the point-to-point destination used by the queue template and queue listeners. */
    public static final String MY_QUEUE = "ycx.queue";
    /** Name of the publish/subscribe destination used by the topic template and topic listeners. */
    public static final String MY_TOPIC = "ycx.topic";

    /**
     * Template for sending to queues; its default destination is {@link #MY_QUEUE}.
     *
     * @param connectionFactory the injected JMS connection factory
     * @return a template left in the default (non pub/sub) domain
     */
    @Bean("queueJmsTemplate")
    public JmsTemplate queueJmsTemplate(ConnectionFactory connectionFactory) {
        JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
        jmsTemplate.setDefaultDestinationName(MY_QUEUE);
        return jmsTemplate;
    }

    /**
     * Listener container factory referenced by the queue {@code @JmsListener} methods.
     *
     * @param connectionFactory the injected JMS connection factory
     * @return a factory with transacted sessions, concurrency "1" and a recovery interval of 1000
     */
    @Bean("queueContainerFactory")
    public DefaultJmsListenerContainerFactory queueContainerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setSessionTransacted(true);
        factory.setConcurrency("1");
        factory.setRecoveryInterval(1000L);
        return factory;
    }

    /**
     * Template for publishing to topics; its default destination is {@link #MY_TOPIC}.
     *
     * @param connectionFactory the injected JMS connection factory
     * @return a template switched to the pub/sub domain
     */
    @Bean("topicJmsTemplate")
    public JmsTemplate topicJmsTemplate(ConnectionFactory connectionFactory) {
        JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
        // The default destination must be the topic name: with the pub/sub domain enabled,
        // a default of MY_QUEUE would publish to a *topic* called "ycx.queue", which none of
        // the topic listeners subscribe to.
        jmsTemplate.setDefaultDestinationName(MY_TOPIC);
        jmsTemplate.setPubSubDomain(true);
        return jmsTemplate;
    }

    /**
     * Listener container factory referenced by the topic {@code @JmsListener} methods.
     *
     * @param connectionFactory the injected JMS connection factory
     * @return a factory configured like the queue factory, but in the pub/sub domain
     */
    @Bean("topicContainerFactory")
    public DefaultJmsListenerContainerFactory topicContainerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setSessionTransacted(true);
        factory.setConcurrency("1");
        factory.setRecoveryInterval(1000L);
        // Listeners created by this factory subscribe to topics rather than consume from queues.
        factory.setPubSubDomain(true);
        return factory;
    }
}

 

連接工廠(ConnectionFactory)是自動注入進來的,如果不想使用默認配置,可以在配置文件中自行配置:

# ActiveMQ connection settings for the Spring application.
spring:
  activemq:
    # Address of the broker to connect to (TCP transport on the local machine).
    broker-url: tcp://localhost:61616
    # Credentials presented to the broker.
    user: admin
    password: admin

或者在 java中指定

/**
 * Declares the JMS connection factory explicitly in Java.
 *
 * <p>NOTE(review): {@code MY_USERNAME}, {@code MY_PASSWORD} and {@code MY_BROKER_URL} are not
 * defined in this snippet; they are assumed to be constants declared by the enclosing class.
 *
 * @return an ActiveMQ connection factory built from the user name, password and broker URL
 */
@Bean
public ConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory brokerConnections =
            new ActiveMQConnectionFactory(MY_USERNAME, MY_PASSWORD, MY_BROKER_URL);
    return brokerConnections;
}

 

定義監聽器

Queue監聽器A

package ycx.activemq.listener;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import ycx.activemq.config.ActiveMQConfig;

/**
 * Listener "A" on {@link ActiveMQConfig#MY_QUEUE}; echoes each received message to standard output.
 */
@Component
public class QueueMessageListenerA {

    /** Text printed in front of every message handled by this listener. */
    private static final String PREFIX = "queue A >>> ";

    /**
     * Prints the received message with this listener's prefix.
     *
     * @param msg the message payload delivered as a string
     */
    @JmsListener(containerFactory = "queueContainerFactory", destination = ActiveMQConfig.MY_QUEUE)
    public void handleMessage(String msg) {
        System.out.println(PREFIX + msg);
    }
}

Queue監聽器B

package ycx.activemq.listener;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import ycx.activemq.config.ActiveMQConfig;

/**
 * Listener "B" on {@link ActiveMQConfig#MY_QUEUE}; echoes each received message to standard output.
 */
@Component
public class QueueMessageListenerB {

    /** Text printed in front of every message handled by this listener. */
    private static final String PREFIX = "queue B >>> ";

    /**
     * Prints the received message with this listener's prefix.
     *
     * @param msg the message payload delivered as a string
     */
    @JmsListener(containerFactory = "queueContainerFactory", destination = ActiveMQConfig.MY_QUEUE)
    public void handleMessage(String msg) {
        final String line = PREFIX + msg;
        System.out.println(line);
    }
}

Topic監聽器A

package ycx.activemq.listener;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import ycx.activemq.config.ActiveMQConfig;

/**
 * Subscriber "A" on {@link ActiveMQConfig#MY_TOPIC}; echoes each received message to standard output.
 */
@Component
public class TopicMessageListenerA {

    /** Text printed in front of every message handled by this subscriber. */
    private static final String PREFIX = "topic A >>> ";

    /**
     * Prints the received message with this subscriber's prefix.
     *
     * @param msg the message payload delivered as a string
     */
    @JmsListener(containerFactory = "topicContainerFactory", destination = ActiveMQConfig.MY_TOPIC)
    public void handleMessage(String msg) {
        System.out.println(PREFIX + msg);
    }
}

Topic監聽器B

package ycx.activemq.listener;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import ycx.activemq.config.ActiveMQConfig;

/**
 * Subscriber "B" on {@link ActiveMQConfig#MY_TOPIC}; echoes each received message to standard output.
 */
@Component
public class TopicMessageListenerB {

    /** Text printed in front of every message handled by this subscriber. */
    private static final String PREFIX = "topic B >>> ";

    /**
     * Prints the received message with this subscriber's prefix.
     *
     * @param msg the message payload delivered as a string
     */
    @JmsListener(containerFactory = "topicContainerFactory", destination = ActiveMQConfig.MY_TOPIC)
    public void handleMessage(String msg) {
        final String line = PREFIX + msg;
        System.out.println(line);
    }
}

Topic監聽器C

package ycx.activemq.listener;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import ycx.activemq.config.ActiveMQConfig;

/**
 * Subscriber "C" on {@link ActiveMQConfig#MY_TOPIC}; echoes each received message to standard output.
 */
@Component
public class TopicMessageListenerC {

    /** Text printed in front of every message handled by this subscriber. */
    private static final String PREFIX = "topic C >>> ";

    /**
     * Prints the received message with this subscriber's prefix.
     *
     * @param msg the message payload delivered as a string
     */
    @JmsListener(containerFactory = "topicContainerFactory", destination = ActiveMQConfig.MY_TOPIC)
    public void handleMessage(String msg) {
        System.out.println(PREFIX + msg);
    }
}

 

測試

package ycx.activemq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import ycx.activemq.config.ActiveMQConfig;

import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;

/**
 * Application entry point that doubles as a REST controller for the demo.
 *
 * <p>{@code /queue} sends a timestamped message to {@link ActiveMQConfig#MY_QUEUE} and
 * {@code /topic} sends one to {@link ActiveMQConfig#MY_TOPIC}; each endpoint returns the text
 * it sent under the {@code "content"} key.
 */
@SpringBootApplication
@RestController
public class ActivemqServerApplication {

    /** Template bean named {@code queueJmsTemplate}, used by the {@code /queue} endpoint. */
    @Autowired
    @Qualifier("queueJmsTemplate")
    JmsTemplate queueJmsTemplate;

    /** Template bean named {@code topicJmsTemplate}, used by the {@code /topic} endpoint. */
    @Autowired
    @Qualifier("topicJmsTemplate")
    JmsTemplate topicJmsTemplate;

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(ActivemqServerApplication.class, args);
    }

    /**
     * Sends a message stamped with the current local time to the queue.
     *
     * @return a map whose {@code "content"} entry is the message text that was sent
     */
    @RequestMapping("/queue")
    public Object queue() {
        final String payload = "queue send: " + LocalTime.now();
        queueJmsTemplate.convertAndSend(ActiveMQConfig.MY_QUEUE, payload);
        return contentResponse(payload);
    }

    /**
     * Sends a message stamped with the current local time to the topic.
     *
     * @return a map whose {@code "content"} entry is the message text that was sent
     */
    @RequestMapping("/topic")
    public Object topic() {
        final String payload = "topic send : " + LocalTime.now();
        topicJmsTemplate.convertAndSend(ActiveMQConfig.MY_TOPIC, payload);
        return contentResponse(payload);
    }

    /** Wraps the sent text in a single-entry map keyed by {@code "content"}. */
    private static Map<String, String> contentResponse(String content) {
        Map<String, String> body = new HashMap<>();
        body.put("content", content);
        return body;
    }
}

訪問地址: http://localhost:8080/topic

訂閱(Topic)收到消息,所有的監聽器都收到消息

topic B >>> topic send : 12:22:05.024
topic C >>> topic send : 12:22:05.024
topic A >>> topic send : 12:22:05.024

訪問地址:http://localhost:8080/queue

隊列 收到消息,只有一個監聽器收到消息

queue B >>> queue send: 12:22:58.491
相關文章
相關標籤/搜索