從圖上看,和JDK自己同樣,生產者往隊列添加數據,消費者從隊列拿數據,若是業務場景確實這麼簡單,還可使用redis的集合來代替,減小整個系統的複雜度,系統越簡單問題越少web
public class RabbitMQ {
Logger logger = LoggerFactory.getLogger(RabbitMQ.class);
private ConnectionFactory factory;
// 初始化鏈接工廠
@Before
public void init() {
factory = new ConnectionFactory();
// 設置相關參數
factory.setHost("192.168.245.128");
factory.setPort(5672);
factory.setVirtualHost("/jt");
factory.setUsername("admin");
factory.setPassword("12340101");
}
@Test
public void simpleSend() throws Exception {
// 1.獲取鏈接
Connection conn = factory.newConnection();
// 2.從鏈接獲取信道
Channel channel = conn.createChannel();
// 3.利用channel聲明一個隊列
/*
queue 表示聲明的queue對列的名字
durable 表示是否持久化
exclusive 表示當前聲明的queue是否被當前信道獨佔
true:當前鏈接建立的任何channel均可以鏈接該queue
false:只有當前channel能夠鏈接該queue
autoDelete Boolean類型:在最後鏈接使用完成後,是否刪除隊列,false
arguments 其餘聲明參數封裝到map中傳遞給mq
*/
channel.queueDeclare("simple", false, false, false, null);
// 4.發送消息
/*
exchange 交換機名稱,簡單模式使用默認交換,該值設置爲""
routingkey 當前的消息綁定的routingkey,簡單模式下,與隊列同名便可
props 消息的屬性字段對象,例如BasicProperties,能夠設置一個deliveryMode的值0 持久化,1 表示不持久化,durable配合使用
body 消息字符串的byte數組
*/
channel.basicPublish("", "simple", null, "簡單模式的消息發送".getBytes());
}
@Test
public void simpleReciever() throws Exception {
// 1.獲取鏈接
Connection conn = factory.newConnection();
// 2.獲取信道
Channel channel = conn.createChannel();
// 3.綁定隊列
channel.queueDeclare("simple", false, false, false, null);
// 4.建立一個消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 5.綁定消費者和隊列
channel.basicConsume("simple", consumer);
// 6.獲取消息
while(true) {
Delivery delivery = consumer.nextDelivery();
String msg=new String(delivery.getBody());
System.out.println(msg);
}
}
}
一個隊列由多個消費者共享,若是消費者處理速度落後於生產者,能夠不斷擴充消費,提升消息的處理能力redis
注意:這種模式隊列的數據一旦被其中一個消費者拿走,其餘消費者就不會再拿到,與下面的訂閱發佈模式不同,它提供了兩個隊列,消息有兩份spring
@Test
public void workSender() throws Exception{
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.queueDeclare("work", false, false, false, null);
for(int i=0;i<100;i++) {
channel.basicPublish("", "work", null, ("工做模式發送的第 ("+i+") 個消息").getBytes());
}
}
@Test
public void workReceiver_a() throws Exception{
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.queueDeclare("work", false, false, false, null);
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
//其中第二參數表示消費者接收消息後是否自動返回回執
channel.basicConsume("work", false, consumer);
while(true) {
Delivery delivery = consumer.nextDelivery();
String msg=new String(delivery.getBody());
logger.info(msg);
Thread.sleep(50);
//手動發送回執
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
@Test
public void workReceiver_b() throws Exception{
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.queueDeclare("work", false, false, false, null);
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
//其中第二參數表示消費者接收消息後是否自動返回回執
channel.basicConsume("work", false, consumer);
while(true) {
Delivery delivery = consumer.nextDelivery();
String msg=new String(delivery.getBody());
logger.info(msg);
Thread.sleep(100);
//手動發送回執
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
生產者將消息發送交換機,交換機在將消息發給N個隊列,消費者連到響應隊列取消息便可,此功能比較適合將某單一系統的簡單業務數據消息廣播給全部接口數據庫
@Test
public void fanoutSender() throws Exception {
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
// 建立交換機
/*
* 參數:
* Exchange: 自定義交換機名稱,接受端聲明交換機的名字需和它保持一致
* type: 交換機類型,取值範圍(fanout(訂閱/發佈),direct(路由模式),topic(主題))
*/
channel.exchangeDeclare("fanoutEx", "fanout");
// 發送消息
for (int i = 0; i < 100; i++) {
channel.basicPublish("fanoutEx", "", null, ("訂閱/發佈模式發送的第 (" + i + ") 個消息").getBytes());
}
}
@Test
public void fanoutReceiver() throws Exception {
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
//建立隊列
channel.queueDeclare("fanout", false, false, false, null);
//建立交換機
channel.exchangeDeclare("fanoutEx", "fanout");
//綁定隊列和交換機
channel.queueBind("fanout", "fanoutEx", "");
channel.basicQos(1);
//建立消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
//綁定消費者和隊列
channel.basicConsume("fanout", consumer);
//取數據
while(true) {
Delivery delivery = consumer.nextDelivery();
logger.info(new String(delivery.getBody()));
// 手動發送回執
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
@Test
public void routingSender() throws Exception {
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
// 建立交換機
/*
* 參數:
* Exchange: 自定義交換機名稱,接受端聲明交換機的名字需和它保持一致
* type: 交換機類型,取值範圍(fanout(訂閱/發佈),direct(路由模式),topic(主題))
*/
channel.exchangeDeclare("directEx", "direct");
// 發送消息
for (int i = 0; i < 100; i++) {
channel.basicPublish("directEx", "receiver_b", null, ("路由模式發送的第 (" + i + ") 個消息").getBytes());
}
}
@Test
public void routingReceiver_a() throws Exception{
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
//建立隊列
channel.queueDeclare("direct_a", false, false, false, null);
//建立交換機
channel.exchangeDeclare("directEx", "direct");
//綁定隊列和交換機
channel.queueBind("direct_a", "directEx", "receiver_a");
channel.basicQos(1);
//建立消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
//綁定消費者和隊列
channel.basicConsume("direct_a", consumer);
//取數據
while(true) {
Delivery delivery = consumer.nextDelivery();
logger.info(new String(delivery.getBody()));
// 手動發送回執
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
@Test
public void routingReceiver_b() throws Exception{
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
//建立隊列
channel.queueDeclare("direct_b", false, false, false, null);
//建立交換機
channel.exchangeDeclare("directEx", "direct");
//綁定隊列和交換機
channel.queueBind("direct_b", "directEx", "receiver_b");
channel.basicQos(1);
//建立消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
//綁定消費者和隊列
channel.basicConsume("direct_b", consumer);
//取數據
while(true) {
Delivery delivery = consumer.nextDelivery();
logger.info(new String(delivery.getBody()));
// 手動發送回執
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
兩個消費者,能夠更改生產者的routingKey觀察消費者獲取數據的變化。從觀察結果能夠看到,生產者的routingKey和消費者指定的routingKey徹底一致,消費者才能拿到消息數組
@Test
public void topicSender() throws Exception {
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
// 建立交換機
/*
* 參數:
* Exchange: 自定義交換機名稱,接受端聲明交換機的名字需和它保持一致
* type: 交換機類型,取值範圍(fanout(訂閱/發佈),direct(路由模式),topic(主題))
*/
channel.exchangeDeclare("topicEx", "topic");
// 發送消息
for (int i = 0; i < 100; i++) {
channel.basicPublish("topicEx", "acct.save", null, ("主題模式發送的第 (" + i + ") 個消息").getBytes());
}
}
@Test
public void topicReceiver_a() throws Exception{
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
//建立隊列
channel.queueDeclare("topic_a", false, false, false, null);
//建立交換機
channel.exchangeDeclare("topicEx", "topic");
//綁定隊列和交換機
channel.queueBind("topic_a", "topicEx", "acct.save");
channel.basicQos(1);
//建立消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
//綁定消費者和隊列
channel.basicConsume("topic_a", consumer);
//取數據
while(true) {
Delivery delivery = consumer.nextDelivery();
logger.info(new String(delivery.getBody()));
// 手動發送回執
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
@Test
public void topicReceiver_b() throws Exception{
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
//建立隊列
channel.queueDeclare("topic_b", false, false, false, null);
//建立交換機
channel.exchangeDeclare("topicEx", "topic");
//綁定隊列和交換機
//channel.queueBind("topic_b", "topicEx", "acct.update");
channel.queueBind("topic_b", "topicEx", "acct.*");
channel.basicQos(1);
//建立消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
//綁定消費者和隊列
channel.basicConsume("topic_b", consumer);
//取數據
while(true) {
Delivery delivery = consumer.nextDelivery();
logger.info(new String(delivery.getBody()));
// 手動發送回執
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
主題模式從使用上看,就是支持ANT,用*表明一個詞,#表明多個詞,不然就是精確匹配,感受路由模式就是特殊的主題模式(沒有使用ANT的通配符),具體原理如今還沒去研究,先用起來再說app
application.properties中新增的配置項工具
spring.rabbitmq.host=192.168.245.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=acct
spring.rabbitmq.password=acct
spring.rabbitmq.virtualHost=/rbAcct
spring.rabbitmq.publisher-confirms=true
Java代碼測試
RabbitConfigBean:建立交換機、隊列、綁定隊列與交換機、指定路由值ui
TestController:模擬生產者發送數據spa
RabbitClient:模擬消費者拿數據
package com.jv.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfigBean {
//整合rabbitmq的配置,以路由模式爲例
//1聲明direct類型的交換機
@Bean
public DirectExchange defaultExchange(){
return new DirectExchange("testEx");
}
//2聲明隊列
@Bean
public Queue queue01(){
return new Queue("testQueue1", true);
}
//3 綁定交換機與隊列的關係,而且指定路由key
@Bean
public Binding binding01(){
return BindingBuilder.bind(queue01()).to(defaultExchange()).
with("hello1");
}
@Bean
public Queue queue02(){
return new Queue("testQueue2", true);
}
@Bean
public Binding binding02(){
return BindingBuilder.bind(queue02()).to(defaultExchange()).
with("hello2");
}
}
package com.jv.rabbitmq.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TestController {
@Autowired
private RabbitTemplate template;
@RequestMapping("/test")
public String test() {
/*
* 在實際開發中,應該把RabbitTemplate注入到Service中,將寫數據庫等耗時的操做接隊列的消費者,提升前臺響應速度
*
* 固然還須要考慮到業務場景,若是出現臨界,數據半天沒有入庫,其餘地方查不到會不會致使投訴等
*/
template.convertAndSend("testEx", "hello1", "梅西生了三個男娃兒,巴薩歐冠出局,曼聯也出局,靠");
template.convertAndSend("testEx", "hello2", "C羅進球數後來居上,皇馬在歐冠差點被尤文翻盤,懸啊");
return "hello world!";
}
}
package com.jv.rabbitmq.client;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class RabbitClient {
@RabbitListener(queues="testQueue1")
public void process01(String msg) throws Exception{
System.out.println("接收到的消息是:"+msg);
}
@RabbitListener(queues="testQueue2")
public void process02(String msg){
System.out.println("接收到的消息是:"+msg);
}
}
SpringBoot用起來確實很爽,redis\httpclient等第三方工具都進行了封裝,只須要配置相關屬性,在你的Controller和Service注入特定對象便可操做數據了