RabbitMQ必備核心知識

如今不少知名的互聯網公司都有用到RabbitMQ,其性能,可擴展性讓不少大公司青睞於使用它,不過想要徹底使用好RabbitMQ須要掌握其核心的一些概念,這裏就說說掌握RabbitMQ所需的必要知識java

生產者與消費者

生產者: 建立消息,而後發送到代理服務器(RabbitMQ)的程序web

消費者:鏈接到代理服務器,並訂閱到隊列上接收消息shell

消息流程

AMQP協議規定,AMQP消息必須有三部分,交換機,隊列和綁定。生產者把消息發送到交換機,交換機與隊列的綁定關係決定了消息如何路由到特定的隊列,最終被消費者接收。編程

如圖

Note: 消息是不能直接到達隊列(Queue)的api

交換機

消息實際上投遞到的是交換機,具體路由到那個隊列由交換機根據路由鍵(routing key)完成。bash

  • 當你發消息到代理服務器時,即使路由鍵是空的,RabbitMQ也會將其和使用的路由鍵進行匹配。若是路由的消息不匹配任何綁定模式,消息將會進入黑洞。

交換機在隊列與消息中間起到了中間層的做用,有了交換機咱們能夠實現更靈活的功能,RabbitMQ中有三種經常使用的交換機類型:服務器

  • direct: 若是路由鍵匹配,消息就投遞到對應的隊列
  • fanout:投遞消息給全部綁定在當前交換機上面的隊列
  • topic:容許實現有趣的消息通訊場景,使得5不一樣源頭的消息可以達到同一個隊列。topic隊列名稱有兩個特殊的關鍵字。
    • * 能夠替換一個單詞
    • # 能夠替換全部的單詞

能夠理解,direct爲1v1, fanout爲1v全部,topic比較靈活,能夠1v任意。併發

image.png

虛擬主機

每個虛擬主機(vhost)至關於mini版的RabbitMQ服務器,擁有本身的隊列,交換機和綁定,權限... 這使得一個RabbitMQ服務衆多的應用程序,而不會互相沖突。ide

rabbitMQ默認的虛擬主機爲: "/" ,通常咱們在建立Rabbit的用戶時會再給用戶分配一個虛擬主機。高併發

操做虛擬主機,除了命令行以外還有一個web管理頁面

#建立虛擬主機
rabbitmqctl add vhost [vhost_name]
#刪除虛擬主機
rabbitmqctl delete vhost [vhost_name]
#列出虛擬主機
rabbitmqctl list_vhosts
複製代碼

image.png

消息投遞策略

默認狀況下RabbitMQ的隊列和交換機在RabbitMQ服務器重啓以後會消失,緣由在於隊列和交換機的durable屬性,該屬性默認狀況下爲false.

能從AMQP服務器崩潰中恢復的消息稱爲持久化消息,若是想要從崩潰中恢復那麼消息必須

  • 投遞模式設置2,來標記消息爲持久化
  • 發送到持久化的交換機
  • 到到持久化的隊列

缺點:消息寫入磁盤性能差不少。除非特別關鍵的消息會使用

關鍵API

以上都是概念性的內容,實際咱們仍是要經過編程來實現咱們的目的,RabbitMQ的客戶端api提供了不少功能,經過看代碼,來了解它的強大之處。

基本步驟以前的RabbitMQ快速入門已經提過了,Channel類是關鍵的部分:包含了不少咱們想要的功能

image.png

消息確認

生成端能夠添加監聽事件:

channel.addConfirmListener(new ConfirmListener() {
			@Override
			public void handleNack(long deliveryTag, boolean multiple) throws IOException {
				System.err.println("-------no ack!-----------");
			}
			
			@Override
			public void handleAck(long deliveryTag, boolean multiple) throws IOException {
				System.err.println("-------ack!-----------");
			}
		});
複製代碼

消費端能夠確認消息狀態:

public class MyConsumer extends DefaultConsumer {


	private Channel channel ;
	
	public MyConsumer(Channel channel) {
		super(channel);
		this.channel = channel;
	}

	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.err.println("-----------consume message----------");
		System.err.println("body: " + new String(body));
		try {
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		if((Integer)properties.getHeaders().get("num") == 0) {
			channel.basicNack(envelope.getDeliveryTag(), false, true);
		} else {
			channel.basicAck(envelope.getDeliveryTag(), false);
		}
	}
}
複製代碼

channel.basicAck與basicNack最後一個參數指定消息是否重回隊列。

監聽不可達消息

咱們的消息生產者經過指定交換機和路由鍵來把消息送到隊列中,但有時候指定的路由鍵不存在,或者交換機不存在,那麼消息就會return,咱們能夠經過添加return listener來實現:

channel.addReturnListener(new ReturnListener() {
			@Override
			public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, BasicProperties properties, byte[] body) throws IOException {
				
				System.err.println("---------handle return----------");
				System.err.println("replyCode: " + replyCode);
				System.err.println("replyText: " + replyText);
				System.err.println("exchange: " + exchange);
				System.err.println("routingKey: " + routingKey);
				System.err.println("properties: " + properties);
				System.err.println("body: " + new String(body));
			}
		});
		

		channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
複製代碼

在basicPublish中的Mandatory要設置爲true纔會生效,不然broker會刪除該消息

消費端限流

假設MQ服務器上面囤積了成千上萬條的消息的時候,這個時候忽然鏈接消費端,那麼巨量的消息所有推過來,可是客戶端沒法一次性處理這麼多的數據。

在高併發的時候,瞬間產生的流量很大,消息很大,而MQ有個重要的做用就是限流,限流則是消費端作的。

RabbitMQ提供了一種Qos(服務質量保證)功能,即在非自動確認消息的前提下,在必定數量的消息未被消費前,不進行消費新的消息。

// prefetchSize消息的限制大小,通常設置爲0,在生產端限制
// prefetchCount 咱們一次最多消費多少條消息,通常設置爲1
// global,通常設置爲false,在消費端進行限制
channel.basicQos(int prefetchSize, int prefetchCount, boolean global) 

// 使用
channel.basicQos(0, 1, false);
channel.basicConsume(queueName, false, new MyConsumer(channel));    
複製代碼

Note: autoAck設置爲false, 必定要手工簽收消息

死信隊列(DLX)

當消息在隊列中變成死信,沒有消費者進行消費的時候,消息可能會被從新發布到另一個隊列中,這個隊列就是死信隊列。

如下狀況會致使消息進入死信隊列:

  • basic.reject/basic.nack 而且 requeue爲false(不重回隊列)的時候,消息就是死信

  • 消息TTL過時

  • 隊列達到最大的長度

死信隊列也是正常的Exchange,和通常的Exchange沒什麼區別,不過要作一點操做。

設置死信隊列包括:

  • 設置Exchange(dlx.exchange名稱隨意),設置Queue(dlx.queue),設置RoutingKey(#)
  • 建立正常的交換機,隊列,綁定,只不過加上一個參數 arguments.put("x-dead-letter-exchange","dlx.exchange")
// 這就是一個普通的交換機 和 隊列 以及路由
		String exchangeName = "test_dlx_exchange";
		String routingKey = "dlx.#";
		String queueName = "test_dlx_queue";
		
		channel.exchangeDeclare(exchangeName, "topic", true, false, null);
		
		Map<String, Object> agruments = new HashMap<String, Object>();
		agruments.put("x-dead-letter-exchange", "dlx.exchange");
		//這個agruments屬性,要設置到聲明隊列上
		channel.queueDeclare(queueName, true, false, false, agruments);
		channel.queueBind(queueName, exchangeName, routingKey);
		
		//要進行死信隊列的聲明:
		channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
		channel.queueDeclare("dlx.queue", true, false, false, null);
		channel.queueBind("dlx.queue", "dlx.exchange", "#");
複製代碼

最後

這裏主要講了一些使用RabbitMQ中常常涉及到的概念,懂了概念,在進行應用的時候纔不至於糊塗。而後列舉了MQ的Java客戶端重要的幾個API。

參考

  • 《RabbitMQ實戰》
  • RabbitMQ消息中間件繼續精講
相關文章
相關標籤/搜索