上圖表示一個消費者消費消息以後,不講消息直接存儲到隊列,而是使用兩個消費者各自聲明一個隊列,將各自的對應的隊列與交換機綁定。這樣每一個消費者都讀取的是自身所對應的隊列的全部消息,大達到了一個生產者生產消息,全部消費者都能消費的目的。java
將交換機類型設置爲fanout便可實現Publish/Subscribe服務器
package com.wangx.rabbitmq.sp; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer { private static final String EXCHANGE_NAME = "exchange"; public static void main(String[] args) throws IOException, TimeoutException { //建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置服務器主機 factory.setHost("127.0.0.1"); //設置用戶名 factory.setUsername("wangx"); //設置密碼 factory.setPassword("wangx"); //設置VirtualHost factory.setVirtualHost("/wangx"); Connection connection = null; Channel channel = null; try { connection = factory.newConnection(); channel = connection.createChannel(); //聲明交換機 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String message = "Hello World!"; //發送消息 for (int i = 0; i < 10; i++) { //發送消息 channel.basicPublish(EXCHANGE_NAME, "", null, (message + i).getBytes()); System.out.println(" [x] Sent '" + message + i + "'"); } }catch (Exception e) { }finally { channel.close(); connection.close(); } } }
與普通消息生產者不一樣的地方在於,發佈訂閱時必需要顯示的聲明一個交換機,而且在發送消息的時候,沒有隊列,必須設置交換機名稱。ide
package com.wangx.rabbitmq.sp; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer1 { /** * 隊列名字 */ private static String QUEUE_NAME = "queue1"; private static final String EXCHANGE_NAME = "exchange"; public static void main(String[] args){ //建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置服務器主機 factory.setHost("127.0.0.1"); //設置用戶名 factory.setUsername("wangx"); //設置密碼 factory.setPassword("wangx"); //設置VirtualHost factory.setVirtualHost("/wangx"); Connection connection = null; try { //建立鏈接 connection = factory.newConnection(); //建立消息通道 final Channel channel = connection.createChannel(); //聲明交換機 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //聲明隊列 channel.queueDeclare(QUEUE_NAME,false, false, false, null); //綁定隊列與交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); //消息服務器每次只向消費者發送一條消息 // channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel){ //重寫DefaultConsumer中handleDelivery方法,在方法中獲取消息 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{ try { //消息沉睡一秒 Thread.sleep(1000); String message = new String(body, "UTF-8"); System.out.println("consumer1 收到消息 '" + message + "'"); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("consumer1 消息消費完成...."); } } }; //監聽消息 channel.basicConsume(QUEUE_NAME, true,consumer); }catch (Exception e) { e.printStackTrace(); }finally { } } }
這裏仍然使用的是兩個不一樣的消費者,而且將兩個不一樣的消費者分別聲明不一樣的消息隊列,而後將聲明的隊列與交換機進行綁定,使用channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "fanout");方法。啓動兩個消費者,將兩個不一樣的消息隊列註冊並綁定上去。啓動消息生產者發送消息,將會看到,兩個不一樣的消費者均能接收到生產者發送的全部消息。學習
結構圖:ui
發送消息時指定不一樣的key,交換機分發消息是根據key分發消息到不一樣的消費者隊列中。spa
將交換機模式改成DIRECT,在消費端設置了不一樣的key,至關於爲消息分個類,以便於在交換機分發消息時,將消息分發給持有該key的消費者,生產者代碼以下:code
package com.wangx.rabbitmq.routing; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer { private static final String EXCHANGE_NAME = "exchange-routing"; public static void main(String[] args) throws IOException, TimeoutException { //建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置服務器主機 factory.setHost("127.0.0.1"); //設置用戶名 factory.setUsername("wangx"); //設置密碼 factory.setPassword("wangx"); //設置VirtualHost factory.setVirtualHost("/wangx"); Connection connection = null; Channel channel = null; try { connection = factory.newConnection(); channel = connection.createChannel(); //聲明交換機 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String message = "Hello World!"; //發送消息 for (int i = 0; i < 10; i++) { if ( i % 2 == 0) { //發送消息,指定key channel.basicPublish(EXCHANGE_NAME, "key2", null, (message + i).getBytes()); System.out.println(" 偶數消息 '" + message + i + "'"); } else { //發送消息 channel.basicPublish(EXCHANGE_NAME, "key1", null, (message + i).getBytes()); System.out.println(" 奇數消息 '" + message + i + "'"); } } }catch (Exception e) { }finally { channel.close(); connection.close(); } } }
消費者代碼以下:blog
package com.wangx.rabbitmq.routing; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer1 { /** * 隊列名字 */ private static String QUEUE_NAME = "queue1"; private static final String EXCHANGE_NAME = "exchange-routing"; public static void main(String[] args){ //建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置服務器主機 factory.setHost("127.0.0.1"); //設置用戶名 factory.setUsername("wangx"); //設置密碼 factory.setPassword("wangx"); //設置VirtualHost factory.setVirtualHost("/wangx"); Connection connection = null; try { //建立鏈接 connection = factory.newConnection(); //建立消息通道 final Channel channel = connection.createChannel(); //聲明交換機 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //聲明隊列 channel.queueDeclare(QUEUE_NAME,false, false, false, null); //綁定隊列與交換機,指定接收的消息的key channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2"); //消息服務器每次只向消費者發送一條消息 // channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel){ //重寫DefaultConsumer中handleDelivery方法,在方法中獲取消息 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{ try { //消息沉睡一秒 Thread.sleep(1000); String message = new String(body, "UTF-8"); System.out.println("consumer1 收到消息 '" + message + "'"); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("consumer1 消息消費完成...."); } } }; //監聽消息 channel.basicConsume(QUEUE_NAME, true,consumer); }catch (Exception e) { e.printStackTrace(); }finally { } } }
兩個消費者一個使用接收奇數,一個接收偶數消息(根據發送消息時判斷奇偶設置不一樣的key),接收時接收相應的key便可。rabbitmq
這樣就能夠指定接收想要接收的類型的消息了,至關於前面學習的mq的消息的過濾。隊列
將路由鍵和某模式進行匹配。此時隊列須要綁定要一個模式上。符號「#」匹配一個或多個詞,符號「*」匹配很少很多一個詞。所以「audit.#」可以匹配到「audit.irs.corporate」,可是「audit.*」 只會匹配到「audit.irs」。
生產者實現:
package com.wangx.rabbitmq.topic; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer { private static final String EXCHANGE_NAME = "exchange-topic"; public static void main(String[] args) throws IOException, TimeoutException { //建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置服務器主機 factory.setHost("127.0.0.1"); //設置用戶名 factory.setUsername("wangx"); //設置密碼 factory.setPassword("wangx"); //設置VirtualHost factory.setVirtualHost("/wangx"); Connection connection = null; Channel channel = null; try { connection = factory.newConnection(); channel = connection.createChannel(); //聲明交換機 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String message = "Hello World!"; //發送消息,綁定模式 channel.basicPublish(EXCHANGE_NAME, "key.one", null, ("one -" + message).getBytes()); channel.basicPublish(EXCHANGE_NAME, "key.two.msg", null, ("two -" + message).getBytes()); }catch (Exception e) { }finally { channel.close(); connection.close(); } } }
這裏發送消息時,綁定了key.one和key.two.msg兩種模式。接下來使用*號和#號兩種通配符得消費者,以下:
Consumer1
package com.wangx.rabbitmq.topic; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer1 { /** * 隊列名字 */ private static String QUEUE_NAME = "queue-topic"; private static final String EXCHANGE_NAME = "exchange-topic"; public static void main(String[] args){ //建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置服務器主機 factory.setHost("127.0.0.1"); //設置用戶名 factory.setUsername("wangx"); //設置密碼 factory.setPassword("wangx"); //設置VirtualHost factory.setVirtualHost("/wangx"); Connection connection = null; try { //建立鏈接 connection = factory.newConnection(); //建立消息通道 final Channel channel = connection.createChannel(); //聲明交換機 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //聲明隊列 channel.queueDeclare(QUEUE_NAME,false, false, false, null); //綁定隊列與交換機,使用通配符key.* 表示只能匹配key下的一個路徑, channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*"); //消息服務器每次只向消費者發送一條消息 // channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel){ //重寫DefaultConsumer中handleDelivery方法,在方法中獲取消息 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{ try { //消息沉睡一秒 Thread.sleep(1000); String message = new String(body, "UTF-8"); System.out.println("consumer1 收到消息 '" + message + "'"); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("consumer1 消息消費完成...."); } } }; //監聽消息 channel.basicConsume(QUEUE_NAME, true,consumer); }catch (Exception e) { e.printStackTrace(); }finally { } } }
因此在本例中只能匹配到key.one的消息,控制檯打印以下:
consumer1 收到消息 'one -Hello World!'
consumer1 消息消費完成....
Consumer2實現:
package com.wangx.rabbitmq.topic; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer2 { /** * 隊列名字 */ private static String QUEUE_NAME = "queue-topic-2"; private static final String EXCHANGE_NAME = "exchange-topic"; public static void main(String[] args){ //建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置服務器主機 factory.setHost("127.0.0.1"); //設置用戶名 factory.setUsername("wangx"); //設置密碼 factory.setPassword("wangx"); //設置VirtualHost factory.setVirtualHost("/wangx"); Connection connection = null; try { //建立鏈接 connection = factory.newConnection(); //建立消息通道 final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //聲明隊列 channel.queueDeclare(QUEUE_NAME,false, false, false, null); //這裏使用#號通配符,表示可以匹配到key下的任意路徑 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.#"); Consumer consumer = new DefaultConsumer(channel){ //重寫DefaultConsumer中handleDelivery方法,在方法中獲取消息 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{ try { //消息沉睡100ms Thread.sleep(100); String message = new String(body, "UTF-8"); System.out.println("consumer2 收到消息 '" + message + "'"); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("consumer2 消息消費完成...."); } } }; //監聽消息,第二個參數爲true時表示自動確認 channel.basicConsume(QUEUE_NAME, true,consumer); }catch (Exception e) { e.printStackTrace(); }finally { } } }
這裏使用了key.#表示匹配全部key.下的全部路徑,因此可以接收到全部以key開頭的消息,控制檯打印以下:
consumer2 收到消息 'one -Hello World!' consumer2 消息消費完成.... consumer2 收到消息 'two -Hello World!' consumer2 消息消費完成....
使用topic模式既能夠輕易的實現fanout模式,也能夠實現routing模式,同時提供了通配符的狀況下,使得匹配更加靈活,使用方式更加簡潔。