這種模式只須要將隊列綁定到交換機.上便可,是不須要設置路由鍵的,如圖(生產者先將消息給交換機,而後交換機將消息所有發給隊列。每一個消費者接收到的消息如出一轍(不會像以前同樣將消息平均分配給消費者))java
生產者代碼ide
package item.com.fanout; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLog { //定義交換機名稱 private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception { //建立鏈接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPassword("admin"); connectionFactory.setUsername("admin"); //建立一個鏈接 Connection connection = connectionFactory.newConnection(); //創建一個頻道 Channel channel = connection.createChannel(); //交換聲明 channel.exchangeDeclare(交換機名字,交換機類型) channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //發送消息 for (int i = 0; i <10 ; i++) { String message=i+""; // channel.basicPublish(交換機名稱,routingKey,props,消息) channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8")); } channel.close();; connection.close(); } }
消費者代碼(可寫多個)ui
package item.com.fanout; import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogs1 { //定義交換機名稱 private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws Exception{ //建立鏈接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setHost("127.0.0.1"); //建立一個鏈接 Connection connection = connectionFactory.newConnection(); //創建一個頻道 Channel channel = connection.createChannel(); //消息接收和隊列綁定,不和交換機綁定 //獲取一個非持久的隊列 String queueName = channel.queueDeclare().getQueue(); //將隊列綁定到交換機上 channel.queueBind(queueName,EXCHANGE_NAME,""); // channel.queueBind(隊列名稱, 交換機名稱, routingKey); Consumer consumer = new DefaultConsumer(channel) { //重寫 handleDelivery =》new handleDelivery而後回車 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("接收消息:"+message); } }; //消息消費 channel.basicConsume(queueName, true, consumer); // channel.basicConsume(queueName, 是否制動接收, consumer); } }
注意,先運行接收者而後在運行消息發送者,能夠看到兩個消費者接收到的消息同樣spa
根據RoutingKey匹配消息路由到指定的隊列code
首先-生產者代碼。blog
package item.com.direct; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogDirect { //定義交換機名稱 private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception{ //建立鏈接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPassword("admin"); connectionFactory.setUsername("admin"); //建立一個鏈接 Connection connection = connectionFactory.newConnection(); //創建一個頻道 Channel channel = connection.createChannel(); //交換聲明 channel.exchangeDeclare(交換機名字,交換機類型) channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //定義不一樣的消息 =》根據 routingKey String message = "Hello World!"; String severity = "info";//routingKey channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8")); severity = "warning"; channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8")); severity = "error"; channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8")); channel.close(); connection.close(); } }
生產者代碼(此處代碼接收了三個routingKey)rabbitmq
package item.com.direct; import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogsDirect1 { //定義交換機名稱 private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws Exception { //建立鏈接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPassword("admin"); connectionFactory.setUsername("admin"); //建立一個鏈接 Connection connection = connectionFactory.newConnection(); //創建一個頻道 Channel channel = connection.createChannel(); //交換聲明 channel.exchangeDeclare(交換機名字,交換機類型) channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //消息接收和隊列綁定,不和交換機綁定 //獲取一個非持久的隊列 String queueName = channel.queueDeclare().getQueue(); //將隊列綁定到交換機上,一個交換機同時綁定三個queue channel.queueBind(queueName, EXCHANGE_NAME, "info"); channel.queueBind(queueName, EXCHANGE_NAME, "warning"); channel.queueBind(queueName, EXCHANGE_NAME, "error"); Consumer consumer = new DefaultConsumer(channel) { //重寫 handleDelivery =》new handleDelivery而後回車 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; //消息消費 channel.basicConsume(queueName,true,consumer); // channel.basicConsume(queueName, 是否制動接收, consumer); } }
和direct相比,多了模糊查詢進行篩選,功能比direct更增強大隊列
*能夠代替一個單詞
#能夠替代零個或多個單詞路由
生產者代碼get
package item.com.topic; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogTopic { //定義交換機名稱 private static final String EXCHANGE_NAME="topic_logs"; public static void main(String[] args) throws Exception { //建立鏈接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); //建立一個鏈接 Connection connection = connectionFactory.newConnection(); //創建一個頻道 Channel channel = connection.createChannel(); //交換聲明 channel.exchangeDeclare(交換機名字,交換機類型) channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //定義不一樣的消息 =》根據 routingKey String message = "Animal World"; //定義多個routingKey(路由鍵) String[] routingKeys = new String[9]; routingKeys[0] = "quick.orange.rabbit"; routingKeys[1] = "lazy.orange.elephant"; routingKeys[2] = "quick.orange.fox"; routingKeys[3] = "lazy.brown.fox"; routingKeys[4] = "lazy.pink.rabbit"; routingKeys[5] = "quick.brown.fox"; routingKeys[6] = "orange"; routingKeys[7] = "quick.orange.male.rabbit"; routingKeys[8] = "lazy.orange.male.rabbit"; //發送消息 for (int i = 0; i <routingKeys.length ; i++) { channel.basicPublish(EXCHANGE_NAME,routingKeys[i],null,message.getBytes("UTF-8")); } channel.close(); connection.close(); } }
消費者代碼1
package item.com.topic; import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogsTopic1 { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String queueName = channel.queueDeclare().getQueue(); //指定bindingKey String bindingKey1 = "*.*.rabbit"; channel.queueBind(queueName, EXCHANGE_NAME, bindingKey1); String bindingKey2 = "lazy.#"; channel.queueBind(queueName, EXCHANGE_NAME, bindingKey2); Consumer consumer=new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println( " [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName,true,consumer); } }
消費者2代碼
package item.com.topic; import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogsTopic2 { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String queueName = channel.queueDeclare().getQueue(); //指定bindingKey String bindingKey = "*.orange.*"; channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); Consumer consumer=new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println( " [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName,true,consumer); } }
輸出結果【根據routingKey 進行相應的模糊查詢】