①,拉取rabbitmq 鏡像(推薦使用,鏡像加速:http://www.docker-cn.com/registry-mirror)
#獲取鏡像(需指定帶管理界面的 3-management 標籤,不然沒有 15672 管理端口)
docker pull registry.docker-cn.com/library/rabbitmq:3-management
#查看docker的鏡像
[root@localhost ~]# docker images
REPOSITORY                                TAG            IMAGE ID       CREATED      SIZE
registry.docker-cn.com/library/rabbitmq   3-management   c51d1c73d028   9 days ago   149 MB
#運行鏡像(-d後臺運行,-p表示暴露的端口,5672是程序連接端口,15672是管理端口,
#--name 指定容器名 c51d1c73d028 是鏡像id)
docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq c51d1c73d028
#查看docker裏運行的容器
[root@localhost ~]# docker ps
CONTAINER ID   IMAGE          COMMAND                  CREATED      STATUS         PORTS                                                                                        NAMES
02731f5a5334   c51d1c73d028   "docker-entrypoint..."   3 days ago   Up 4 seconds   4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp   myrabbitmq
②,在瀏覽器上訪問rabbitmq後臺web
③,建立一個direct類型的交換機
④,建立隊列
⑤,將交換機與隊列綁定
⑥,綁定成功後如下
⑦,pom依賴
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.2.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies>
⑧,application.properties 的配置
server.port=8081
spring.rabbitmq.host=192.168.43.28
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
①,springboot爲咱們注入了兩個重要的bean
// RabbitTemplate 用於發送和接受消息 @Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnMissingBean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { PropertyMapper map = PropertyMapper.get(); RabbitTemplate template = new RabbitTemplate(connectionFactory); //從ioc容器中獲取消息轉換器 MessageConverter messageConverter = this.messageConverter.getIfUnique(); if (messageConverter != null) { //獲取到了就配置爲默認的消息轉換器 template.setMessageConverter(messageConverter); } template.setMandatory(determineMandatoryFlag()); RabbitProperties.Template properties = this.properties.getTemplate(); if (properties.getRetry().isEnabled()) { template.setRetryTemplate(createRetryTemplate(properties.getRetry())); } map.from(properties::getReceiveTimeout).whenNonNull().as(Duration::toMillis) .to(template::setReceiveTimeout); map.from(properties::getReplyTimeout).whenNonNull().as(Duration::toMillis) .to(template::setReplyTimeout); map.from(properties::getExchange).to(template::setExchange); map.from(properties::getRoutingKey).to(template::setRoutingKey); return template; } //AmqpAdmin 是用於建立交換器和隊列以及綁定規則的工具 @Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true) @ConditionalOnMissingBean public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); }
import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; //啓用 RabbitMQ @EnableRabbit @Configuration public class RabbitConfig { //注入json類型的消息轉換器,這樣方便咱們觀察消息內容 @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; //將該監聽器注入ioc容器 @Component public class MyRabbitListener { //監聽的隊列名爲queue.news @RabbitListener(queues = "queue.news") public void listener(Object object){ System.out.println("收到消息:"+object.toString()); } }
import com.mq.rabbitmq.bean.Person; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class RabbitController { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private AmqpAdmin amqpAdmin; @RequestMapping("person") public String person(){ //給交換器的名爲exchange.direct 發送路由鍵爲news 的消息 rabbitTemplate.convertAndSend("exchange.direct","news",new Person(1,"翛蘇")); System.out.println("消息發送成功"); return "消息發送成功"; } /*利用amqpAdmin 建立交換機*/ @RequestMapping("createDirect") public String createDirect(){ //建立一個direct類型的交換機,名字爲amqb.exange.su amqpAdmin.declareExchange(new DirectExchange("amqb.exange.su")); return "建立交換器成功"; } /*利用amqpAdmin 建立隊列*/ @RequestMapping("createQueue") public String createQueue(){ //建立一個隊列名爲:amqb.queue.chun 的隊列 amqpAdmin.declareQueue(new Queue("amqb.queue.chun")); return "建立隊列成功"; } /*利用amqpAdmin 建立交換機與交換機的綁定規則*/ @RequestMapping("createBinding") public String createBinding(){ amqpAdmin.declareBinding(new Binding("amqb.queue.chun", Binding.DestinationType.QUEUE, "amqb.exange.su", "route.su",null)); return "建立綁定成功"; } //接收隊列名爲queue.news 的消息 @RequestMapping("receive") public String receive(){ Object o = rabbitTemplate.receiveAndConvert("queue.news"); System.out.println(o.toString()); return o.toString(); } }
①,啓動應用,調用person方法發送消息
②,控制檯輸出如下
消息發送成功 收到消息:(Body:'{"id":1,"name":"翛蘇"}' MessageProperties [headers={__TypeId__=com.mq.rabbitmq.bean.Person}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=exchange.direct, receivedRoutingKey=news, deliveryTag=3, consumerTag=amq.ctag-3zRVuPvjjOAn-ulvdLna-Q, consumerQueue=queue.news])
這是消息監聽器的輸出