模型:java
獲取 MQ 鏈接ide
public static Connection getConnection() throws IOException, TimeoutException { // 定義一個鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); // 設置服務地址 factory.setHost("127.0.0.1"); // AMQP 5672 factory.setPort(5672); // vhost factory.setVirtualHost("/vhost_ljf"); // 用戶名 factory.setUsername("ljf"); // 密碼 factory.setPassword("123456"); return factory.newConnection(); }
生產者生產消息fetch
public class Send { private static final String QUEUE_NAME = "test_simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 獲取一個鏈接 Connection connection = ConnectionUtils.getConnection(); // 從鏈接中獲取一個通道 Channel channel = connection.createChannel(); // 建立隊列聲明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String msg = "hello simple!"; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("--send msg: " + msg); channel.close(); connection.close(); } }
消費者接收消息spa
public class Recv { private static final String QUEUE_NAME = "test_simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 獲取鏈接 Connection connection = ConnectionUtils.getConnection(); // 建立通道 Channel channel = connection.createChannel(); // 隊列聲明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義消費者 DefaultConsumer consumer = new DefaultConsumer(channel) { // 獲取到達的消息 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("recv: " + msg); } }; // 監聽隊列 channel.basicConsume(QUEUE_NAME, true, consumer); } }
簡單隊列的不足code
耦合性高,生產者一一對應消費者(若是我想有多個消費者消費隊列中的消息,這時候就不行了);blog
隊列名變動,這時候得同時變動。rabbitmq
模型隊列
爲何會出現工做隊列?ip
simple 隊列是一一對應的,並且咱們實際開發,生產者發送消息是絕不費力的,而消費者通常是要跟業務相結合的,消費者接收到消息以後就須要處理,可能須要花費時間,這時候隊列就會積壓了不少消息。內存
生產者
/** * |----C1 * P----Queue----| * |----C2 */ public class Send { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException ,InterruptedException{ // 獲取鏈接 Connection connection = ConnectionUtils.getConnection(); // 獲取 channel Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 50; i++) { String msg = "hello" + i; System.out.println("[WQ] send: " + msg); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); Thread.sleep(i*20); } channel.close(); connection.close(); } }
消費者
public class Recv1 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 獲取鏈接 Connection connection = ConnectionUtils.getConnection(); // 獲取 channel Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義一個消費者 Consumer consumer = new DefaultConsumer(channel) { // 消息到達 觸發方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("[1] Recv msg: " + msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[1] done."); } } }; boolean autoAck = true; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
public class Recv2 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 獲取鏈接 Connection connection = ConnectionUtils.getConnection(); // 獲取 channel Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義一個消費者 Consumer consumer = new DefaultConsumer(channel) { // 消息到達 觸發方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("[2] Recv msg: " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[2] done."); } } }; boolean autoAck = true; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
現象
先運行消費者1和消費者2,再運行生產者
消費者1 和 消費者2 處理的消息數量是同樣多的。
消費者1:偶數
消費者2:奇數
這種方式叫作輪詢分發(round-robin),結果就是無論誰忙誰悠閒,都不會多給一個消息。
生產者
public class Send { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException ,InterruptedException{ // 獲取鏈接 Connection connection = ConnectionUtils.getConnection(); // 獲取 channel Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); /** * 每一個消費者:發送確認消息以前,消息隊列不發送下一個消息到消費者,一次只處理一個消息 */ int prefetchCount = 1; channel.basicQos(prefetchCount); for (int i = 0; i < 50; i++) { String msg = "hello" + i; System.out.println("[WQ] send: " + msg); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); Thread.sleep(i*5); } channel.close(); connection.close(); } }
消費者
public class Recv1 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 獲取鏈接 Connection connection = ConnectionUtils.getConnection(); // 獲取 channel final Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); // 保證一次只發送一個 // 定義一個消費者 Consumer consumer = new DefaultConsumer(channel) { // 消息到達 觸發方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("[1] Recv msg: " + msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[1] done."); // 手動回執 channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; // 自動應答 false channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
public class Recv2 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { // 獲取鏈接 Connection connection = ConnectionUtils.getConnection(); // 獲取 channel final Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); // 保證一次只發送一個 // 定義一個消費者 Consumer consumer = new DefaultConsumer(channel) { // 消息到達 觸發方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("[2] Recv msg: " + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[2] done."); // 手動回執 channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; // 自動應答 false channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
現象
消費者2 處理的消息比 消費者1 多,能者多勞。
boolean autoAck = false; // 自動應答 false channel.basicConsume(QUEUE_NAME, autoAck, consumer);
boolean autoAck = true;
(自動確認模式)一旦 rabbitmq 將消息分發給消費者,就會從內存中刪除;
這種狀況下,若是殺死正在執行的消費者,就會丟失正在處理的消息。
boolean autoAck = false;
(手動模式)若是一個消費者掛掉,就會交付給其餘消費者;
rabbitmq 支持消息應答,消費者發送一個消息應答,告訴 rabbitmq 這個消息我已經處理完成,能夠刪掉,而後 rabbitmq 就刪除內存中的消息。
消息應答默認是打開的,即爲 false;
若是 rabbitmq 掛了,消息任然會丟失。
// 聲明隊列 boolean durable = true; channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
注意:rabbitmq 不容許從新定義(不一樣參數)一個已存在的隊列