springboot 整合rabbitmq

1,準備

①,拉取rabbitmq 鏡像(推薦使用,鏡像加速:http://www.docker-cn.com/registry-mirror)java

#獲取鏡像
docker pull registry.docker-cn.com/library/rabbitmq

#查看docker的鏡像
[root@localhost ~]# docker images
REPOSITORY                                     TAG                 IMAGE ID            CREATED             SIZE
registry.docker-cn.com/library/rabbitmq        3-management        c51d1c73d028        9 days ago          149 MB

#運行鏡像(-d後臺運行,-p表示暴露的端口,5672是程序鏈接端口,15672是管理端口,
#--name 指定容器名 c51d1c73d028 是鏡像id)
docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq c51d1c73d028

#查看docker裏運行的容器

[root@localhost ~]# docker ps
CONTAINER ID        IMAGE               COMMAND                  CREATED             STATUS              PORTS                                                                                        NAMES
02731f5a5334        c51d1c73d028        "docker-entrypoint..."   3 days ago          Up 4 seconds        4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp   myrabbitmq

②,在瀏覽器上訪問rabbitmq後臺web

③,建立一個direct類型的交換機spring

④,建立隊列docker

⑤,將交換機與隊列綁定json

⑥,綁定成功後以下瀏覽器

⑦,pom依賴springboot

<parent>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-parent</artifactId>
	<version>2.0.2.RELEASE</version>
</parent>
<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-amqp</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-web</artifactId>
	</dependency>
</dependencies>

⑧,application.properties 的配置app

server.port=8081
spring.rabbitmq.host=192.168.43.28
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

2,查看RabbitAutoConfiguration

①,springboot爲咱們注入了兩個重要的beantcp

//	RabbitTemplate  用於發送和接受消息
@Bean
		@ConditionalOnSingleCandidate(ConnectionFactory.class)
		@ConditionalOnMissingBean
		public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
			PropertyMapper map = PropertyMapper.get();
			RabbitTemplate template = new RabbitTemplate(connectionFactory);
            //從ioc容器中獲取消息轉換器
			MessageConverter messageConverter = this.messageConverter.getIfUnique();
			if (messageConverter != null) {
                //獲取到了就配置爲默認的消息轉換器
				template.setMessageConverter(messageConverter);
			}
			template.setMandatory(determineMandatoryFlag());
			RabbitProperties.Template properties = this.properties.getTemplate();
			if (properties.getRetry().isEnabled()) {
				template.setRetryTemplate(createRetryTemplate(properties.getRetry()));
			}
			map.from(properties::getReceiveTimeout).whenNonNull().as(Duration::toMillis)
					.to(template::setReceiveTimeout);
			map.from(properties::getReplyTimeout).whenNonNull().as(Duration::toMillis)
					.to(template::setReplyTimeout);
			map.from(properties::getExchange).to(template::setExchange);
			map.from(properties::getRoutingKey).to(template::setRoutingKey);
			return template;
		}
//AmqpAdmin 是用於建立交換器和隊列以及綁定規則的工具
@Bean
		@ConditionalOnSingleCandidate(ConnectionFactory.class)
		@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
		@ConditionalOnMissingBean
		public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
			return new RabbitAdmin(connectionFactory);
		}

3,配置rabbitmq

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//啓用 RabbitMQ
@EnableRabbit
@Configuration
public class RabbitConfig {
    //注入json類型的消息轉換器,這樣方便咱們觀察消息內容
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

4,配置消息監聽

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
//將該監聽器注入ioc容器
@Component
public class MyRabbitListener {

    //監聽的隊列名爲queue.news
    @RabbitListener(queues = "queue.news")
    public void listener(Object object){
        System.out.println("收到消息:"+object.toString());
    }
}

5,發送接收消息的配置

import com.mq.rabbitmq.bean.Person;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class RabbitController {

    @Autowired
    private RabbitTemplate rabbitTemplate;
   @Autowired
    private AmqpAdmin amqpAdmin;

    
    @RequestMapping("person")
    public String person(){
        //給交換器的名爲exchange.direct 發送路由鍵爲news 的消息
        rabbitTemplate.convertAndSend("exchange.direct","news",new Person(1,"翛蘇"));
        System.out.println("消息發送成功");
        return "消息發送成功";
    }

   /*利用amqpAdmin 建立交換機*/
   @RequestMapping("createDirect")
    public String createDirect(){
            //建立一個direct類型的交換機,名字爲amqb.exange.su
        amqpAdmin.declareExchange(new DirectExchange("amqb.exange.su"));
        return "建立交換器成功";
    }

    /*利用amqpAdmin 建立隊列*/
    @RequestMapping("createQueue")
    public String createQueue(){
        //建立一個隊列名爲:amqb.queue.chun 的隊列
        amqpAdmin.declareQueue(new Queue("amqb.queue.chun"));
        return "建立隊列成功";
    }

    /*利用amqpAdmin 建立交換機與交換機的綁定規則*/
    @RequestMapping("createBinding")
    public String createBinding(){
        amqpAdmin.declareBinding(new Binding("amqb.queue.chun",
                Binding.DestinationType.QUEUE,
                "amqb.exange.su",
                "route.su",null));
        return "建立綁定成功";
    }

   //接收隊列名爲queue.news 的消息
 @RequestMapping("receive")
    public String receive(){
        Object o = rabbitTemplate.receiveAndConvert("queue.news");
        System.out.println(o.toString());
        return o.toString();
    }
}

6,交換機,隊列綁定規則以下

7,測試

①,啓動應用,調度person方法發送消息spring-boot

②,控制檯輸出以下

消息發送成功
收到消息:(Body:'{"id":1,"name":"翛蘇"}' MessageProperties [headers={__TypeId__=com.mq.rabbitmq.bean.Person}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=exchange.direct, receivedRoutingKey=news, deliveryTag=3, consumerTag=amq.ctag-3zRVuPvjjOAn-ulvdLna-Q, consumerQueue=queue.news])

這是消息監聽器的輸出

相關文章
相關標籤/搜索