上一篇:RabbitMQ學習:RabbitMQ的基本概念及RabbitMQ使用場景(二) --- http://www.javashuo.com/article/p-rcygqhuj-nw.htmljava
RabbitMQ的六種工做模式
首先開啓虛擬機上的rabbitmq服務器apache
# 啓動服務 systemctl start rabbitmq-server
1、簡單模式
RabbitMQ是一個消息中間件,你能夠想象它是一個郵局。當你把信件放到郵箱裏時,可以確信郵遞員會正確地遞送你的信件。RabbitMq就是一個郵箱、一個郵局和一個郵遞員。api
-
發送消息的程序是生產者數組
-
隊列就表明一個郵箱。雖然消息會流經RbbitMQ和你的應用程序,但消息只能被存儲在隊列裏。隊列存儲空間只受服務器內存和磁盤限制,它本質上是一個大的消息緩衝區。多個生產者能夠向同一個隊列發送消息,多個消費者也能夠從同一個隊列接收消息.服務器
-
消費者等待從隊列接收消息maven
建立Rabbitmq-demo 的測試項
一、pom.xml
添加 slf4j 依賴, 和 rabbitmq amqp 依賴tcp
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.qile</groupId> <artifactId>rabbitmq</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.4.3</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.8.0-alpha2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.8.0-alpha2</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> </project>
2. 生產者發送消息--HelloWorld
package rabbitmq.simple; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Test1 { public static void main(String[] args) throws IOException, TimeoutException { /** * 1. 創建鏈接 * 2. 建立隊列:helloworld * 3. 向隊列發送數據 */ ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); /* * 與rabbitmq服務器創建鏈接, * rabbitmq服務器端使用的是nio,會複用tcp鏈接, * 並開闢多個信道與客戶端通訊 * 以減輕服務器端創建鏈接的開銷 */ Connection con = f.newConnection(); //建立通道 Channel c = con.createChannel(); /* * 聲明隊列,會在rabbitmq中建立一個隊列 * 若是已經建立過該隊列,就不能再使用其餘參數來建立 * * 參數含義: * -queue: 隊列名稱 * -durable: 隊列持久化,true表示RabbitMQ重啓後隊列仍存在 * -exclusive: 排他,true表示限制僅當前鏈接可用 * -autoDelete: 當最後一個消費者斷開後,是否刪除隊列 * -arguments: 其餘參數 */ c.queueDeclare("helloworld",false,false,false,null); /* * 發佈消息 * 這裏把消息向默認交換機發送. * 默認交換機隱含與全部隊列綁定,routing key即爲隊列名稱 * * 參數含義: * -exchange: 交換機名稱,空串表示默認交換機"(AMQP default)",不能用 null * -routingKey: 對於默認交換機,路由鍵就是目標隊列名稱 * -props: 其餘參數,例如頭信息 * -body: 消息內容byte[]數組 */ c.basicPublish("","helloworld",null, ("Hello World!" + System.currentTimeMillis()).getBytes()); System.out.println("消息已發出"); c.close(); con.close(); } }
這時Run as 獲得
ide
在rabbitmq客戶端有:
以後編寫消費者接受消息學習
三、消費者接收消息
package rabbitmq.simple; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import com.rabbitmq.client.Delivery; public class Test2 { public static void main(String[] args) throws IOException, TimeoutException { /** * 1. 創建鏈接 * 2. 建立隊列:helloworld * 3. 向隊列發送數據 */ ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); Connection con = f.newConnection(); //建立鏈接 Channel c = con.createChannel(); //建立通道 //定義隊列,服務器沒有這個隊列會建立,如有什麼都不作 c.queueDeclare("helloworld",false,false,false,null); //收到消息後用來處理消息的回調對象 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { byte[] a = message.getBody(); String msg = new String(a); System.out.println("收到" + msg); } }; //消費者取消時的回調對象 CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; //開始消費數據 c.basicConsume("helloworld",true,deliverCallback,cancelCallback); } }
此時,在以前所積累的兩條消息將會在你程序運轉之時,顯示出來,這是再去運轉生產者,將會直接顯示出發送的數據
測試
2、工做模式
工做隊列(即任務隊列)背後的主要思想是避免當即執行資源密集型任務,而且必須等待它完成。相反,咱們將任務安排在稍後完成。
咱們將任務封裝爲消息並將其發送到隊列。後臺運行的工做進程將獲取任務並最終執行任務。當運行多個消費者時,任務將在它們之間分發。
使用任務隊列的一個優勢是可以輕鬆地並行工做。若是咱們正在積壓工做任務,咱們能夠添加更多工做進程,這樣就能夠輕鬆擴展。
一、生產者發送消息
這裏模擬耗時任務,發送的消息中,每一個點使工做進程暫停一秒鐘,例如"Hello…"將花費3秒鐘來處理
package rabbitmq.work; import java.util.Scanner; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Test1 { public static void main(String[] args) throws Exception { /** * 1. 創建鏈接 * 2. 建立隊列:helloworld * 3. 向隊列發送數據 */ ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); Connection c = f.newConnection(); //建立鏈接 Channel ch = c.createChannel(); //建立通道 //參數:queue,durable,exclusive,autoDelete,arguments ch.queueDeclare("helloworld", false,false,false,null); /** * 模擬耗時消息 * 發送的字符串中,有一個點字符,消費者處理的時候就暫停1秒 */ //循環輸入消息發送到rabbitmq while (true) { System.out.print("輸入消息: "); String msg = new Scanner(System.in).nextLine(); //若是輸入的是"exit"則結束生產者進程 if ("exit".equals(msg)) { break; } //參數:exchage,routingKey,props,body ch.basicPublish("", "helloworld", null, msg.getBytes()); System.out.println("消息已發送: "+msg); } c.close(); } }
二、消費者接收消息
package rabbitmq.work; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import com.rabbitmq.client.Delivery; public class Test2 { public static void main(String[] args) throws Exception { /** * 1. 創建鏈接 * 2. 建立隊列:helloworld * 3. 向隊列發送數據 */ ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setUsername("admin"); f.setPassword("admin"); Connection c = f.newConnection(); //建立鏈接 Channel ch = c.createChannel(); //建立通道 ch.queueDeclare("helloworld",false,false,false,null); System.out.println("等待接收數據"); //收到消息後用來處理消息的回調對象 DeliverCallback callback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { String msg = new String(message.getBody(), "UTF-8"); System.out.println("收到: "+msg); //遍歷字符串中的字符,每一個點使進程暫停一秒 for (int i = 0; i < msg.length(); i++) { if (msg.charAt(i)=='.') { try { Thread.sleep(1000); } catch (InterruptedException e) { } } } System.out.println("處理結束"); } }; //消費者取消時的回調對象 CancelCallback cancel = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; ch.basicConsume("helloworld", true, callback, cancel); } }
3.運行測試
運行:
- 一個生產者
- 兩個消費者
生產者發送多條消息
如:1,2,3,4,5,...兩個消費者分別收到:
- 消費者一:1,3,5,...
- 消費者二:2,4,...
rabbtimq在全部消費者中輪詢分佈消息,把消息均勻發送給全部消費者。
4.消息確認
一個消費者接收消息後,在消息沒有徹底處理完時就掛掉了,那麼這時會發生什麼呢?
就如今的代碼來講,rabbitmq把消息發送給消費者後,會當即刪除消息,那麼消費者掛掉後,它沒來得及處理的消息就會丟失
若是生產者發送如下消息: 1… 2 3 4 5 兩個消費者分別收到: 消費者一: 1…, 3, 5 消費者二: 2, 4 當消費者一收到全部消息後,要話費7秒時間來處理第一條消息,這期間若是關閉該消費者,那麼1未處理完成,3,5則沒有被處理
咱們並不想丟失任何消息, 若是一個消費者掛掉,咱們想把它的任務消息派發給其餘消費者
爲了確保消息不會丟失,rabbitmq支持消息確認(回執)。當一個消息被消費者接收到而且執行完成後,消費者會發送一個ack (acknowledgment) 給rabbitmq服務器, 告訴他我已經執行完成了,你能夠把這條消息刪除了。
若是一個消費者沒有返回消息確認就掛掉了(信道關閉,鏈接關閉或者TCP連接丟失),rabbitmq就會明白,這個消息沒有被處理完成,rabbitmq就會把這條消息從新放入隊列,若是在這時有其餘的消費者在線,那麼rabbitmq就會迅速的把這條消息傳遞給其餘的消費者,這樣就確保了沒有消息會丟失。
這裏不存在消息超時, rabbitmq只在消費者掛掉時從新分派消息, 即便消費者花很是久的時間來處理消息也能夠
手動消息確認默認是開啓的,前面的例子咱們經過autoAck=ture把它關閉了。咱們如今要把它設置爲false,而後工做進程處理完意向任務時,發送一個消息確認(回執)。
package rabbitmq.work; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import com.rabbitmq.client.Delivery; public class Test2 { public static void main(String[] args) throws Exception { /** * 1. 創建鏈接 * 2. 建立隊列:helloworld * 3. 向隊列發送數據 */ ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setUsername("admin"); f.setPassword("admin"); Connection c = f.newConnection(); //建立鏈接 Channel ch = c.createChannel(); //建立通道 //聲明隊列 ch.queueDeclare("helloworld",false,false,false,null); System.out.println("等待接收數據"); //收到消息後用來處理消息的回調對象 DeliverCallback callback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { String msg = new String(message.getBody(), "UTF-8"); System.out.println("收到: "+msg); for (int i = 0; i < msg.length(); i++) { if (msg.charAt(i)=='.') { try { Thread.sleep(1000); } catch (InterruptedException e) { } } } System.out.println("處理結束"); //發送回執 ch.basicAck(message.getEnvelope().getDeliveryTag(), false); } }; //消費者取消時的回調對象 CancelCallback cancel = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; //autoAck設置爲false,則須要手動確認發送回執 ch.basicConsume("helloworld", false, callback, cancel); } }
使用以上代碼,就算殺掉一個正在處理消息的工做進程也不會丟失任何消息,工做進程掛掉以後,沒有確認的消息就會被自動從新傳遞。
忘記確認(ack)是一個常見的錯誤, 這樣後果是很嚴重的, 因爲未確認的消息不會被釋放, rabbitmq會吃掉愈來愈多的內存
可使用下面命令打印工做隊列中未確認消息的數量
rabbitmqctl list_queues name messages_ready messages_unacknowledged
當處理消息時異常中斷, 能夠選擇讓消息重回隊列從新發送. nack 操做能夠是消息重回隊列, 可使用 basicNack() 方法:
// requeue爲true時重回隊列, 反之消息被丟棄或被髮送到死信隊列 c.basicNack(tag, multiple, requeue)
5.合理地分發
rabbitmq會一次把多個消息分發給消費者, 這樣可能形成有的消費者很是繁忙, 而其它消費者空閒. 而rabbitmq對此一無所知, 仍然會均勻的分發消息
咱們可使用 basicQos(1) 方法, 這告訴rabbitmq一次只向消費者發送一條消息, 在返回確認回執前, 不要向消費者發送新消息. 而是把消息發給下一個空閒的消費者
6.消息持久化
當rabbitmq關閉時, 咱們隊列中的消息仍然會丟失, 除非明確要求它不要丟失數據
要求rabbitmq不丟失數據要作以下兩點: 把隊列和消息都設置爲可持久化(durable)
隊列設置爲可持久化, 能夠在定義隊列時指定參數durable爲true
//第二個參數是持久化參數durable ch.queueDeclare("helloworld", true, false, false, null);
因爲以前咱們已經定義過隊列"hello"是不可持久化的, 對已存在的隊列, rabbitmq不容許對其定義不一樣的參數, 不然會出錯, 因此這裏咱們定義一個不一樣名字的隊列"task_queue"
//定義一個新的隊列,名爲 task_queue //第二個參數是持久化參數 durable ch.queueDeclare("task_queue", true, false, false, null);
生產者和消費者代碼都要修改
這樣即便rabbitmq從新啓動, 隊列也不會丟失. 如今咱們再設置隊列中消息的持久化, 使用MessageProperties.PERSISTENT_TEXT_PLAIN
參數
//第三個參數設置消息持久化 ch.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
下面是"工做模式"最終完成的生產者和消費者代碼
7.生產者代碼
package rabbitmq.work; import java.util.Scanner; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class Test3 { public static void main(String[] args) throws Exception { ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); Connection c = f.newConnection(); Channel ch = c.createChannel(); //第二個參數設置隊列持久化 ch.queueDeclare("task_queue", true,false,false,null); while (true) { System.out.print("輸入消息: "); String msg = new Scanner(System.in).nextLine(); if ("exit".equals(msg)) { break; } //第三個參數設置消息持久化 ch.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes("UTF-8")); System.out.println("消息已發送: "+msg); } c.close(); } }
8.消費者代碼
package rabbitmq.work; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import com.rabbitmq.client.Delivery; public class Test4 { public static void main(String[] args) throws Exception { ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setUsername("admin"); f.setPassword("admin"); Connection c = f.newConnection(); Channel ch = c.createChannel(); //定義一個新的隊列,名爲 task_queue //設定第二個參數是持久化參數 durable爲true ch.queueDeclare("task_queue",true,false,false,null); System.out.println("等待接收數據"); ch.basicQos(1); //一次只接收一條消息 //收到消息後用來處理消息的回調對象 DeliverCallback callback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { String msg = new String(message.getBody(), "UTF-8"); System.out.println("收到: "+msg); for (int i = 0; i < msg.length(); i++) { if (msg.charAt(i)=='.') { try { Thread.sleep(1000); } catch (InterruptedException e) { } } } System.out.println("處理結束"); //發送回執 ch.basicAck(message.getEnvelope().getDeliveryTag(), false); } }; //消費者取消時的回調對象 CancelCallback cancel = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; //autoAck設置爲false,則須要手動確認發送回執 ch.basicConsume("task_queue", false, callback, cancel); } }