做者:threedayman 恆生LIGHT雲社區html
RabbitMQ是什麼
RabbitMQ是部署最普遍的開源消息代理。RabbitMQ有輕量級且易部署的特色。支持多種消息協議。java
爲何使用RabbitMQ
常見的使用場景有解耦、異步、削峯填谷。下面咱們經過例子來感覺下各自場景下使用MQ帶來的效益。mysql
解耦git
假設有系統A,依賴系統B、系統C、系統D,依賴關係在代碼中已經寫死,結構以下圖。程序員
假設此時又來了一個新需求,系統A須要調用系統E進行一些新的業務操做,那麼系統A的程序員又免不了一頓操做,處理接入系統E的需求。同理若是要去掉某個系統的依賴好比系統C,也須要系統A負責的開發進行處理。github
那麼此時咱們若是引入了MQ來看看會帶來什麼樣的變化。sql
系統A發送消息到MQ,系統B、C、D訂閱對應的消息進行業務處理。那麼咱們再來看看以前的場景,假設須要增長一個依賴系統E,只須要系統E的開發人員進行對應的訂閱消費便可,同理若是要取消系統C的依賴,只須要系統C取消訂閱對應的消息。數據庫
異步app
假設系統A操做耗時30ms,系統A還將同步調用系統B(300ms)、系統C(600ms)、系統D(200ms)那麼這個請求的響應時間將會達到1130ms。過長的響應時間會給客戶帶來很差的用戶體驗。異步
引入MQ以後咱們看看會發生什麼變化
系統A將消息發送給MQ(7ms)以後就返回,系統B、C、D分別監聽MQ進行業務處理。那麼咱們看到針對剛纔長耗時的同步依賴,引入MQ進行異步處理後,整體的響應時間從1130ms降到了37ms。
削峯填谷
假設咱們有個業務高峯期的請求量可以到達7000 /s而業務低谷流量只有100/s,可是咱們的mysql數據庫只能承受2000/s的請求。
在這種狀況下會致使在高峯期超過了mqsql最高的負載能力而直接打掛,而低峯期沒有將mqsql的資源合理利用起來。
引入MQ以後咱們看看會發生什麼變化
此時系統能夠按照本身最大的消費能力2000/s去拉取消息,能夠平穩度過業務高峯期,同時將一部分消息延遲到業務低谷時期進行處理。不至於出現因爲高流量致使數據庫被打掛,出現總體服務不可用的現象。
怎樣使用RabbitMQ
本小節主要針對RabbitMQ的java客戶端編寫的幾個經常使用的例子,若是您對使用RabbitMQ已熟練掌握,可跳過本小節。查看完整的RabbitMQ使用說明,請訪問官方文檔。
Hello world
咱們經過一個Hello world 的例子來感覺下RabbitMQ。首先介紹下本例中使用到的術語
- Producer:生產者,用來發送消息。
- Queue:消息隊列,用於存儲消息,消息經由生產者投遞到消息隊列,最終被投遞到消費者進行消費,消息隊列收到機器內存和硬盤資源的限制。
- Consumer:消費者,用於接收並處理消息。
本例中咱們咱們將生產Hello World的消息,經過消費者接受並打印出消息。
生產者Send 關鍵步驟見註釋說明
public class Send { //隊列名稱 private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { //建立和server之間的鏈接 connection、channel ConnectionFactory factory = new ConnectionFactory(); //請設置實際部署節點ip factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { //聲明一個queue去發送消息 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; //發佈消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } } }
消費者Recv 關鍵步驟見註釋說明
public class Recv { //隊列名稱 private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { //建立和server之間的鏈接 connection、channel ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //聲明要去消費的隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); //經過該類來處理消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }
Work Queues
在本例中咱們將介紹經過RabbitMQ分發耗時任務給多個工做者。RabbitMQ會經過輪詢(round-robin)的方式將消息投遞給消費者,這是的咱們可以很容易的擴展消費能力。
生產者NewTask 關鍵步驟見註釋說明
public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { //將隊列設置成持久化 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = String.join(" ", argv); //將消息設置成持久化 channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } }
消費者Woker 關鍵步驟見註釋說明
public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); //將隊列設置成持久化 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); //一個消費者最多同時處理一個未確認的消息 channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } //模擬耗時任務,一個.表明耗時1S private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == '.') { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
Publish/Subscribe
上面咱們已經介紹過RabbitMQ的核心消息模型,生產者、消費者、隊列,在本小節咱們將接觸到另外一個消息模型exchange** ,它負責從生產者中接收消息,並把消息投遞到隊列中。exchage主要有如下幾種類型**
- direct
- topic
- headers
- fanout
本例中咱們將已fanout類型做爲講解,經過名稱咱們大概也能猜到此類型exchange會廣播接收到的消息到其綁定的隊列中。
生產者EmitLog 關鍵步驟見註釋說明
public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { //建立一個exchange 並指定類型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = argv.length < 1 ? "info: Hello World!" : String.join(" ", argv); //此處和以前發消息不同,指定具體的exchange沒有指定具體的queue channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } }
消費者ReceiveLogs 關鍵步驟見註釋說明
public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //建立fanout類型的exchange channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //獲取一個獨有的,非持久化的,自動刪除的隊列 String queueName = channel.queueDeclare().getQueue(); //經過綁定方法將exchage和queue之間簡歷關係 channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
Routing
上一個例子中exchange將接收到的信息廣播給了綁定的隊列中,本例中咱們將增長綁定的一些特定,使exchange有能力經過routingKey(全匹配)來投遞不一樣的消息到不一樣的隊列中。例如平常日誌區分error日誌進單獨的隊列。
生產者EmitLogDirect 關鍵步驟見註釋說明
public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { //聲明一個direct類型的exchange channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String severity = getSeverity(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); } } private static String getSeverity(String[] strings) { if (strings.length < 1) return "info"; return strings[0]; } private static String getMessage(String[] strings) { if (strings.length < 2) return "Hello World!"; return joinStrings(strings, " ", 1); } private static String joinStrings(String[] strings, String delimiter, int startIndex) { int length = strings.length; if (length == 0) return ""; if (length <= startIndex) return ""; StringBuilder words = new StringBuilder(strings[startIndex]); for (int i = startIndex + 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } }
消費者ReceiveLogsDirect 關鍵步驟見註釋說明
public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } for (String severity : argv) { //創建exchange和queue之間關係並設置routingKey channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
Topics
提供更豐富的exchange到queue之間的路由規則。規則經過.分隔的routingKey,最高限制 255bytes。跟以前的全匹配routingKey不一樣,topic類型的exchange的routingKey主要增長了兩個特性。
- *表明一個單詞**。**
- **#** 表明0個或一個單詞。
生產者EmitLogTopic 關鍵步驟見註釋說明
public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); } } private static String getRouting(String[] strings) { if (strings.length < 1) return "anonymous.info"; return strings[0]; } private static String getMessage(String[] strings) { if (strings.length < 2) return "Hello World!"; return joinStrings(strings, " ", 1); } private static String joinStrings(String[] strings, String delimiter, int startIndex) { int length = strings.length; if (length == 0) return ""; if (length < startIndex) return ""; StringBuilder words = new StringBuilder(strings[startIndex]); for (int i = startIndex + 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } }
消費者ReceiveLogsTopic 關鍵步驟見註釋說明
public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } for (String bindingKey : argv) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
引入RabbitMQ帶來什麼挑戰
看到這,各位看官是否是越越欲試想在項目中引入RabbitMQ去優化如今的使用場景,那麼是否是咱們部署一個RabbitMQ服務,而後發送消息就高枕無憂了呢?其實在引入一箇中間件時,同時伴隨着一些問題,若是咱們對這些問題了解不夠深刻或者全面,那恭喜你將進入挖坑選手序列。爲了成爲一個靠譜的程序員,咱們要充分了解引入中間件給咱們 項目帶來的挑戰,才能在以後的應用上從容應對。下面列了下消息中間件中常見的幾類問題
- 消息丟失
- 消息重複
- 消息堆積
- RabbitMQ的可用性保證
以後的文章,咱們將逐個去講解上述問題的解決方案。 下一講:RabbitMQ消息可靠性傳輸
參考文檔
https://www.rabbitmq.com/ RabbitMQ官方文檔
tips:做者我的經驗有限,不足之處煩請指正。