RabbitMQ與SpringBoot整合

首先添加依賴

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

添加配置java

 在application.properties中添加如下配置spring

#rabbitmq
spring.rabbitmq.host=118.24.103.51
spring.rabbitmq.port=5672
#服務器帳號密碼
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
#消費者數量
spring.rabbitmq.listener.simple.concurrency= 10
spring.rabbitmq.listener.simple.max-concurrency= 10
#\u6D88\u8D39\u8005\u6BCF\u6B21\u4ECE\u961F\u5217\u83B7\u53D6\u7684\u6D88\u606F\u6570\u91CF
spring.rabbitmq.listener.simple.prefetch= 1
#\u6D88\u8D39\u8005\u81EA\u52A8\u542F\u52A8
spring.rabbitmq.listener.simple.auto-startup=true
#\u6D88\u8D39\u5931\u8D25\uFF0C\u81EA\u52A8\u91CD\u65B0\u5165\u961F
spring.rabbitmq.listener.simple.default-requeue-rejected= true
#\u542F\u7528\u53D1\u9001\u91CD\u8BD5
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=1000
spring.rabbitmq.template.retry.max-attempts=3
spring.rabbitmq.template.retry.max-interval=10000
spring.rabbitmq.template.retry.multiplier=1.0

 演示一 :direct模式交換機(exchange)模式

建立消息隊列服務器

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class MQConfig {

    public static final String QUEUE_NAME = "queue";

    @Bean
    public Queue queue(){
        return new Queue(QUEUE_NAME,true);//第一個參數是隊列名  第二是是否持久化
    }
}

消息發送者 app

import com.hz1202.miaosha.service.RedisService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MQSender {
    @Autowired
    private  AmqpTemplate amqpTemplate;

    public void send(Object message){
        String msg = RedisService.beanToString(message);
        amqpTemplate.convertAndSend(MQConfig.QUEUE_NAME,msg);
        System.out.println("send message:"+msg);
    }
}

消息接收者spring-boot

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class MQReceiver {

    @RabbitListener(queues = MQConfig.QUEUE_NAME)
    public void receiver(String message){
        System.out.println("receiveMessage:"+message);
    }
}

 注意:用戶guest是rabbitMQ的默認用戶  密碼爲guest  可是 guest不支持遠程登陸,要讓guest支持遠程登陸  請在rabbitMQ 安裝目錄下你的 /etc/rabbitmq文件夾中的rabbitmq.config(沒有的話本身建立)配置文件中加入 如下代碼oop

[{rabbit, [{loopback_users, []}]}].

演示二  :Topic模式交換機(exchange)模式

 建立消息隊列fetch

/**
     * topic 模式
     */
    @Bean
    public Queue topicQueue1(){
        return new Queue("topic.queue1",true);//第一個參數是隊列名  第二是是否持久化
    }

    @Bean
    public Queue topicQueue2(){
        return new Queue("topic.queue2",true);//第一個參數是隊列名  第二是是否持久化
    }

    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("topicExchange");
    }

    @Bean
    public Binding topicBinding(){
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.key1");
    }

    @Bean
    public Binding topicBinding2(){
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.#");
    }

 流程說明:咱們先建立了兩個queue 分別命名爲 topic.queue1 和 topic.queue2 , 而後再建立一個交換機 命名爲 topicExchange ,最後將兩個queue和交換機綁定,同時制定了匹配規則 ,"#"表明所有匹配ui

消息發送者 spa

import com.hz1202.miaosha.service.RedisService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MQSender {
    @Autowired
    private  AmqpTemplate amqpTemplate;

    public void sendTopic(Object message){
        String msg = RedisService.beanToString(message);
        amqpTemplate.convertAndSend("topicExchange","topic.key1",msg+"1");//第一個參數表明交換機名 第二個表明知足匹配規則的表達式  第三個消息
        amqpTemplate.convertAndSend("topicExchange","topic.key2",msg+"2");
        System.out.println("send message:"+msg);
    }
}

 咱們在綁定交換機與queue時制定了匹配規則,"topic.key1"只能匹配"topic.key1","topic.#"能夠匹配所有以"topic."開頭的消息; 這樣,第一條消息就會被 topic.queue1和 topic.queue2所匹配,而第二條只能被 topic.queue2匹配到code

消息接收者

@RabbitListener(queues = MQConfig.TOPIC_QUEUE_NAME1)
    public void receiverTopic1(String message){
        System.out.println("receive topic queue1 message:"+message);
    }

    @RabbitListener(queues = MQConfig.TOPIC_QUEUE_NAME2)
    public void receiverTopic2(String message){
        System.out.println("receive topic queue2 message:"+message);
    }

 演示三 :Fanout模式交換機(exchange)模式

 建立隊列並將隊列和fanout交換機綁定

/**
     * fanout模式
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(FANOUT_EXCHANGE);
    }

    @Bean
    public Binding fanoutBinding1(){
        return BindingBuilder.bind(topicQueue1()).to(fanoutExchange());
    }

    @Bean
    public Binding fanoutBinding2(){
        return BindingBuilder.bind(topicQueue2()).to(fanoutExchange());
    }

建立消息發送者

public void sendFanout(Object message){
        String msg = RedisService.beanToString(message);
        amqpTemplate.convertAndSend("fanoutExchange","",msg);//第一個參數表明交換機名   第三個消息
        System.out.println("send fanout message:"+msg);
    }

 消息接收者

@RabbitListener(queues = MQConfig.TOPIC_QUEUE_NAME1)
    public void receiverTopic1(String message){
        System.out.println("receive queue1 message:"+message);
    }

    @RabbitListener(queues = MQConfig.TOPIC_QUEUE_NAME2)
    public void receiverTopic2(String message){
        System.out.println("receive queue2 message:"+message);
    }
相關文章
相關標籤/搜索