exchanger分發消息一共有4種方式:headers,direct,fanout,topic;headers方式採用header匹配方式,同direct,性能不好,因此不被常用。java
demo: 參考服務器
引入依賴 <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.1.0</version> </dependency>
生產者:架構
package org.study.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //設置 RabbitMQ 地址 factory.setHost("localhost"); //創建到代理服務器到鏈接 Connection conn = factory.newConnection(); //得到信道 Channel channel = conn.createChannel(); //聲明交換器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); String routingKey = "hola"; //發佈消息 byte[] messageBodyBytes = "quit".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes); channel.close(); conn.close(); } }
消費者ide
package org.study.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //創建到代理服務器到鏈接 Connection conn = factory.newConnection(); //得到信道 final Channel channel = conn.createChannel(); //聲明交換器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); //聲明隊列 String queueName = channel.queueDeclare().getQueue(); String routingKey = "hola"; //綁定隊列,經過鍵 hola 將隊列和交換器綁定起來 channel.queueBind(queueName, exchangeName, routingKey); while(true) { //消費消息 boolean autoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消費的路由鍵:" + routingKey); System.out.println("消費的內容類型:" + contentType); long deliveryTag = envelope.getDeliveryTag(); //確認消息 channel.basicAck(deliveryTag, false); System.out.println("消費的消息體內容:"); String bodyStr = new String(body, "UTF-8"); System.out.println(bodyStr); } }); } } }