首先添加依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
添加配置java
在application.properties中添加如下配置spring
#rabbitmq spring.rabbitmq.host=118.24.103.51 spring.rabbitmq.port=5672 #服務器帳號密碼 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ #消費者數量 spring.rabbitmq.listener.simple.concurrency= 10 spring.rabbitmq.listener.simple.max-concurrency= 10 #\u6D88\u8D39\u8005\u6BCF\u6B21\u4ECE\u961F\u5217\u83B7\u53D6\u7684\u6D88\u606F\u6570\u91CF spring.rabbitmq.listener.simple.prefetch= 1 #\u6D88\u8D39\u8005\u81EA\u52A8\u542F\u52A8 spring.rabbitmq.listener.simple.auto-startup=true #\u6D88\u8D39\u5931\u8D25\uFF0C\u81EA\u52A8\u91CD\u65B0\u5165\u961F spring.rabbitmq.listener.simple.default-requeue-rejected= true #\u542F\u7528\u53D1\u9001\u91CD\u8BD5 spring.rabbitmq.template.retry.enabled=true spring.rabbitmq.template.retry.initial-interval=1000 spring.rabbitmq.template.retry.max-attempts=3 spring.rabbitmq.template.retry.max-interval=10000 spring.rabbitmq.template.retry.multiplier=1.0
演示一 :direct模式交換機(exchange)模式
建立消息隊列服務器
import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MQConfig { public static final String QUEUE_NAME = "queue"; @Bean public Queue queue(){ return new Queue(QUEUE_NAME,true);//第一個參數是隊列名 第二是是否持久化 } }
消息發送者 app
import com.hz1202.miaosha.service.RedisService; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class MQSender { @Autowired private AmqpTemplate amqpTemplate; public void send(Object message){ String msg = RedisService.beanToString(message); amqpTemplate.convertAndSend(MQConfig.QUEUE_NAME,msg); System.out.println("send message:"+msg); } }
消息接收者spring-boot
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class MQReceiver { @RabbitListener(queues = MQConfig.QUEUE_NAME) public void receiver(String message){ System.out.println("receiveMessage:"+message); } }
注意:用戶guest是rabbitMQ的默認用戶 密碼爲guest 可是 guest不支持遠程登陸,要讓guest支持遠程登陸 請在rabbitMQ 安裝目錄下你的 /etc/rabbitmq文件夾中的rabbitmq.config(沒有的話本身建立)配置文件中加入 如下代碼oop
[{rabbit, [{loopback_users, []}]}].
演示二 :Topic模式交換機(exchange)模式
建立消息隊列fetch
/** * topic 模式 */ @Bean public Queue topicQueue1(){ return new Queue("topic.queue1",true);//第一個參數是隊列名 第二是是否持久化 } @Bean public Queue topicQueue2(){ return new Queue("topic.queue2",true);//第一個參數是隊列名 第二是是否持久化 } @Bean public TopicExchange topicExchange(){ return new TopicExchange("topicExchange"); } @Bean public Binding topicBinding(){ return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.key1"); } @Bean public Binding topicBinding2(){ return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.#"); }
流程說明:咱們先建立了兩個queue 分別命名爲 topic.queue1 和 topic.queue2 , 而後再建立一個交換機 命名爲 topicExchange ,最後將兩個queue和交換機綁定,同時制定了匹配規則 ,"#"表明所有匹配ui
消息發送者 spa
import com.hz1202.miaosha.service.RedisService; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class MQSender { @Autowired private AmqpTemplate amqpTemplate; public void sendTopic(Object message){ String msg = RedisService.beanToString(message); amqpTemplate.convertAndSend("topicExchange","topic.key1",msg+"1");//第一個參數表明交換機名 第二個表明知足匹配規則的表達式 第三個消息 amqpTemplate.convertAndSend("topicExchange","topic.key2",msg+"2"); System.out.println("send message:"+msg); } }
咱們在綁定交換機與queue時制定了匹配規則,"topic.key1"只能匹配"topic.key1","topic.#"能夠匹配所有以"topic."開頭的消息; 這樣,第一條消息就會被 topic.queue1和 topic.queue2所匹配,而第二條只能被 topic.queue2匹配到code
消息接收者
@RabbitListener(queues = MQConfig.TOPIC_QUEUE_NAME1) public void receiverTopic1(String message){ System.out.println("receive topic queue1 message:"+message); } @RabbitListener(queues = MQConfig.TOPIC_QUEUE_NAME2) public void receiverTopic2(String message){ System.out.println("receive topic queue2 message:"+message); }
演示三 :Fanout模式交換機(exchange)模式
建立隊列並將隊列和fanout交換機綁定
/** * fanout模式 */ @Bean public FanoutExchange fanoutExchange(){ return new FanoutExchange(FANOUT_EXCHANGE); } @Bean public Binding fanoutBinding1(){ return BindingBuilder.bind(topicQueue1()).to(fanoutExchange()); } @Bean public Binding fanoutBinding2(){ return BindingBuilder.bind(topicQueue2()).to(fanoutExchange()); }
建立消息發送者
public void sendFanout(Object message){ String msg = RedisService.beanToString(message); amqpTemplate.convertAndSend("fanoutExchange","",msg);//第一個參數表明交換機名 第三個消息 System.out.println("send fanout message:"+msg); }
消息接收者
@RabbitListener(queues = MQConfig.TOPIC_QUEUE_NAME1) public void receiverTopic1(String message){ System.out.println("receive queue1 message:"+message); } @RabbitListener(queues = MQConfig.TOPIC_QUEUE_NAME2) public void receiverTopic2(String message){ System.out.println("receive queue2 message:"+message); }