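The simplest mode: one producer and one consumer. RabbitMQ acts as a message broker that forwards A's messages to B.
Use case: outgoing emails are placed on a message queue, and the mail service takes them from the queue and sends them to the recipients.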
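Distributing tasks among multiple consumers (the competing consumers pattern): one producer and multiple consumers. This usually suits resource-intensive tasks that a single consumer cannot keep up with, so several consumers share the work.
Use case: processing one order takes 10 s. Multiple orders can be placed on the message queue at once and handled by several consumers at the same time, so they are processed in parallel rather than serially by a single consumer.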
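The competing-consumers idea can be tried without a broker. The sketch below is an in-memory analogy, not RabbitMQ code: a `BlockingQueue` stands in for the queue, and the class name `WorkQueueSketch`, the method `process`, and the consumer names are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class WorkQueueSketch {
    /** Lets `workers` consumers drain one shared queue; returns order -> consumer name. */
    static Map<String, String> process(List<String> orders, int workers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(orders);
        Map<String, String> handledBy = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 1; w <= workers; w++) {
            String name = "consumer-" + w;
            pool.submit(() -> {
                String order;
                // poll() hands each order to exactly one consumer
                while ((order = queue.poll()) != null) {
                    handledBy.put(order, name);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handledBy;
    }

    public static void main(String[] args) {
        List<String> orders = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            orders.add("order-" + i);
        }
        Map<String, String> result = process(orders, 3);
        System.out.println(result.size() + " orders handled");
    }
}
```

Every order ends up with exactly one consumer, which is the property the work mode relies on. How the orders are split between consumers depends on thread scheduling here, whereas a real broker dispatches according to its own rules.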
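Sending a message to many consumers at once: a message sent by one producer is received by multiple consumers, that is, it is broadcast to all consumers.
Use case: after a product's stock is updated, several caches and several databases must be notified. The structure here should be one producer, one exchange, and a separate queue for each cache and each database.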
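Receiving messages selectively (routing key): the producer publishes to an exchange with a routing key, each consumer specifies a routing key when binding its queue to the exchange, and it consumes only messages with that routing key.
Use case: one iPhone 12 is added to the product stock. The iPhone 12 promotion consumer binds with the routing key iphone12, so only this promotion receives the message; other promotions do not care about this routing key and never consume its messages.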
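The routing rule itself is an exact-match lookup from routing key to bound queues. The following is a minimal in-memory sketch of that rule, not RabbitMQ client code; the class `DirectRoutingSketch` and the queue names are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

final class DirectRoutingSketch {
    // routing key -> queues bound with exactly that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    void bind(String queue, String routingKey) {
        bindings.computeIfAbsent(routingKey, k -> new LinkedHashSet<>()).add(queue);
    }

    /** Returns the queues that would receive a message published with this routing key. */
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptySet());
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("iphone12_promotion", "iphone12");
        exchange.bind("other_promotion", "ipad");
        System.out.println(exchange.route("iphone12")); // prints [iphone12_promotion]
    }
}
```

A message whose routing key has no binding is routed to no queue at all, which is why `route` returns an empty set in that case.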
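Receiving messages by topic (Topics): the routing key is matched against a pattern, so the queue must be bound with a pattern. A routing key is a list of words separated by dots; # matches zero or more words and * matches exactly one word.
Use case: as above; the iphone promotion can receive messages whose topic is iphone, such as iphone12 and iphone13 (for example, routing keys iphone.12 and iphone.13 matched by the pattern iphone.*).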
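The wildcard rules are easy to get wrong, so here is a small stand-alone sketch of them. It is an illustrative reimplementation, not the broker's own algorithm, and the class name `TopicMatcher` is hypothetical.

```java
final class TopicMatcher {
    /** Returns true if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' absorbs zero words, or one word and then tries again
            return match(p, i + 1, k, j) || (j < k.length && match(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false; // pattern still expects a word but the key has none left
        }
        // '*' accepts any single word; anything else must match literally
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("key.*", "key.1"));   // true
        System.out.println(matches("key.*", "key.1.2")); // false
        System.out.println(matches("key.#", "key.1.2")); // true
    }
}
```

Note that `key.#` also matches the bare key `key`, because # may stand for zero words.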
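If we need to run a function on a remote computer and wait for the result, we can use RPC; see the figure for the detailed flow. Use case: waiting for an interface to return data, such as an order payment.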
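In RabbitMQ's RPC pattern the client tags each request with a correlation id and matches the reply against it. The sketch below shows only that matching step, in memory and without a broker: the reply queue is collapsed into a direct method call, and `RpcSketch`, `Message`, and the `"paid:"` reply are hypothetical.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class RpcSketch {
    /** A message carrying the correlation id that ties a reply to its request. */
    static final class Message {
        final String correlationId;
        final String body;

        Message(String correlationId, String body) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    private final BlockingQueue<Message> requestQueue = new LinkedBlockingQueue<>();
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    RpcSketch() {
        Thread server = new Thread(() -> {
            try {
                while (true) {
                    Message request = requestQueue.take();
                    // Server side: do the work, reply with the same correlation id.
                    onReply(new Message(request.correlationId, "paid:" + request.body));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.setDaemon(true);
        server.start();
    }

    // Client side: complete the caller that is waiting for this correlation id.
    private void onReply(Message reply) {
        CompletableFuture<String> waiting = pending.remove(reply.correlationId);
        if (waiting != null) {
            waiting.complete(reply.body);
        }
    }

    /** Sends a request and blocks until the matching reply arrives (5 s at most). */
    String call(String body) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        requestQueue.add(new Message(correlationId, body));
        try {
            return future.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(new RpcSketch().call("order-1")); // prints paid:order-1
    }
}
```

The timeout matters in practice: a blocking call with no deadline hangs forever if the server never answers.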
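Reliable publishing with publisher confirms: publisher confirms are a RabbitMQ extension for implementing reliable publishing. Once publisher confirms are enabled on a channel, RabbitMQ asynchronously confirms the messages the publisher sends, which means they have been handled on the server side.
Use case: scenarios with high message-reliability requirements, such as wallet deductions.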
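Because confirms arrive asynchronously, the publisher has to remember which messages are still unconfirmed. The sketch below shows one way to do that bookkeeping, in memory and without a broker; `ConfirmTrackerSketch` and its methods are hypothetical names. With the real Java client, confirms are enabled with `channel.confirmSelect()` and the acks would come from the broker instead of being called by hand.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

final class ConfirmTrackerSketch {
    // sequence number -> message body, for everything published but not yet confirmed
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();
    private long nextSeqNo = 1;

    /** Records a message as published but unconfirmed; returns its sequence number. */
    synchronized long publish(String body) {
        long seqNo = nextSeqNo++;
        outstanding.put(seqNo, body);
        return seqNo;
    }

    /** Handles an ack; multiple = true confirms every message up to and including seqNo. */
    void handleAck(long seqNo, boolean multiple) {
        if (multiple) {
            outstanding.headMap(seqNo, true).clear();
        } else {
            outstanding.remove(seqNo);
        }
    }

    int unconfirmedCount() {
        return outstanding.size();
    }

    public static void main(String[] args) {
        ConfirmTrackerSketch tracker = new ConfirmTrackerSketch();
        tracker.publish("debit 10");
        tracker.publish("debit 20");
        tracker.publish("debit 30");
        tracker.handleAck(2, true); // confirms messages 1 and 2 together
        System.out.println(tracker.unconfirmedCount()); // prints 1
    }
}
```

Anything still in the map after a reasonable wait can be republished or reported, which is what makes the wallet-deduction scenario safe.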
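The code does not demonstrate the last two modes (RPC and publisher confirms); explore them on your own if you are interested.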
// Simplest mode: producer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Sender {
    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the queue.
        // queue: queue name
        // durable: whether the queue survives a broker restart
        // exclusive: whether the queue is reserved for this connection;
        //            true is typically used when a queue may have only one consumer
        // autoDelete: whether the queue is deleted once its last consumer unsubscribes
        // arguments: other properties
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Message content
        String message = "simplest mode message";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
        System.out.println("[x] Sent '" + message + "'");
        // Finally, close the channel and the connection
        channel.close();
        connection.close();
    }
}

// Simplest mode: consumer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Receiver {
    // Must be the same queue name the Sender publishes to
    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Open a connection and a channel
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}
// Work mode: first consumer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Receiver1 {
    private final static String QUEUE_NAME = "queue_work";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // The server sends this consumer at most one unacknowledged message at a time
        channel.basicQos(1);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            // Acknowledge manually; the prefetch limit only applies to unacknowledged messages
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        // autoAck = false, otherwise basicQos(1) has no effect
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
}

// Work mode: second consumer (same queue, same logic)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Receiver2 {
    private final static String QUEUE_NAME = "queue_work";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // The server sends this consumer at most one unacknowledged message at a time
        channel.basicQos(1);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            // Acknowledge manually; the prefetch limit only applies to unacknowledged messages
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        // autoAck = false, otherwise basicQos(1) has no effect
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
}

// Work mode: producer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Sender {
    private final static String QUEUE_NAME = "queue_work";

    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for (int i = 0; i < 100; i++) {
            String message = "work mode message" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println("[x] Sent '" + message + "'");
            // Pause a little longer after each message
            Thread.sleep(i * 10);
        }
        channel.close();
        connection.close();
    }
}
// Publish/subscribe mode: first subscriber
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Receive1 {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // Server-named queue that belongs to this subscriber only
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // Callback invoked for each delivered message
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        // Start consuming; the callback is triggered whenever a message arrives
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

// Publish/subscribe mode: second subscriber
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Receive2 {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // Server-named queue that belongs to this subscriber only
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // Callback invoked for each delivered message
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received2 '" + message + "'");
        };
        // Start consuming; the callback is triggered whenever a message arrives
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

// Publish/subscribe mode: publisher
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Sender {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String message = "publish subscribe message";
        // A fanout exchange ignores the routing key, so it is left empty
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
        channel.close();
        connection.close();
    }
}
// Routing mode: first consumer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Receiver1 {
    private final static String QUEUE_NAME = "queue_routing";
    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange here too, so binding works even if the Sender has not run yet
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Bind with the routing keys to receive: key and key2
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");
        channel.basicQos(1);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            // Manual ack, so that basicQos(1) actually takes effect
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
}

// Routing mode: second consumer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Receiver2 {
    private final static String QUEUE_NAME = "queue_routing2";
    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange here too, so binding works even if the Sender has not run yet
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Receive key2 only
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");
        channel.basicQos(1);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            // Manual ack, so that basicQos(1) actually takes effect
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
}

// Routing mode: producer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Sender {
    private final static String EXCHANGE_NAME = "exchange_direct";
    private final static String EXCHANGE_TYPE = "direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
        // Only queues bound with the same routing key receive the message
        String message = "routing mode message";
        channel.basicPublish(EXCHANGE_NAME, "key2", null, message.getBytes(StandardCharsets.UTF_8));
        System.out.println("[x] Sent '" + message + "'");
        // channel.basicPublish(EXCHANGE_NAME, "key", null, message.getBytes(StandardCharsets.UTF_8));
        // System.out.println("[x] Sent '" + message + "'");
        channel.close();
        connection.close();
    }
}
// Topic mode: consumer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Receiver1 {
    private final static String QUEUE_NAME = "queue_topic";
    private final static String EXCHANGE_NAME = "exchange_topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange here too, so binding works even if the Sender has not run yet
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // key.* receives key.1 but not key.1.2; binding with key.# would receive both
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*");
        channel.basicQos(1);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            // Manual ack, so that basicQos(1) actually takes effect
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
}
// Topic mode: producer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Sender {
    private final static String EXCHANGE_NAME = "exchange_topic";
    private final static String EXCHANGE_TYPE = "topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
        String message = "topics model message with key.1";
        channel.basicPublish(EXCHANGE_NAME, "key.1", null, message.getBytes());
        System.out.println("[x] Sent '" + message + "'");
        String message2 = "topics model message with key.1.2";
        channel.basicPublish(EXCHANGE_NAME, "key.1.2", null, message2.getBytes());
        System.out.println("[x] Sent '" + message2 + "'");
        channel.close();
        connection.close();
    }
}
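1. Direct exchange:
An exchange with routing capability. A queue specifies a routing_key when it is bound to this exchange, a message is published with a routing_key, and the exchange delivers the message to the queues bound with that key.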
2. Fanout exchange:
Broadcasts each message to all bound queues without any further processing, which makes it the fastest type.
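3. Topic exchange:
Adds pattern matching on top of the direct exchange: the routing_key is matched against a pattern, where * stands for exactly one word and # stands for zero or more words.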
4. Headers exchange:
Ignores the routing_key and matches on the message headers (a hash data structure) instead. Its advantage is that it allows more, and more flexible, matching rules.
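Header matching can be sketched in a few lines. In RabbitMQ the mode is chosen with the x-match binding argument (all or any); the sketch below passes it as a boolean for brevity and is an illustration rather than the broker's implementation. `HeadersMatchSketch` and the header names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class HeadersMatchSketch {
    /**
     * Returns true if the message headers satisfy the binding.
     * matchAll = true: every binding entry must be present with the same value.
     * matchAll = false: one matching entry is enough.
     */
    static boolean matches(Map<String, Object> binding, Map<String, Object> headers, boolean matchAll) {
        boolean any = false;
        for (Map.Entry<String, Object> entry : binding.entrySet()) {
            boolean hit = entry.getValue().equals(headers.get(entry.getKey()));
            if (matchAll && !hit) {
                return false;
            }
            any |= hit;
        }
        return matchAll || any;
    }

    public static void main(String[] args) {
        Map<String, Object> binding = new HashMap<>();
        binding.put("format", "pdf");
        binding.put("type", "report");

        Map<String, Object> headers = new HashMap<>();
        headers.put("format", "pdf");
        headers.put("type", "log");

        System.out.println(matches(binding, headers, true));  // false: type differs
        System.out.println(matches(binding, headers, false)); // true: format matches
    }
}
```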
Each of these queue modes has its own use cases; choose among them based on the example scenarios above.
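Original link: https://blog.csdn.net/qq_3282...
Copyright notice: this is an original article by CSDN blogger 「我思知我在」, released under the CC 4.0 BY-SA license. When reposting, please include the link to the original source and this notice.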