Exchange屬性:java
消費者從Queue中獲取消息並消費。多個消費者能夠訂閱同一個Queue,這時Queue中的消息會被平均分攤給多個消費者進行處理,而不是每一個消費者都收到全部的消息並處理。安全
Message acknowledgment:消息確認,在消息確認機制下,收到回執纔會刪除消息,未收到回執而斷開了鏈接,消息會轉發給其餘消費者,若是忘記回執,會致使消息堆積,消費者重啓後會重複消費這些消息並重復執行業務邏輯。服務器
Message durability:消息持久化,設置消息持久化能夠避免絕大部分消息丟失,好比rabbitmq服務重啓,可是採用非持久化能夠提高隊列的處理效率。若是要確保消息的持久化,那麼消息對應的Exchange和Queue一樣要設置爲持久化。架構
另外,若是須要可靠性業務,須要設置持久化和ack機制,若是系統高吞吐,能夠設置爲非持久化、noack、自動刪除機制。併發
模擬這樣一個業務場景,用戶下單成功後,須要給用戶增長積分,同時還須要給用戶發送下單成功的消息,這是在電商業務中很常見的一個業務場景。dom
若是系統是微服務架構,可能用戶下單功能在訂單服務,給用戶增長積分的功能在積分服務,給用戶發送通知消息的功能在通知服務,各個服務之間解耦,互不影響。那麼要實現上述的業務場景,消息中間件rabbitmq是一個很好的選擇。分佈式
緣由以下:ide
實現思路:微服務
用戶下單成功後,rabbitmq發送一條消息至EXCHANGE.ORDER_CREATE
交換器,該交換器綁定了兩個隊列,QUEUE.ORDER_INCREASESCORE
、QUEUE.ORDER_NOTIFY
,消費者訂閱這兩個隊列分別用來處理增長積分、發送用戶通知。若是後續日誌系統還須要記錄下單的相關日誌,那麼咱們只須要再定義一個隊列並將其綁定到EXCHANGE.ORDER_CREATE
便可。高併發
下單發rabbitmq消息
package com.robot.rabbitmq; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.UUID; import java.util.concurrent.TimeoutException; /** * @author: 會跳舞的機器人 * @date: 2017/10/13 10:46 * @description: 模擬用戶下單以後發送rabbitmq消息 */ public class OrderCreator { // 交換器名稱 private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE"; // 消息內容 private static String msg = "create order success"; /** * 模擬建立訂單後發送mq消息 */ public void createOrder() { System.out.println("下單成功,開始發送rabbitmq消息"); ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.12.44"); connectionFactory.setPort(56720); connectionFactory.setUsername("baibei"); connectionFactory.setPassword("baibei"); Connection connection; Channel channel; try { connection = connectionFactory.newConnection(); channel = connection.createChannel(); // 持久化 boolean durable = true; // topic類型 String type = "topic"; // 聲明交換器,若是交換器不存在則建立之 channel.exchangeDeclare(EXCHANGE, type, durable); String messgeId = UUID.randomUUID().toString(); // deliveryMode>=2表示設置消息持久化 AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2).messageId(messgeId).build(); // 發佈消息 String routingKey = "order_create"; channel.basicPublish(EXCHANGE, routingKey, props, msg.getBytes("utf-8")); connection.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
積分系統訂閱消息
package com.robot.rabbitmq; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author: 會跳舞的機器人 * @date: 2017/10/13 16:02 * @description: rabbitmq消費者,模擬下單成功後給用戶增長積分 */ public class IncreaseScoreConsumer implements Consumer { private Connection connection; private Channel channel; // 交換器名稱 private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE"; // 增長積分隊列名稱 private static final String QUEUENAME = "QUEUE.ORDER_INCREASESCORE"; public void consume() { // 初始化rabbitmq鏈接信息 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.12.44"); connectionFactory.setPort(56720); connectionFactory.setUsername("baibei"); connectionFactory.setPassword("baibei"); try { connection = connectionFactory.newConnection(); channel = connection.createChannel(); // 聲明交換器 channel.exchangeDeclare(EXCHANGE, "topic", true); // 聲明隊列 channel.queueDeclare(QUEUENAME, true, false, false, null); // 交換器與隊列綁定並設置routingKey channel.queueBind(QUEUENAME, EXCHANGE, "order_create"); // 消費消息,callback是該類,關閉自動確認消息,在完成業務邏輯後手動確認確認 channel.basicConsume(QUEUENAME, false, this); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("《積分系統》收到訂單消息:" + msg + ",給用戶增長積分......"); // 手動確認消息 channel.basicAck(envelope.getDeliveryTag(), false); /** * channel.basicReject(envelope.getDeliveryTag(), false);該方法會丟棄掉隊列中的這條消息 * channel.basicReject(envelope.getDeliveryTag(), true);該方法會把消息從新放回隊列 * 通常系統會設定一個重試次數,若是超太重試次數,則會丟棄消息,反之則會把消息再放入隊列 */ } public void handleConsumeOk(String consumerTag) { } public void handleCancelOk(String consumerTag) { } public void handleCancel(String consumerTag) throws IOException { } public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { } public void handleRecoverOk(String consumerTag) { } }
通知系統訂閱消息
package com.robot.rabbitmq; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author: 會跳舞的機器人 * @date: 2017/10/13 16:20 * @description: rabbitmq消費者,模擬下單成功後給用戶發送通知 */ public class NotifyConsumer implements Consumer { private Connection connection; private Channel channel; // 交換器名稱 private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE"; // 通知用戶下單成功通知隊列名稱 private static final String QUEUENAME = "QUEUE.ORDER_NOTIFY"; public void consume() { // 初始化rabbitmq鏈接信息 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.12.44"); connectionFactory.setPort(56720); connectionFactory.setUsername("baibei"); connectionFactory.setPassword("baibei"); try { connection = connectionFactory.newConnection(); channel = connection.createChannel(); // 聲明交換器 channel.exchangeDeclare(EXCHANGE, "topic", true); // 聲明隊列 channel.queueDeclare(QUEUENAME, true, false, false, null); // 交換器與隊列綁定並設置routingKey channel.queueBind(QUEUENAME, EXCHANGE, "order_create"); // 消費消息,callback是該類,關閉自動確認消息,在完成業務邏輯後手動確認確認 channel.basicConsume(QUEUENAME, false, this); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("《通知系統》收到訂單消息:" + msg + ",開始給用戶發送通知......"); // 手動確認消息 channel.basicAck(envelope.getDeliveryTag(), false); /** * channel.basicReject(envelope.getDeliveryTag(), false);該方法會丟棄掉隊列中的這條消息 * channel.basicReject(envelope.getDeliveryTag(), true);該方法會把消息從新放回隊列 * 通常系統會設定一個重試次數,若是超太重試次數,則會丟棄消息,反之則會把消息再放入隊列 */ } public void handleConsumeOk(String consumerTag) { } public void handleCancelOk(String consumerTag) { } public void handleCancel(String consumerTag) throws IOException { } public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { } public void handleRecoverOk(String consumerTag) { } }
測試
package com.robot.rabbitmq; /** * @author: 會跳舞的機器人 * @date: 2017/10/13 16:27 * @description: */ public class Test { public static void main(String[] args) { IncreaseScoreConsumer increaseScoreConsumer = new IncreaseScoreConsumer(); increaseScoreConsumer.consume(); NotifyConsumer notifyConsumer = new NotifyConsumer(); notifyConsumer.consume(); OrderCreator orderCreator = new OrderCreator(); for (int i = 0; i < 3; i++) { orderCreator.createOrder(); } } }
輸出:
下單成功,開始發送rabbitmq消息 《積分系統》收到訂單消息:create order success,給用戶增長積分...... 《通知系統》收到訂單消息:create order success,開始給用戶發送通知...... 下單成功,開始發送rabbitmq消息 《積分系統》收到訂單消息:create order success,給用戶增長積分...... 《通知系統》收到訂單消息:create order success,開始給用戶發送通知...... 下單成功,開始發送rabbitmq消息 《積分系統》收到訂單消息:create order success,給用戶增長積分...... 《通知系統》收到訂單消息:create order success,開始給用戶發送通知......
原文轉載:https://www.jianshu.com/p/2f55cd7a3e1c做者:會跳舞的機器人