import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; /** * @author yezixun * @date 2019/11/7 14:35 */ public class ConnectionUtil { public static Connection getConnection() throws Exception { //定義鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置服務地址 factory.setHost("localhost"); //端口 factory.setPort(5672); //設置帳號信息,用戶名、密碼、vhost //默認帳號爲 factory.setVirtualHost("testhost"); factory.setUsername("admin"); factory.setPassword("admin"); // 經過工廠獲取鏈接 return factory.newConnection(); } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.zhonglouguairen.util.ConnectionUtil; /** * @author yezixun * @date 2019/11/7 14:38 */ public class Send { //隊列名稱 private final static String QUEUE_NAME = "q_test_01"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); // 從鏈接中建立通道 Channel channel = connection.createChannel(); // 聲明(建立)隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 消息內容 String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); //關閉通道和鏈接 channel.close(); connection.close(); } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.zhonglouguairen.util.ConnectionUtil; /** * @author yezixun * @date 2019/11/7 14:48 */ public class Recv { //隊列名稱 private final static String QUEUE_NAME = "q_test_01"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); // 從鏈接中建立通道 Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列 channel.basicConsume(QUEUE_NAME, true, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.zhonglouguairen.util.ConnectionUtil; /** * @author yezixun * @date 2019/11/7 15:26 */ public class Send { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //發送0-99的數字 for (int i = 0; i < 100; i++) { // 消息內容 String message = "" + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); //線程沉睡 數字越大睡得越久 Thread.sleep(i * 10); } channel.close(); connection.close(); } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.zhonglouguairen.util.ConnectionUtil; /** * @author yezixun * @date 2019/11/7 15:25 */ public class Recv2 { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列 // true 自動確認:不管是否成功 都認爲成功消費 // false 手動確認:消費者成功消費後反饋,不反饋消息將一直處於不可用狀態 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); // 休眠1秒 模擬數據量 接收間隔比Recv1長 模擬性能(壓力)弱的服務器 Thread.sleep(1000); //下面這行註釋掉表示使用自動確認模式 開啓表示手動 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.zhonglouguairen.util.ConnectionUtil; /** * @author yezixun * @date 2019/11/7 15:23 */ public class Recv { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列 // true 自動確認:不管是否成功 都認爲成功消費 // false 手動確認:消費者成功消費後反饋,不反饋消息將一直處於不可用狀態 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [y] Received '" + message + "'"); //休眠 模擬數據量 接收間隔比Recv2短 模擬性能(壓力)強的服務器 Thread.sleep(10); // 返回確認狀態,註釋掉表示使用自動確認模式 開啓表示手動 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
消費者從隊列中獲取消息,服務端如何知道消息已經被消費呢?java
模式1:自動確認 只要消息從隊列中獲取,不管消費者獲取到消息後是否成功消息,都認爲是消息已經成功消費。服務器
模式2:手動確認 消費者從隊列中獲取消息後,服務器會將該消息標記爲不可用狀態,等待消費者的反饋,若是消費者一直沒有反饋,那麼該消息將一直處於不可用狀態。性能
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.zhonglouguairen.util.ConnectionUtil; /** * 發佈訂閱模式 * @author yezixun * @date 2019/11/7 15:54 */ public class Send { private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明exchange channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 消息內容 String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.zhonglouguairen.util.ConnectionUtil; /** * @author yezixun * @date 2019/11/7 15:55 */ public class Recv { private final static String QUEUE_NAME = "test_queue_work1"; private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [Recv] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.zhonglouguairen.util.ConnectionUtil; /** * @author yezixun * @date 2019/11/7 15:55 */ public class Recv2 { private final static String QUEUE_NAME = "test_queue_work2"; private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [Recv2] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.zhonglouguairen.util.ConnectionUtil; /** * 路由模式 * @author yezixun * @date 2019/11/7 16:26 */ public class Send { private final static String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 消息內容 String message = "添加"; channel.basicPublish(EXCHANGE_NAME, "select",null, message.getBytes()); //String message = "刪除"; //channel.basicPublish(EXCHANGE_NAME, "delete",null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.zhonglouguairen.util.ConnectionUtil; /** * @author yezixun * @date 2019/11/7 15:23 */ public class Recv { private final static String QUEUE_NAME = "test_queue_direct_1"; private final static String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //綁定隊列到交換機 刪除和修改 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update"); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"delete"); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1); //定義隊列消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列 // true 自動確認:不管是否成功 都認爲成功消費 // false 手動確認:消費者成功消費後反饋,不反饋消息將一直處於不可用狀態 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [y] Received '" + message + "'"); // 返回確認狀態,註釋掉表示使用自動確認模式 開啓表示手動 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.zhonglouguairen.util.ConnectionUtil; /** * @author yezixun * @date 2019/11/7 15:23 */ public class Recv2 { private final static String QUEUE_NAME = "test_queue_direct_2"; private final static String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //綁定隊列到交換機 查詢和增長 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"select"); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"insert"); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1); //定義隊列消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列 // true 自動確認:不管是否成功 都認爲成功消費 // false 手動確認:消費者成功消費後反饋,不反饋消息將一直處於不可用狀態 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [y] Received '" + message + "'"); // 返回確認狀態,註釋掉表示使用自動確認模式 開啓表示手動 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.zhonglouguairen.util.ConnectionUtil; /** * @author yezixun * @date 2019/11/7 16:45 */ public class Send { private final static String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明exchange channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 消息內容 String message = "Hello World!!"; channel.basicPublish(EXCHANGE_NAME, "routekey.", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.zhonglouguairen.util.ConnectionUtil; /** * @author yezixun * @date 2019/11/7 16:46 */ public class Recv { private final static String QUEUE_NAME = "test_queue_topic_work_1"; private final static String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routekey.*"); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [Recv_x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.zhonglouguairen.util.ConnectionUtil; /** * @author yezixun * @date 2019/11/7 16:46 */ public class Recv2 { private final static String QUEUE_NAME = "test_queue_topic_work_2"; private final static String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] argv) throws Exception { // 獲取到鏈接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*"); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [Recv2_x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
————————————————.net
版權聲明:本文中大部分爲CSDN博主「niaobirdfly」的原創 詳細信息參考原做者線程
原文連接:https://blog.csdn.net/hellozpc/article/details/814369803d