原理:生產者將消息交給默認的交換機,交換機獲取消息後交給綁定這個生產者的隊列(投遞規則爲隊列名稱和routing key 相同的隊列),監聽當前隊列的消費者獲取信息並執行消費邏輯。java
場景:有一個oa系統,用戶經過接收手機驗證碼進行註冊,頁面上點擊獲取驗證碼後,將驗證碼放到消息隊列,而後短信服務從隊列中獲取到驗證碼,併發送給用戶。mysql
實現:git
生產者:github
public class Producter { public static void main(String[] args) throws Exception { // 1. 建立出連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("/"); // 2. 經過連接工廠建立連接對象 Connection connection = factory.newConnection(); // 3. 經過連接對象建立出channel Channel channel = connection.createChannel(); // 4. 經過channel發佈消息 /** * 四個參數: * 第一個參數是交換機的名稱 * 第二個參數是路由鍵 * 第三個參數標識消息的一些額外的屬性 * 第四個是消息的具體的內容 */ String message = "字節"; for(int i = 0;i < 5;i ++){ channel.basicPublish("","byte001",null,message.getBytes()); } // 5. 釋放資源,釋放channel 和 連接對象 channel.close(); connection.close(); } }
消費者:sql
public class Consumer { public static void main(String[] args) throws Exception { // 1. 建立出連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("/"); // 2. 經過連接工廠建立連接對象 Connection connection = factory.newConnection(); // 3. 經過連接對象建立出channel Channel channel = connection.createChannel(); // 4. 建立出消息隊列 /** * 第一個參數是消息隊列的名稱 * 第二個參數表示消息是否持久化 * 第三個參數標識消息隊列是否被channel獨佔 * 第四個參數標識是否自動刪除消息隊列,當消息隊列沒有綁定交換機後是否自動刪除 * 第五個是消息隊列擴展參數 */ String queueName = "byte001"; channel.queueDeclare(queueName, true, false, false, null); // 5. 建立消費者,對消息進行處理 DefaultConsumer consumer = new DefaultConsumer(channel) { /** * consumerTag 用來標識.能夠再監聽隊列時候設置 * envelope 信封,經過envelope能夠經過這個獲取到不少東西 * properties 額外的消息屬性 * body:消息體 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body, "UTF-8"); System.out.println("獲取到的消息:"+s); } }; // 6. 經過channel消費者和消息隊列關聯 /** * 第一個參數是消息隊列的名字 * 第二個參數是否自動簽收(即消費消息後告知服務器已被消費) * 第三個參數是消費者 */ channel.basicConsume(queueName, true, consumer); } }
原理:生產者將消息交給交換機,交換機交給綁定的隊列,隊列有多個消費者監聽,一條消息只能由一個消費者消費,這樣就造成了資源競爭,誰的資源空閒大,爭搶到的可能性就大。數據庫
場景:有一個電商平臺,有兩個訂單服務,用戶下單的時候,任意一個訂單服務消費用戶的下單請求生成訂單便可。不用兩個訂單服務同時消費用戶的下單請求。緩存
實現:服務器
生產者:併發
public class Producter { public static final String QUEUE_NAME = "byte002"; public static void main(String[] args) throws Exception { // 1. 建立出連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("/"); // 2. 經過連接工廠建立連接對象 Connection connection = factory.newConnection(); // 3. 經過連接對象建立出channel Channel channel = connection.createChannel(); // 申明隊列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); // 4. 經過channel發佈消息 /** * 四個參數: * 第一個參數是交換機的名稱 * 第二個參數是路由鍵 * 第三個參數標識消息的一些額外的屬性 * 第四個是消息的具體的內容 */ String message = "字節"; for(int i = 0;i < 100;i ++){ channel.basicPublish("",QUEUE_NAME,null,(message+i).getBytes()); } // 5. 釋放資源,釋放channel 和 連接對象 channel.close(); connection.close(); } }
消費者1:ide
public class Consumer { public static final String QUEUE_NAME = "byte002"; public static void main(String[] args) throws Exception { // 1. 建立出連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("/"); // 2. 經過連接工廠建立連接對象 Connection connection = factory.newConnection(); // 3. 經過連接對象建立出channel Channel channel = connection.createChannel(); // 4. 建立出消息隊列 /** * 第一個參數是消息隊列的名稱 * 第二個參數表示消息是否持久化 * 第三個參數標識消息隊列是否被channel獨佔 * 第四個參數標識是否自動刪除消息隊列,當消息隊列沒有綁定交換機後是否自動刪除 * 第五個是消息隊列擴展參數 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); // 告訴服務器,在咱們沒有確認當前消息時不要給咱們發送新的消息 // 5. 建立消費者,對消息進行處理 DefaultConsumer consumer = new DefaultConsumer(channel) { /** * consumerTag 用來標識.能夠再監聽隊列時候設置 * envelope 信封,經過envelope能夠經過這個獲取到不少東西 * properties 額外的消息屬性 * body:消息體 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body, "UTF-8"); System.out.println("消費者1收到的內容:"+s); try { Thread.sleep(10); // 模擬消費耗時 } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(),false); // 參數2false爲確認收到消息,true爲拒絕收到消息 } }; // 6. 經過channel消費者和消息隊列關聯 /** * 第一個參數是消息隊列的名字 * 第二個參數是否自動簽收(即消費消息後告知服務器已被消費) * 第三個參數是消費者 */ channel.basicConsume(QUEUE_NAME, false, consumer); } }
消費者2:
public class Consumer2 { public static final String QUEUE_NAME = "byte002"; public static void main(String[] args) throws Exception { // 1. 建立出連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("/"); // 2. 經過連接工廠建立連接對象 Connection connection = factory.newConnection(); // 3. 經過連接對象建立出channel Channel channel = connection.createChannel(); // 4. 建立出消息隊列 /** * 第一個參數是消息隊列的名稱 * 第二個參數表示消息是否持久化 * 第三個參數標識消息隊列是否被channel獨佔 * 第四個參數標識是否自動刪除消息隊列,當消息隊列沒有綁定交換機後是否自動刪除 * 第五個是消息隊列擴展參數 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); // 告訴服務器,在咱們沒有確認當前消息時不要給咱們發送新的消息 // 5. 建立消費者,對消息進行處理 DefaultConsumer consumer = new DefaultConsumer(channel) { /** * consumerTag 用來標識.能夠再監聽隊列時候設置 * envelope 信封,經過envelope能夠經過這個獲取到不少東西 * properties 額外的消息屬性 * body:消息體 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body, "UTF-8"); System.out.println("消費者2收到的內容:"+s); try { Thread.sleep(500); // 模擬消費耗時 } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(),false); // 參數2false爲確認收到消息,true爲拒絕收到消息 } }; // 6. 經過channel消費者和消息隊列關聯 /** * 第一個參數是消息隊列的名字 * 第二個參數是否自動簽收(即消費消息後告知服務器已被消費) * 第三個參數是消費者 */ channel.basicConsume(QUEUE_NAME, false, consumer); } }
保證資源競爭的代碼就是這一行channel.basicQos(1);若是不加這一行,咱們會發現兩個消費者是輪詢消費消息的。
原理:生產者將消息扔給交換機,交換機類型是fanout,不一樣的隊列註冊到交換機上,不一樣的消費註冊在不一樣的隊列上。全部消費者都會收到消息。
場景:有一個商城,咱們新添加一個商品後,可能同時須要去更新緩存和數據庫。
實現:
生產者:
public class Producter { // 定義交換機的名字 public static final String EXCHANGE_NAME="byte003"; public static void main(String[] args) throws Exception { // 1. 建立出連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("/"); // 2. 經過連接工廠建立連接對象 Connection connection = factory.newConnection(); // 3. 經過連接對象建立出channel Channel channel = connection.createChannel(); // 定義一個交換機,類型是fanout channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); // 由於消息先發到交換機,交換機沒有保存功能,因此若是沒有消費者,消息會丟失 channel.basicPublish(EXCHANGE_NAME,"",null,"發佈訂閱模式的消息".getBytes()); channel.close(); connection.close(); } }
消費者1:
public class Consumer1 { public static final String EXCHANGE_NAME="byte003"; public static void main(String[] args) throws Exception { // 1. 建立出連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("/"); // 2. 經過連接工廠建立連接對象 Connection connection = factory.newConnection(); // 3. 經過連接對象建立出channel Channel channel = connection.createChannel(); String queueName = "queue003"; channel.queueDeclare(queueName,false,false,false,null); // 綁定隊列到交換機 channel.queueBind(queueName,EXCHANGE_NAME,""); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { /** * consumerTag 用來標識.能夠再監聽隊列時候設置 * envelope 信封,經過envelope能夠經過這個獲取到不少東西 * properties 額外的消息屬性 * body:消息體 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body, "UTF-8"); System.out.println("消費者1:"+s); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(queueName,false,consumer); } }
消費者2:
public class Consumer2 { public static final String EXCHANGE_NAME="byte003"; public static void main(String[] args) throws Exception { // 1. 建立出連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("/"); // 2. 經過連接工廠建立連接對象 Connection connection = factory.newConnection(); // 3. 經過連接對象建立出channel Channel channel = connection.createChannel(); String queueName = "queue004"; channel.queueDeclare(queueName,false,false,false,null); // 綁定隊列到交換機 channel.queueBind(queueName,EXCHANGE_NAME,""); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { /** * consumerTag 用來標識.能夠再監聽隊列時候設置 * envelope 信封,經過envelope能夠經過這個獲取到不少東西 * properties 額外的消息屬性 * body:消息體 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body, "UTF-8"); System.out.println("消費者2:"+s); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(queueName,false,consumer); } }
須要注意的一點就是交換機沒有保存功能,若是沒有消費者,則消息會丟失。
原理:生產者將消息發送給交換機,消息攜帶具體的routingkey。交換機類型是direct,接收到消息中的routingkey,比對與之綁定的隊列的routingkey,分發到不一樣的隊列上。
場景:仍是同樣,有一個商城,新添加了一個商品,實時性不是很高,只須要添加到數據庫便可,不用刷新緩存。
實現:
生產者:
public class Producter { // 定義交換機的名字 public static final String EXCHANGE_NAME="byte004"; public static void main(String[] args) throws Exception { // 1. 建立出連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("/"); // 2. 經過連接工廠建立連接對象 Connection connection = factory.newConnection(); // 3. 經過連接對象建立出channel Channel channel = connection.createChannel(); // 定義一個交換機,類型是direct channel.exchangeDeclare(EXCHANGE_NAME,"direct"); // 由於消息先發到交換機,交換機沒有保存功能,因此若是沒有消費者,消息會丟失 channel.basicPublish(EXCHANGE_NAME,"key1",null,"發佈路由模式的消息".getBytes()); channel.close(); connection.close(); } }
消費者1:
public class Consumer1 { public static final String EXCHANGE_NAME="byte004"; public static void main(String[] args) throws Exception { // 1. 建立出連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("/"); // 2. 經過連接工廠建立連接對象 Connection connection = factory.newConnection(); // 3. 經過連接對象建立出channel Channel channel = connection.createChannel(); String queueName = "queue005"; channel.queueDeclare(queueName,false,false,false,null); // 綁定隊列到交換機 /** * 參數3是routingkey,只有和它同樣的routingkey的消息纔會被當前消費者收到 */ channel.queueBind(queueName,EXCHANGE_NAME,"key1"); // 若是要接收多個routingkey的消息,在執行一次上面的代碼便可,以下 channel.queueBind(queueName,EXCHANGE_NAME,"key2"); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { /** * consumerTag 用來標識.能夠再監聽隊列時候設置 * envelope 信封,經過envelope能夠經過這個獲取到不少東西 * properties 額外的消息屬性 * body:消息體 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body, "UTF-8"); System.out.println("消費者1:"+s); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(queueName,false,consumer); } }
消費者2:
public class Consumer2 { public static final String EXCHANGE_NAME="byte004"; public static void main(String[] args) throws Exception { // 1. 建立出連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("/"); // 2. 經過連接工廠建立連接對象 Connection connection = factory.newConnection(); // 3. 經過連接對象建立出channel Channel channel = connection.createChannel(); String queueName = "queue006"; channel.queueDeclare(queueName,false,false,false,null); // 綁定隊列到交換機 channel.queueBind(queueName,EXCHANGE_NAME,"key2"); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { /** * consumerTag 用來標識.能夠再監聽隊列時候設置 * envelope 信封,經過envelope能夠經過這個獲取到不少東西 * properties 額外的消息屬性 * body:消息體 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body, "UTF-8"); System.out.println("消費者2:"+s); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(queueName,false,consumer); } }
原理:路由模式的一種,路由功能添加了模糊匹配。星號(*)表明1個單詞,#號(#)表明一個或多個單詞。具體可參考路由模式。
場景:仍是同樣,有一個商城,新添加了一個商品,實時性不是很高,只須要添加到數據庫便可,數據庫包含了主數據庫mysql1和從數據庫mysql2的內容,不用刷新緩存。
實現:
生產者:
public class Producter { // 定義交換機的名字 public static final String EXCHANGE_NAME="byte004"; public static void main(String[] args) throws Exception { // 1. 建立出連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("/"); // 2. 經過連接工廠建立連接對象 Connection connection = factory.newConnection(); // 3. 經過連接對象建立出channel Channel channel = connection.createChannel(); // 定義一個交換機,類型是topic channel.exchangeDeclare(EXCHANGE_NAME,"topic"); // 由於消息先發到交換機,交換機沒有保存功能,因此若是沒有消費者,消息會丟失 channel.basicPublish(EXCHANGE_NAME,"key.1.2",null,"發佈路由模式的消息".getBytes()); channel.close(); connection.close(); } }
消費者1:
public class Consumer1 { public static final String EXCHANGE_NAME="byte004"; public static void main(String[] args) throws Exception { // 1. 建立出連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("/"); // 2. 經過連接工廠建立連接對象 Connection connection = factory.newConnection(); // 3. 經過連接對象建立出channel Channel channel = connection.createChannel(); String queueName = "queue005"; channel.queueDeclare(queueName,false,false,false,null); // 綁定隊列到交換機 /** * 參數3是routingkey,只有和它同樣的routingkey的消息纔會被當前消費者收到 */ channel.queueBind(queueName,EXCHANGE_NAME,"key.*"); // 若是要接收多個routingkey的消息,在執行一次上面的代碼便可,以下 channel.queueBind(queueName,EXCHANGE_NAME,"abc.#"); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { /** * consumerTag 用來標識.能夠再監聽隊列時候設置 * envelope 信封,經過envelope能夠經過這個獲取到不少東西 * properties 額外的消息屬性 * body:消息體 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body, "UTF-8"); System.out.println("消費者1:"+s); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(queueName,false,consumer); } }
消費者2:
public class Consumer2 { public static final String EXCHANGE_NAME="byte004"; public static void main(String[] args) throws Exception { // 1. 建立出連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("/"); // 2. 經過連接工廠建立連接對象 Connection connection = factory.newConnection(); // 3. 經過連接對象建立出channel Channel channel = connection.createChannel(); String queueName = "queue006"; channel.queueDeclare(queueName,false,false,false,null); // 綁定隊列到交換機 channel.queueBind(queueName,EXCHANGE_NAME,"key.#"); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { /** * consumerTag 用來標識.能夠再監聽隊列時候設置 * envelope 信封,經過envelope能夠經過這個獲取到不少東西 * properties 額外的消息屬性 * body:消息體 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body, "UTF-8"); System.out.println("消費者2:"+s); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume(queueName,false,consumer); } }
代碼已上傳:
github地址: https://github.com/binzh303/zhixie-code-example
gitee地址: https://gitee.com/javaXiaoCaiJi/zhixie-code-example
若是文章對您有幫助,請記得點贊關注喲~ 歡迎你們關注個人公衆號:字節傳說,每日推送技術文章供你們學習參考。