RabbitMQ學習:RabbitMQ的六種工做模式之簡單和工做模式(三)

上一篇:RabbitMQ學習:RabbitMQ的基本概念及RabbitMQ使用場景(二) --- http://www.javashuo.com/article/p-rcygqhuj-nw.htmljava


RabbitMQ的六種工做模式

首先開啓虛擬機上的rabbitmq服務器apache

# 啓動服務
systemctl start rabbitmq-server

1、簡單模式

RabbitMQ是一個消息中間件,你能夠想象它是一個郵局。當你把信件放到郵箱裏時,可以確信郵遞員會正確地遞送你的信件。RabbitMq就是一個郵箱、一個郵局和一個郵遞員。api

  • 發送消息的程序是生產者數組

  • 隊列就表明一個郵箱。雖然消息會流經RbbitMQ和你的應用程序,但消息只能被存儲在隊列裏。隊列存儲空間只受服務器內存和磁盤限制,它本質上是一個大的消息緩衝區。多個生產者能夠向同一個隊列發送消息,多個消費者也能夠從同一個隊列接收消息.服務器

  • 消費者等待從隊列接收消息maven

建立Rabbitmq-demo 的測試項

一、pom.xml

添加 slf4j 依賴, 和 rabbitmq amqp 依賴tcp

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.qile</groupId>
  <artifactId>rabbitmq</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  
  <dependencies>
		<dependency>
			<groupId>com.rabbitmq</groupId>
			<artifactId>amqp-client</artifactId>
			<version>5.4.3</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
			<version>1.8.0-alpha2</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>1.8.0-alpha2</version>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.8.0</version>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
				</configuration>
			</plugin>
		</plugins>
	</build>

</project>

2. 生產者發送消息--HelloWorld

package rabbitmq.simple;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Test1 {

	public static void main(String[] args) throws IOException, TimeoutException {
		
		/**
		 * 1. 創建鏈接
		 * 2. 建立隊列:helloworld
		 * 3. 向隊列發送數據
		 */
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setPort(5672);
		f.setUsername("admin");
		f.setPassword("admin");
		
		/*
		 * 與rabbitmq服務器創建鏈接,
		 * rabbitmq服務器端使用的是nio,會複用tcp鏈接,
		 * 並開闢多個信道與客戶端通訊
		 * 以減輕服務器端創建鏈接的開銷
		 */
		Connection con = f.newConnection();
		//建立通道
		Channel c = con.createChannel();
		
		/*
		 * 聲明隊列,會在rabbitmq中建立一個隊列
		 * 若是已經建立過該隊列,就不能再使用其餘參數來建立
		 * 
		 * 參數含義:
		 *   -queue:      隊列名稱
		 *   -durable:    隊列持久化,true表示RabbitMQ重啓後隊列仍存在
		 *   -exclusive:  排他,true表示限制僅當前鏈接可用
		 *   -autoDelete: 當最後一個消費者斷開後,是否刪除隊列
		 *   -arguments:  其餘參數
		 */
    	c.queueDeclare("helloworld",false,false,false,null);
		
    	/*
		 * 發佈消息
		 * 這裏把消息向默認交換機發送.
		 * 默認交換機隱含與全部隊列綁定,routing key即爲隊列名稱
		 * 
		 * 參數含義:
		 * 	-exchange: 交換機名稱,空串表示默認交換機"(AMQP default)",不能用 null 
		 * 	-routingKey: 對於默認交換機,路由鍵就是目標隊列名稱
		 * 	-props: 其餘參數,例如頭信息
		 * 	-body: 消息內容byte[]數組
		 */
    	c.basicPublish("","helloworld",null, 
				("Hello World!" + System.currentTimeMillis()).getBytes());
		System.out.println("消息已發出");
		
		c.close();
		con.close();
	}
}

這時Run as 獲得
ide

在rabbitmq客戶端有:

以後編寫消費者接受消息學習

三、消費者接收消息

package rabbitmq.simple;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

public class Test2 {

	public static void main(String[] args) throws IOException, TimeoutException {
		
		/**
		 * 1. 創建鏈接
		 * 2. 建立隊列:helloworld
		 * 3. 向隊列發送數據
		 */
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setPort(5672);
		f.setUsername("admin");
		f.setPassword("admin");
		Connection con = f.newConnection();  //建立鏈接
		Channel c = con.createChannel();     //建立通道
		
		//定義隊列,服務器沒有這個隊列會建立,如有什麼都不作
    	c.queueDeclare("helloworld",false,false,false,null);
		
    	//收到消息後用來處理消息的回調對象
    	DeliverCallback deliverCallback = new DeliverCallback() {
			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
				byte[] a = message.getBody();
				String msg = new String(a);
				System.out.println("收到" + msg);
			}
    	};
    	
    	//消費者取消時的回調對象
    	CancelCallback cancelCallback = new CancelCallback() {
			@Override
			public void handle(String consumerTag) throws IOException {
				
			}
		};
		
    	//開始消費數據
    	c.basicConsume("helloworld",true,deliverCallback,cancelCallback);
	}

}

此時,在以前所積累的兩條消息將會在你程序運轉之時,顯示出來,這是再去運轉生產者,將會直接顯示出發送的數據
測試

2、工做模式

工做隊列(即任務隊列)背後的主要思想是避免當即執行資源密集型任務,而且必須等待它完成。相反,咱們將任務安排在稍後完成。

咱們將任務封裝爲消息並將其發送到隊列。後臺運行的工做進程將獲取任務並最終執行任務。當運行多個消費者時,任務將在它們之間分發

使用任務隊列的一個優勢是可以輕鬆地並行工做。若是咱們正在積壓工做任務,咱們能夠添加更多工做進程,這樣就能夠輕鬆擴展。

一、生產者發送消息

這裏模擬耗時任務,發送的消息中,每一個點使工做進程暫停一秒鐘,例如"Hello…"將花費3秒鐘來處理

package rabbitmq.work;

import java.util.Scanner;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Test1 {
	public static void main(String[] args) throws Exception {
		/**
		 * 1. 創建鏈接
		 * 2. 建立隊列:helloworld
		 * 3. 向隊列發送數據
		 */
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setPort(5672);
		f.setUsername("admin");
		f.setPassword("admin");
		
		Connection c = f.newConnection();  //建立鏈接
		Channel ch = c.createChannel();  //建立通道
		//參數:queue,durable,exclusive,autoDelete,arguments
		ch.queueDeclare("helloworld", false,false,false,null);

		/**
		 * 模擬耗時消息
		 * 發送的字符串中,有一個點字符,消費者處理的時候就暫停1秒
		 */
		//循環輸入消息發送到rabbitmq
		while (true) {
			System.out.print("輸入消息: ");
			String msg = new Scanner(System.in).nextLine();
			//若是輸入的是"exit"則結束生產者進程
			if ("exit".equals(msg)) {
				break;
			}
			//參數:exchage,routingKey,props,body
			ch.basicPublish("", "helloworld", null, msg.getBytes());
			System.out.println("消息已發送: "+msg);
		}

		c.close();
	}
}

二、消費者接收消息

package rabbitmq.work;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

public class Test2 {
	public static void main(String[] args) throws Exception {
		
		/**
		 * 1. 創建鏈接
		 * 2. 建立隊列:helloworld
		 * 3. 向隊列發送數據
		 */
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setUsername("admin");
		f.setPassword("admin");
		Connection c = f.newConnection();  //建立鏈接
		Channel ch = c.createChannel();  //建立通道
		
		ch.queueDeclare("helloworld",false,false,false,null);
		System.out.println("等待接收數據");
		
		//收到消息後用來處理消息的回調對象
		DeliverCallback callback = new DeliverCallback() {
			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
				String msg = new String(message.getBody(), "UTF-8");
				System.out.println("收到: "+msg);

				//遍歷字符串中的字符,每一個點使進程暫停一秒
				for (int i = 0; i < msg.length(); i++) {
					if (msg.charAt(i)=='.') {
						try {
							Thread.sleep(1000);
						} catch (InterruptedException e) {
						}
					}
				}
				System.out.println("處理結束");
			}
		};
		
		//消費者取消時的回調對象
		CancelCallback cancel = new CancelCallback() {
			@Override
			public void handle(String consumerTag) throws IOException {
			}
		};
		
		ch.basicConsume("helloworld", true, callback, cancel);
	}
}

3.運行測試

運行:

  • 一個生產者
  • 兩個消費者

生產者發送多條消息
如:1,2,3,4,5,...兩個消費者分別收到:

  • 消費者一:1,3,5,...
  • 消費者二:2,4,...

rabbtimq在全部消費者中輪詢分佈消息,把消息均勻發送給全部消費者。

4.消息確認

一個消費者接收消息後,在消息沒有徹底處理完時就掛掉了,那麼這時會發生什麼呢?

就如今的代碼來講,rabbitmq把消息發送給消費者後,會當即刪除消息,那麼消費者掛掉後,它沒來得及處理的消息就會丟失

若是生產者發送如下消息:

    1…

    2

    3

    4

    5

    兩個消費者分別收到:

        消費者一: 1…, 3, 5
        消費者二: 2, 4

    當消費者一收到全部消息後,要話費7秒時間來處理第一條消息,這期間若是關閉該消費者,那麼1未處理完成,3,5則沒有被處理

咱們並不想丟失任何消息, 若是一個消費者掛掉,咱們想把它的任務消息派發給其餘消費者

爲了確保消息不會丟失,rabbitmq支持消息確認(回執)。當一個消息被消費者接收到而且執行完成後,消費者會發送一個ack (acknowledgment) 給rabbitmq服務器, 告訴他我已經執行完成了,你能夠把這條消息刪除了。

若是一個消費者沒有返回消息確認就掛掉了(信道關閉,鏈接關閉或者TCP連接丟失),rabbitmq就會明白,這個消息沒有被處理完成rabbitmq就會把這條消息從新放入隊列,若是在這時有其餘的消費者在線,那麼rabbitmq就會迅速的把這條消息傳遞給其餘的消費者,這樣就確保了沒有消息會丟失。

這裏不存在消息超時, rabbitmq只在消費者掛掉時從新分派消息, 即便消費者花很是久的時間來處理消息也能夠

手動消息確認默認是開啓的,前面的例子咱們經過autoAck=ture把它關閉了。咱們如今要把它設置爲false,而後工做進程處理完意向任務時,發送一個消息確認(回執)。

package rabbitmq.work;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

public class Test2 {
	public static void main(String[] args) throws Exception {
		
		/**
		 * 1. 創建鏈接
		 * 2. 建立隊列:helloworld
		 * 3. 向隊列發送數據
		 */
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setUsername("admin");
		f.setPassword("admin");
		Connection c = f.newConnection();  //建立鏈接
		Channel ch = c.createChannel();  //建立通道
		
		//聲明隊列
		ch.queueDeclare("helloworld",false,false,false,null);
		System.out.println("等待接收數據");
		
		//收到消息後用來處理消息的回調對象
		DeliverCallback callback = new DeliverCallback() {
			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
				String msg = new String(message.getBody(), "UTF-8");
				System.out.println("收到: "+msg);
				for (int i = 0; i < msg.length(); i++) {
					if (msg.charAt(i)=='.') {
						try {
							Thread.sleep(1000);
						} catch (InterruptedException e) {
						}
					}
				}
				System.out.println("處理結束");
				//發送回執
				ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
			}
		};
		
		//消費者取消時的回調對象
		CancelCallback cancel = new CancelCallback() {
			@Override
			public void handle(String consumerTag) throws IOException {
			}
		};
		
		//autoAck設置爲false,則須要手動確認發送回執
		ch.basicConsume("helloworld", false, callback, cancel);
	}
}

使用以上代碼,就算殺掉一個正在處理消息的工做進程也不會丟失任何消息,工做進程掛掉以後,沒有確認的消息就會被自動從新傳遞。

忘記確認(ack)是一個常見的錯誤, 這樣後果是很嚴重的, 因爲未確認的消息不會被釋放, rabbitmq會吃掉愈來愈多的內存

可使用下面命令打印工做隊列中未確認消息的數量

rabbitmqctl list_queues name messages_ready messages_unacknowledged

當處理消息時異常中斷, 能夠選擇讓消息重回隊列從新發送. nack 操做能夠是消息重回隊列, 可使用 basicNack() 方法:

// requeue爲true時重回隊列, 反之消息被丟棄或被髮送到死信隊列
c.basicNack(tag, multiple, requeue)

5.合理地分發

rabbitmq會一次把多個消息分發給消費者, 這樣可能形成有的消費者很是繁忙, 而其它消費者空閒. 而rabbitmq對此一無所知, 仍然會均勻的分發消息

咱們可使用 basicQos(1) 方法, 這告訴rabbitmq一次只向消費者發送一條消息, 在返回確認回執前, 不要向消費者發送新消息. 而是把消息發給下一個空閒的消費者

6.消息持久化

當rabbitmq關閉時, 咱們隊列中的消息仍然會丟失, 除非明確要求它不要丟失數據

要求rabbitmq不丟失數據要作以下兩點: 把隊列和消息都設置爲可持久化(durable)

隊列設置爲可持久化, 能夠在定義隊列時指定參數durable爲true

//第二個參數是持久化參數durable
ch.queueDeclare("helloworld", true, false, false, null);

因爲以前咱們已經定義過隊列"hello"是不可持久化的, 對已存在的隊列, rabbitmq不容許對其定義不一樣的參數, 不然會出錯, 因此這裏咱們定義一個不一樣名字的隊列"task_queue"

//定義一個新的隊列,名爲 task_queue
//第二個參數是持久化參數 durable
ch.queueDeclare("task_queue", true, false, false, null);

生產者和消費者代碼都要修改

這樣即便rabbitmq從新啓動, 隊列也不會丟失. 如今咱們再設置隊列中消息的持久化, 使用MessageProperties.PERSISTENT_TEXT_PLAIN參數

//第三個參數設置消息持久化
ch.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            msg.getBytes());

下面是"工做模式"最終完成的生產者和消費者代碼

7.生產者代碼

package rabbitmq.work;

import java.util.Scanner;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class Test3 {
	public static void main(String[] args) throws Exception {
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setPort(5672);
		f.setUsername("admin");
		f.setPassword("admin");
		
		Connection c = f.newConnection();
		Channel ch = c.createChannel();
		
		//第二個參數設置隊列持久化
		ch.queueDeclare("task_queue", true,false,false,null);

		while (true) {
			System.out.print("輸入消息: ");
			String msg = new Scanner(System.in).nextLine();
			if ("exit".equals(msg)) {
				break;
			}
			
			//第三個參數設置消息持久化
			ch.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes("UTF-8"));
			System.out.println("消息已發送: "+msg);
		}

		c.close();
	}
}

8.消費者代碼

package rabbitmq.work;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

public class Test4 {
	public static void main(String[] args) throws Exception {
		ConnectionFactory f = new ConnectionFactory();
		f.setHost("192.168.64.140");
		f.setUsername("admin");
		f.setPassword("admin");
		Connection c = f.newConnection();
		Channel ch = c.createChannel();
		
		//定義一個新的隊列,名爲 task_queue
		//設定第二個參數是持久化參數 durable爲true
		ch.queueDeclare("task_queue",true,false,false,null);
		
		System.out.println("等待接收數據");
		
		ch.basicQos(1); //一次只接收一條消息
		
		//收到消息後用來處理消息的回調對象
		DeliverCallback callback = new DeliverCallback() {
			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
				String msg = new String(message.getBody(), "UTF-8");
				System.out.println("收到: "+msg);
				for (int i = 0; i < msg.length(); i++) {
					if (msg.charAt(i)=='.') {
						try {
							Thread.sleep(1000);
						} catch (InterruptedException e) {
						}
					}
				}
				System.out.println("處理結束");
				//發送回執
				ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
			}
		};
		
		//消費者取消時的回調對象
		CancelCallback cancel = new CancelCallback() {
			@Override
			public void handle(String consumerTag) throws IOException {
			}
		};
		
		//autoAck設置爲false,則須要手動確認發送回執
		ch.basicConsume("task_queue", false, callback, cancel);
	}
}

9.總結

相關文章
相關標籤/搜索