【ActiveMQ】使用學習java
轉載:web
一、啓動spring
activemq start
二、中止app
activemq stop
http://localhost:8161負載均衡
admin / admintcp
Queue - Point-to-Point (點對點)學習
一條消息只能被一個消費者消費, 且是持久化消息 - 當沒有可用的消費者時,該消息保存直到被消費爲止;當消息被消費者收到但不響應時(具體等待響應時間是多久,如何設置,暫時還沒去了解),該消息會一直保留或會轉到另外一個消費者當有多個消費者的狀況下。當一個Queue有多可用消費者時,能夠在這些消費者中起到負載均衡的做用。測試
Topic - Publisher/Subscriber Model (發佈/訂閱者)url
一條消息發佈時,全部的訂閱者都會收到,topic有2種模式,Nondurable subscription(非持久訂閱)和durable subscription (持久化訂閱 - 每一個持久訂閱者,都至關於一個持久化的queue的客戶端), 默認是非持久訂閱。spa
持久化:消息產生後,會保存到文件/DB中,直到消息被消費, 如上述Queue的持久化消息。默認保存在ActiveMQ中:%ActiveMQ_Home%/data/kahadb
非持久化:消息不會保存,若當下沒有可用的消費者時,消息丟失。
Spring Boot 中使用
配置 JmsTemplate 和 DefaultJmsListenerContainerFactory
package ycx.activemq.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.core.JmsTemplate; import javax.jms.ConnectionFactory; @Configuration @EnableJms public class ActiveMQConfig { public static final String MY_QUEUE = "ycx.queue"; public static final String MY_TOPIC = "ycx.topic"; @Bean("queueJmsTemplate") public JmsTemplate queueJmsTemplate(ConnectionFactory connectionFactory) { JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); jmsTemplate.setDefaultDestinationName(MY_QUEUE); return jmsTemplate; } @Bean("queueContainerFactory") public DefaultJmsListenerContainerFactory queueContainerFactory(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setSessionTransacted(true); factory.setConcurrency("1"); factory.setRecoveryInterval(1000L); return factory; } @Bean("topicJmsTemplate") public JmsTemplate topicJmsTemplate(ConnectionFactory connectionFactory) { JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); jmsTemplate.setDefaultDestinationName(MY_QUEUE); jmsTemplate.setPubSubDomain(true); return jmsTemplate; } @Bean("topicContainerFactory") public DefaultJmsListenerContainerFactory topicContainerFactory(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setSessionTransacted(true); factory.setConcurrency("1"); factory.setRecoveryInterval(1000L); factory.setPubSubDomain(true); return factory; } }
鏈接工廠使用自動注入進來的,若是不想使用默認的能夠自動配置
spring: activemq: broker-url: tcp://localhost:61616 user: admin password: admin
或者在 java中指定
@Bean public ConnectionFactory connectionFactory() { return new ActiveMQConnectionFactory(MY_USERNAME, MY_PASSWORD, MY_BROKER_URL); }
定義監聽器
Queue監聽器A
package ycx.activemq.listener; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; import ycx.activemq.config.ActiveMQConfig; @Component public class QueueMessageListenerA { @JmsListener(destination = ActiveMQConfig.MY_QUEUE, containerFactory = "queueContainerFactory") public void handleMessage(String msg) { System.out.println("queue A >>> " + msg); } }
Queue監聽器B
package ycx.activemq.listener; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; import ycx.activemq.config.ActiveMQConfig; @Component public class QueueMessageListenerB { @JmsListener(destination = ActiveMQConfig.MY_QUEUE, containerFactory = "queueContainerFactory") public void handleMessage(String msg) { System.out.println("queue B >>> " + msg); } }
Topic監聽器A
package ycx.activemq.listener; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; import ycx.activemq.config.ActiveMQConfig; @Component public class TopicMessageListenerA { @JmsListener(destination = ActiveMQConfig.MY_TOPIC, containerFactory = "topicContainerFactory") public void handleMessage(String msg) { System.out.println("topic A >>> " + msg); } }
Topic監聽器B
package ycx.activemq.listener; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; import ycx.activemq.config.ActiveMQConfig; @Component public class TopicMessageListenerB { @JmsListener(destination = ActiveMQConfig.MY_TOPIC, containerFactory = "topicContainerFactory") public void handleMessage(String msg) { System.out.println("topic B >>> " + msg); } }
Topic監聽器C
package ycx.activemq.listener; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; import ycx.activemq.config.ActiveMQConfig; @Component public class TopicMessageListenerC { @JmsListener(destination = ActiveMQConfig.MY_TOPIC, containerFactory = "topicContainerFactory") public void handleMessage(String msg) { System.out.println("topic C >>> " + msg); } }
測試
package ycx.activemq; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.jms.core.JmsTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import ycx.activemq.config.ActiveMQConfig; import java.time.LocalTime; import java.util.HashMap; import java.util.Map; @SpringBootApplication @RestController public class ActivemqServerApplication { public static void main(String[] args) { SpringApplication.run(ActivemqServerApplication.class, args); } @Autowired @Qualifier("queueJmsTemplate") JmsTemplate queueJmsTemplate; @Autowired @Qualifier("topicJmsTemplate") JmsTemplate topicJmsTemplate; @RequestMapping("/queue") public Object queue() { String content = "queue send: " + LocalTime.now().toString(); queueJmsTemplate.convertAndSend(ActiveMQConfig.MY_QUEUE, content); Map<String, String> res = new HashMap<>(); res.put("content", content); return res; } @RequestMapping("/topic") public Object topic() { String content = "topic send : " + LocalTime.now().toString(); topicJmsTemplate.convertAndSend(ActiveMQConfig.MY_TOPIC, content); Map<String, String> res = new HashMap<>(); res.put("content", content); return res; } }
訪問地址: http://localhost:8080/topic
訂閱 收到消息,全部的監聽器都受到消息
topic B >>> topic send : 12:22:05.024 topic C >>> topic send : 12:22:05.024 topic A >>> topic send : 12:22:05.024
訪問地址:http://localhost:8080/queue
隊列 收到消息,只有一個監聽器收到消息
queue B >>> queue send: 12:22:58.491