rabbitMQ 的交換器有四種類型:direct、fanout、topic、headershtml
如下是具體的代碼:java
direct:路由鍵只能所有匹配,才能進入到指定隊列中。其餘使用json
direct生產者api
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.util.Arrays; import java.util.List; /** * direct 生產者 */
public class DirectPro { public final static String EXCHANGE_NAME = "direct_exchange";//direct交換器名稱
public final static Integer SEND_NUM = 10;//發送消息次數
public static void main(String[] args) throws Exception { //建立鏈接工廠,鏈接RabbitMQ
ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("IP");//端口號、用戶名、密碼能夠使用默認的
connectionFactory.setUsername("用戶名"); connectionFactory.setPassword("密碼"); connectionFactory.setPort(5672); //建立鏈接
Connection connection = connectionFactory.newConnection(); //建立信道
Channel channel = connection.createChannel(); //在信道中設置交換器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //交換器和隊列綁定放到消費者進行 //自定義路由鍵
List<String> routeKey = Arrays.asList("key1","key2","key3"); //發送消息
for (int i=0;i<SEND_NUM;i++){ String key = routeKey.get(i%routeKey.size());//發送的key
String msg = "hello rabbitmq"+i;//發送的消息 //消息進行發送
channel.basicPublish(EXCHANGE_NAME,key,null,msg.getBytes()); System.out.println("send:"+key+"==="+msg); } channel.close(); connection.close(); } }
direct消費者dom
import com.rabbitmq.client.*; import java.io.IOException; /** * direct 消費者 */
public class DirectCon { public final static String EXCHANGE_NAME = "direct_exchange";//direct交換器名稱
public final static String QUEUE_NAME = "queue_name"; public static void main(String[] args) throws Exception { //建立鏈接工廠,鏈接RabbitMQ
ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("IP");//端口號、用戶名、密碼能夠使用默認的
connectionFactory.setUsername("用戶名"); connectionFactory.setPassword("密碼"); connectionFactory.setPort(5672); //建立鏈接
Connection connection = connectionFactory.newConnection(); //建立信道
Channel channel = connection.createChannel(); //在信道中設置交換器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //聲明隊列
channel.queueDeclare(QUEUE_NAME,false,false,false,null); //交換器和隊列綁定
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"key1"); System.out.println("waiting message....."); //聲明消費者
final Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"utf-8"); System.out.println("Received:"+envelope.getRoutingKey()+"========"+message); } }; //消費者在指定的對隊列上消費
channel.basicConsume(QUEUE_NAME,true,consumer); } }
3:執行結果:首先啓動消費者,再啓動發送者ide
生產者ui
消費者spa
fanout:消息能發送到全部隊列上,跟路由鍵沒有任何關係。code
fanout生產者:從新定義一個交換器,只需將交換器設置成fanout就能夠htm
//在信道中設置交換器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
fanout消費者:從新定義一個交換器和隊列,將交換器設置成fanout,綁定的key能夠隨便寫。
//在信道中設置交換器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//聲明隊列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//交換器和隊列綁定
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"abc");
結果
生產者都同樣
消費者
topic:按照*、#的匹配規則,進入到對應的隊列
topic生產者:只需將交換器設置成topic,路由鍵的設置必須是用點. 進行分割("key1.k","key2.k","key3.k")
topic消費者:從新定義一個交換器和隊列
//交換器和隊列綁定
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"key2.*");
這樣子就只能匹配key2.開頭的
結果:生產者都同樣
消費者
headers:是根據頭部的消息映射到隊列的。特殊的值x-match:all(所有匹配)、any(任何一個)。
生產者
//在信道中設置交換器 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS); //設置要發送headers值 Map<String, Object> heardersMap = new HashMap<String, Object>(); heardersMap.put("api", "login"); heardersMap.put("version", 1.0); heardersMap.put("radom", UUID.randomUUID().toString()); //設置消息的屬性 AMQP.BasicProperties pro = new AMQP.BasicProperties.Builder() .headers(heardersMap) .build(); //發送消息 for (int i=0;i<SEND_NUM;i++){ String msg = "hello rabbitmq"+i;//發送的消息 //消息進行發送 channel.basicPublish(EXCHANGE_NAME,"",pro,msg.getBytes()); System.out.println("send:"+msg); }
消費者
//在信道中設置交換器 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS); Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-match", "any"); arguments.put("api", "login"); arguments.put("version", 1.0); arguments.put("dataType", "json"); //交換器和隊列綁定 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName,EXCHANGE_NAME,"",arguments); System.out.println("waiting message....."); //聲明消費者 final Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"utf-8"); System.out.println(" [HeaderRecv] Received '" + properties.getHeaders() + "':'" + message + "'"); } };
結果:發送者同樣
消費者
以上就是Java對原生的RabbitMQ基本使用。