1. POM文件
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for the Spring Boot + ActiveMQ demo.
  Dependency versions are inherited from spring-boot-starter-parent wherever the
  parent manages them, so that all ActiveMQ artifacts resolve to the same release.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.atguigu</groupId>
    <artifactId>springboot-activemq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-activemq</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- ActiveMQ starter (JMS client and Spring JMS support) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

        <!--
          Message-queue connection pool.
          The version follows the parent's activemq.version property instead of a
          hard-coded 5.12.0, so the pool and the ActiveMQ client pulled in by the
          starter are always the same release.
        -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>${activemq.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- Lombok (logging shortcuts such as @Slf4j); version managed by the parent -->
        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <!-- Spring Boot hot-reload support -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
        </dependency>

        <!--
          NOTE(review): fastjson 1.2.45 is an old release and is not managed by the
          parent; check the project's security advisories and upgrade before using
          it on untrusted input.
        -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.45</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
2. yml文件
# Application configuration for the Spring Boot + ActiveMQ demo.
spring:
  jms:
    # Makes topics the default destination type for Spring Boot's JMS auto-configuration.
    pub-sub-domain: true
  activemq:
    ## URL of the ActiveMQ broker. Auto-generated by default. For instance `tcp://localhost:61616`
    # failover:(tcp://localhost:61616,tcp://localhost:61617)
    # tcp://localhost:61616
    broker-url: failover:tcp://192.168.68.137:61616
    # NOTE(review): Spring Boot documents this flag as ignored when an explicit
    # broker-url is given - confirm and consider removing it.
    in-memory: true
    pool:
      enabled: false
    packages:
      # NOTE(review): trusting every package allows any class to be deserialized
      # from ObjectMessage payloads - confirm this is acceptable outside a demo.
      trust-all: true
    # Broker credentials; read together with broker-url through @Value in MyJmsListenerConfigurer.
    user: admin
    password: admin
server:
  port: 9011
3. 定義容器與屬性文件
package com.atguigu.springbootactivemq.config; import org.apache.activemq.command.ActiveMQQueue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.jms.Queue; @Configuration public class BeanConfig { //定義存放消息 @Bean public Queue queue(){ return new ActiveMQQueue("ActiveMQQueue"); } }
自定義配置(獲取yml文件的mq屬性)
package com.atguigu.springbootactivemq.config; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.RedeliveryPolicy; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.support.destination.DestinationResolver; import javax.jms.ConnectionFactory; import javax.jms.Destination; /** * 自定義配置JMS * * @author : caigq * @version : 1.0 * @date : 2018-06-01 9:32 */ @Configuration @EnableJms public class MyJmsListenerConfigurer { @Value("${spring.activemq.broker-url}") private String activeMQURL; @Value("${spring.activemq.user}") private String userName; @Value("${spring.activemq.password}") private String password; /** * JMS 隊列的監聽容器工廠 */ @Bean(name = "MyjmsQueueListener") public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ConnectionFactory jmsConnectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(jmsConnectionFactory); factory.setConnectionFactory(cachingConnectionFactory); factory.setSessionTransacted(true); factory.setConcurrency("5"); DestinationResolver destinationResolver = (session, destinationName, pubSubDomain) -> { Destination destination = session.createQueue(destinationName); return destination; }; factory.setDestinationResolver(destinationResolver); return factory; } @Bean(name = "MyjmsTopicListener") public DefaultJmsListenerContainerFactory jmsTopicListenerContainerFactory(ConnectionFactory jmsConnectionFactory) { DefaultJmsListenerContainerFactory factory = new 
DefaultJmsListenerContainerFactory(); CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(jmsConnectionFactory); factory.setConnectionFactory(cachingConnectionFactory); factory.setPubSubDomain(true); factory.setSessionTransacted(true); factory.setConcurrency("6"); return factory; } @Bean public ConnectionFactory jmsConnectionFactory(){ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); connectionFactory.setBrokerURL(activeMQURL); connectionFactory.setUserName(userName); connectionFactory.setPassword(password); connectionFactory.setTrustAllPackages(true); connectionFactory.setMaxThreadPoolSize(ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE); RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); //定義ReDelivery(重發機制)機制 ,重發時間間隔是100毫秒,最大重發次數是3次 //是否在每次嘗試從新發送失敗後,增加這個等待時間 redeliveryPolicy.setUseExponentialBackOff(true); //重發次數,默認爲6次 這裏設置爲1次 redeliveryPolicy.setMaximumRedeliveries(1); //重發時間間隔,默認爲1秒 redeliveryPolicy.setInitialRedeliveryDelay(1000); //第一次失敗後從新發送以前等待500毫秒,第二次失敗再等待500 * 2毫秒,這裏的2就是value redeliveryPolicy.setBackOffMultiplier(2); //最大傳送延遲,只在useExponentialBackOff爲true時有效(V5.5),假設首次重連間隔爲10ms,倍數爲2,那麼第 //二次重連時間間隔爲 20ms,第三次重連時間間隔爲40ms,當重連時間間隔大的最大重連時間間隔時,之後每次重連時間間隔都爲最大重連時間間隔。 redeliveryPolicy.setMaximumRedeliveryDelay(1000); connectionFactory.setRedeliveryPolicy(redeliveryPolicy); return connectionFactory; } }
4. 建立消息的監聽類(消費者)
package com.atguigu.springbootactivemq.consumer;
import com.atguigu.springbootactivemq.pojo.student;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
 * JMS consumer for add-friend requests.
 */
@Component
@Slf4j
public class QueueAddFriendReceiver {

    /**
     * Handles one message from the {@code QUEUE_RECEIVE_ADD_FIREND} queue using the
     * {@code MyjmsQueueListener} container factory.
     *
     * <p>The destination literal (including its spelling) has to stay identical to the
     * queue name the producer side sends to.
     *
     * @param student payload of the received message
     */
    @JmsListener(destination = "QUEUE_RECEIVE_ADD_FIREND", containerFactory = "MyjmsQueueListener")
    public void receiveAddFriend(student student) {
        // Parameterized logging: null-safe, and a normal receipt is not reported as an error.
        log.info("啦啦啦啦{}", student);
    }
}
5. 建立消息提供者
package com.atguigu.springbootactivemq.controller; import com.atguigu.springbootactivemq.pojo.student; import com.atguigu.springbootactivemq.produce.QueueProducer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class ProviderController { @Autowired private QueueProducer queueProducer; //注入存放消息的隊列,用於下列方法一 @GetMapping("/value") public String value() { String queueName="QUEUE_RECEIVE_ADD_FIREND"; //自定義隊列名稱 student student = new student(); student.setName("小明"); student.setAge(11); queueProducer.sendObjectMessage(queueName, student); //發送到MQS return "消息已經發送"; } }
6. 消息發送的工具類
package com.atguigu.springbootactivemq.produce;

import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import javax.jms.Destination;
import javax.jms.JMSException;
import java.io.Serializable;
import java.util.Date;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Queue-mode message producer.
 *
 * <p>Every send is submitted to the injected task executor, so the public methods return
 * before the message is sent. The returned future is discarded; failures are therefore
 * caught and logged inside the task, and callers are never notified of them.
 */
@Component
@Slf4j
public class QueueProducer {

    /** Messaging template used to send to the broker. */
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    /** Executor on which all sends run. */
    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    /**
     * Asynchronously sends {@code message} as the {@code "result"} entry of a JMS map message.
     *
     * @param queueName name of the target queue
     * @param message   value stored under the key {@code "result"};
     *                  NOTE(review): map messages presumably accept only a limited set of
     *                  value types - a rejected value ends up in the error log
     */
    public void sendMapMessage(String queueName, Object message) {
        threadPoolTaskExecutor.submit(() -> {
            try {
                Destination destination = new ActiveMQQueue(queueName);
                ActiveMQMapMessage mqMapMessage = new ActiveMQMapMessage();
                mqMapMessage.setJMSDestination(destination);
                mqMapMessage.setObject("result", message);
                jmsMessagingTemplate.convertAndSend(destination, mqMapMessage);
            } catch (Throwable e) {
                // Throwable on purpose: the future is discarded, so anything not logged here is lost.
                log.error("Failed to send map message to queue {}", queueName, e);
            }
        });
    }

    /**
     * Asynchronously sends {@code message} as a JMS object message to the named queue.
     *
     * @param queueName name of the target queue
     * @param message   payload; must be non-null and {@link Serializable}, otherwise
     *                  nothing is sent and an error is logged
     */
    public void sendObjectMessage(String queueName, Object message) {
        threadPoolTaskExecutor.submit(() -> {
            if (message == null) {
                // Previously a null payload only failed through an accidental NullPointerException.
                log.error("Not sending to queue {}: payload is null", queueName);
                return;
            }
            try {
                log.info("發送添加好友請求:{}", message);
                Destination destination = new ActiveMQQueue(queueName);
                jmsMessagingTemplate.convertAndSend(destination, newObjectMessage(destination, message));
            } catch (Throwable e) {
                log.error("Failed to send object message to queue {}", queueName, e);
            }
        });
    }

    /**
     * Asynchronously sends {@code message} as a JMS object message to {@code destination},
     * logging the executor's pool statistics before the send.
     *
     * @param destination target destination
     * @param message     payload; must be {@link Serializable}, otherwise nothing is sent
     *                    and an error is logged
     */
    public void sendObjectMessage(Destination destination, Object message) {
        threadPoolTaskExecutor.submit(() -> {
            try {
                ThreadPoolExecutor pool = threadPoolTaskExecutor.getThreadPoolExecutor();
                log.info("【queue-->send】:activeCount={},queueCount={},completedTaskCount={},taskCount={}",
                        pool.getActiveCount(),
                        pool.getQueue().size(),
                        pool.getCompletedTaskCount(),
                        pool.getTaskCount());
                jmsMessagingTemplate.convertAndSend(destination, newObjectMessage(destination, message));
            } catch (Throwable e) {
                log.error("Failed to send object message to {}", destination, e);
            }
        });
    }

    /** Builds an object message addressed to {@code destination} that carries {@code message}. */
    private ActiveMQObjectMessage newObjectMessage(Destination destination, Object message) throws JMSException {
        ActiveMQObjectMessage mqObjectMessage = new ActiveMQObjectMessage();
        mqObjectMessage.setJMSDestination(destination);
        // A non-serializable payload fails here with ClassCastException, which the caller logs.
        mqObjectMessage.setObject((Serializable) message);
        return mqObjectMessage;
    }
}
7. 結構圖
8. MQ的可視化界面