RabbitMQ如何保證消息的可達性

1、RabbitMQ簡介

AMQP,即Advanced Message Queuing Protocol,一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。前端

RabbitMQ,是一個消息代理和隊列服務器,它實現了AMQP標準協議。spring

分佈式消息隊列有不少應用場景,好比異步處理、應用解耦、流量削峯等。數據庫

一、異步處理

用戶註冊後須要發送短信和郵件,傳統作法是先將用戶信息寫入數據庫,而後發送短信、發送郵件,都完成後返回。後端

若是用到消息隊列,能夠先將用戶信息寫入數據庫,而後將註冊信息寫入消息隊列,發送短信、發送郵件或者還有其餘的業務邏輯都訂閱此消息,完成發送。緩存

二、應用解耦

仍是上面的例子,若是在一個大型分佈式網站中,用戶系統、短信系統、郵件系統可能都是獨立的系統服務。安全

這時候,在用戶註冊成功後,你能夠經過RPC遠程調用不一樣的服務接口,但更好的作法仍是經過消息隊列,訂閱本身感興趣的數據,往後就算增長或者刪減功能,主業務都不用變更。bash

三、流量削峯

通常在秒殺或者團購活動中,流量激增,應用面臨壓力過大。能夠在應用前端加入消息隊列,經過設置隊列最大長度來限制活動人數。這時候,後端服務器就能夠遊刃有餘的處理數據了。服務器

2、消息通訊

在AMQP協議中,有幾個基本概念,咱們必須先搞明白。網絡

一、Virtual host

虛擬主機,每個虛擬主機中包含全部的AMQP基本組件,用戶、隊裏、交換器等都是在虛擬主機裏面建立。典型的用法是,若是公司的多個產品只想用一個服務器,就能夠把他們劃分到不一樣的虛擬主機中,裏面的任何信息都是獨立存在,互不干擾。app

二、Connection

鏈接,應用程序和服務器之間的TCP鏈接。

三、Channel

通道,當你的應用程序和服務器鏈接以後,就會建立TCP鏈接。一旦打開了TCP鏈接,就能夠建立一個Channel通道,因此說Channel通道是一個TCP鏈接的內部邏輯單元。 這是由於,建立和銷燬TCP鏈接是比較昂貴的開銷,每一次訪問都創建新的TCP鏈接的話,不只是巨大浪費,並且還容易形成系統性能瓶頸。

四、Queue

隊列,全部的消息最終都會被送到這裏,等待着被感興趣的人取走。

五、Exchange

交換器,消息到達服務的第一站就是交換器,而後根據分發規則,匹配路由鍵,將消息放到對應隊列中。值得注意的是,交換器的類型不止一種。

  • Direct 直連交換器,只有在消息中的路由鍵和綁定關係中的鍵一致時,交換器才把消息發到相應隊列

  • Fanout 廣播交換器,只要消息被髮送到廣播交換器,它會將消息發到全部的隊列

  • Topic 主題交換器,根據路由鍵,通配規則(*和#),將消息發到相應隊列

六、Binding

綁定,交換器和隊列之間的綁定關係,綁定中就包含路由鍵,綁定信息被保存到交換器的查詢表中,交換器根據它分發消息。

瞭解到這些組件相關概念後,咱們總結一下來看看,一條消息在RabbitMQ中是如何流轉的。

3、持久化和發送方確認

一、持久化

事實上,上圖所示只是一個最基本的消息流轉過程,交換器和隊列這些組件還有一個比較重要的屬性:持久化。

默認狀況下,重啓RabbitMQ服務器以後,咱們建立的交換器和隊列都會消失不見,固然了,若是裏面還有將來得及消費的數據,也將難於倖免。 持久化交換器和隊列,爲的是在AMQP服務器重啓以後,從新建立它們並綁定關係,在RabbitMQ中,設置durable屬性爲true便可。

不過,除了這些還不夠。雖然保證了交換器和隊列是安全的,但那些還將來得及消費的數據就變得朝不保夕。因此,咱們還要設置消息的投遞模式爲持久的。

這樣,若是RabbitMQ服務器重啓的話,咱們的策略和相關數據纔會確保無憂。因此,咱們說能從AMQP服務器崩潰中恢復的消息,稱之爲持久化消息。那麼,它必須保證如下三點:

  • 設置投遞模式爲持久的
  • 交換器爲持久的
  • 隊列爲持久的

二、發送方確認

到目前爲止,咱們已經保證了消息的安全性。可是,還有另一個問題。因爲發佈操做是不返回任何信息給生產者的,咱們怎麼知道服務器是否正確接收消息並持久化到硬盤上了呢?

爲此,咱們能夠將通道設置爲事務模式。事務是AMQP標準中的一部分,但RabbitMQ有更好的作法,那就是發送方確認模式,publisher confirm。若是設置了confirm模式,發佈的消息會被分配一個惟一的ID號,等消息被投遞給匹配的隊列後,通道會發送一個發送方確認模式給生產者(包含消息的惟一ID)。

4、與Spring整合實例

廢話了這麼多,只是爲了下面的代碼部分作下鋪墊。畢竟,瞭解到上面內容以後,代碼其實已經快要躍然紙上了。

一、配置文件

配置文件中咱們首先要聲明RabbitMQ服務器的信息,IP地址、端口號、用戶名密碼等,但尤其重要的是,設置發佈確認模式。

<bean id="rabbitConnectionFactory"
	class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
	<constructor-arg value="127.0.0.1"/>
	<property name="username" value="shiqizhen"/>
	<property name="password" value="shiqizhen"/>
	<property name="port" value="5672"></property>
	<property name="virtualHost" value="shiqizhen"></property>
	<property name="publisherConfirms" value="true"></property>
	<property name="publisherReturns" value="true"></property>
</bean>
複製代碼

接着,還要聲明交換器和隊列,記得它們是持久化的哦,durable爲true。

<rabbit:admin connection-factory="rabbitConnectionFactory"/>
    
//隊列的名字、持久化、不要自動刪除、不是獨享隊列
<rabbit:queue name="userInfoQueue" durable="true" auto-delete="false" exclusive="false"/>
//交換器,類型爲direct。並綁定交換器和隊列的關係,路由鍵爲10086
<rabbit:direct-exchange name="user-exchange" durable="true" auto-delete="false">
	<rabbit:bindings>
		<rabbit:binding queue="userInfoQueue" key="10086"/>
	</rabbit:bindings>
</rabbit:direct-exchange>
複製代碼

最後,配置消費者和消息模板

//配置消費者 ref爲bean的引用 queues指明瞭消費者與隊列的關係
//重要的是acknowledge 確認模式爲手動確認
<rabbit:listener-container connection-factory="rabbitConnectionFactory" acknowledge="manual">
        <rabbit:listener ref="consumerListener" queues="userInfoQueue" method="onMessage" />
</rabbit:listener-container>

//配置Spring RabbitMQ消息模板 
<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
	<constructor-arg ref="rabbitConnectionFactory"></constructor-arg>
	<property name="confirmCallback" ref="publisherConfirm"></property>
	<property name="returnCallback" ref="returnMsgCallBack"></property>
	<property name="mandatory" value="true"></property>
</bean>
複製代碼

二、生產者

上面咱們聲明瞭rabbitTemplate,直接用它的send方法發送消息便可。不過它有幾個參數必須先要了解下。

  • exchange 交換器名稱,消息發到哪一個交換器上
  • routingKey 路由鍵,交換器怎樣分發消息到對應隊列
  • Message 消息體對象,它包含消息的主體和消息屬性。消息屬性包含不少附屬信息,好比消息內容類型、消息ID、用戶ID等。
  • CorrelationData 消息相關數據,實際它只有一個ID的屬性。不過很重要,在發佈方確認的回調方法裏,會帶有這個參數。咱們能夠根據它很直觀的看到哪條消息發送成功或失敗。
@Controller
public class IndexController {
	
	@Autowired
	RabbitTemplate rabbitTemplate;

	@RequestMapping("/send_msg")
	@ResponseBody
	public User send_msg() {	
		String exchange = "user-exchange";
		String routingKey = "10086";
		
		User user = new User();
		String id = IdUtil.getId();
		user.setUid(id);
		user.setUsername("小小沙彌");
		user.setPassword("1234");
		user.setCreatetime(DateUtil.getDateTime(new Date()));
		
		CorrelationData correlation = new CorrelationData(id);
		Message message = new Message(JSONObject.toJSONBytes(user, SerializerFeature.WriteNullStringAsEmpty), new MessageProperties());
		logger.info("已發送消息到RabbitMQ服務器:{}",JSONObject.toJSONString(user));
		rabbitTemplate.send(exchange, routingKey,message,correlation);	
		return user;
	}
}
複製代碼

三、消費者

消費者就是上面咱們配置的listener ref引用的Bean。還記得咱們把確認模式設置了手動確認,因此在消費者端有個很重要的動做,就是確認消息。

  • channel.basicAck(deliveryTag, false) 第一個參數是RabbitMQ內部產生的消息ID,第二個參數表明是否批量確認消息。經過這個指令咱們告訴生產者端,消息已經被正確消費了,RabbitMQ就會將此消息在磁盤上刪除。
  • channel.basicReject(deliveryTag, false) 拒絕消息。若是消費到的消息不是咱們想要的,或者處理的時候報錯,咱們能夠將消息拒絕。但值得注意的是第二個參數。若是設置爲false,說明拒絕消息並將消息從服務器上刪除;若是設置爲true,說明拒絕消息並將消息從新放回隊列。若是你的消費者只有一個,最好不要把它設置爲true,不然消息會一直重試,直到把消費者端服務器搞死。若是由於處理失敗而拒絕的話,最好將消息刪除,同時將消息記錄到日誌文件或者數據庫中。
@Service
public class ConsumerListener implements ChannelAwareMessageListener{

	Logger logger = LoggerFactory.getLogger(this.getClass());
	
	public void onMessage(Message message, Channel channel) throws Exception {
		
		logger.info("消費者監聽到RabbitMQ消息...");
		MessageProperties properties = message.getMessageProperties();
		String msg = new String(message.getBody(),"utf-8");
		logger.info("交換器:{},路由鍵:{}",properties.getReceivedExchange(),properties.getReceivedRoutingKey());
		logger.info("消息內容:{}",msg);	
		long deliveryTag = properties.getDeliveryTag();
		channel.basicAck(deliveryTag, false);//確認信息,false爲不批量確認
		//channel.basicReject(deliveryTag, true);//true爲重入隊列 false爲刪除消息
	}
}
複製代碼

四、發送方確認

咱們發送消息給RabbitMQ,第一站就是交換器。RabbitMQ是否能正確接收消息,咱們就靠它來反饋。這裏的CorrelationData就是在生產者端設置的,咱們能夠將它當成消息ID,也能夠直接把消息寫入這裏。

@Component
public class PublisherConfirm implements ConfirmCallback{

	Logger logger = LoggerFactory.getLogger(this.getClass());
	
	public void confirm(CorrelationData correlationData, boolean ack, String cause) {
		if (ack) {
			logger.info("消息投遞成功!");
		}else {
			logger.warn("消息投遞失敗,緣由:{},消息ID:{}",cause,correlationData.getId());
		}
	}
}
複製代碼

若是咱們把交換器的名字寫錯,那麼在這裏,你就會獲得如下信息:

22:57:51,635  WARN PublisherConfirm:19 - 消息投遞失敗,緣由:
channel error; protocol method: #method<channel.close>
(reply-code=404, reply-text=NOT_FOUND - no exchange 'user-exchange_xxx' in vhost 'shiqizhen', class-id=60, method-id=40),

消息ID:516387069669408768
  22:57:51,638 ERROR CachingConnectionFactory:1278 - Channel shutdown: 
channel error; protocol method:
 #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'user-exchange_xxx' in vhost 'shiqizhen', class-id=60, method-id=40)
複製代碼

五、返回回調

除了設置RabbitMQ的發送方確認,在Spring中還有一個publisherReturns值的咱們注意。雖然咱們將消息發送到了交換器,但交換器是否能正確將消息分發到對應隊列,還要打個問號。若是消息沒法發送到指定的隊列,那麼publisherReturns就會發揮做用。記住,若是想應用這個特性,須要將mandatory設置爲true。

@Component
public class ReturnMsgCallBack implements ReturnCallback{

	Logger logger = LoggerFactory.getLogger(this.getClass());
	
	public void returnedMessage(Message message, int replyCode, 
					String replyText, String exchange, String routingKey) {
		logger.info("消息內容:{}",new String(message.getBody()));
		logger.info("回覆文本:{},回覆代碼:{}",replyText,replyCode);
		logger.info("交換器名稱:{},路由鍵:{}",exchange,routingKey);	
	}
}
複製代碼

若是咱們不當心寫錯了路由鍵的名字,那就會調用到這裏。

23:24:27,813  INFO ReturnMsgCallBack:16 - 消息內容:{"createtime":"2018-11-25 23:24:24","password":"1234","role":null,"uid":"516393749815754752","username":"小小沙彌"}
23:24:27,814  INFO ReturnMsgCallBack:17 - 回覆文本:NO_ROUTE,回覆代碼:312
23:24:27,814  INFO ReturnMsgCallBack:18 - 交換器名稱:user-exchange,路由鍵:10086_xxx

//這裏是發送方確認打印的信息 說投遞到交換器成功
23:24:27,814  INFO PublisherConfirm:17 - 消息投遞成功!
複製代碼

有個問題,如同第一個例子,若是寫錯了路由鍵的名稱,發送方確認會打印ack爲false的異常信息,但爲何不會調用到publisherReturns呢?

若是路由鍵錯誤,說明消息壓根就沒有被接收到。這確定是一個嚴重錯誤,因此RabbitMQ直接把當前通道關閉了。

Channel shutdown: 
channel error; protocol method:
reply-code=404, reply-text=NOT_FOUND - no exchange 'user-exchange_xxx' in vhost ...
複製代碼

5、監聽RabbitMQ服務器狀態

若是你的RabbitMQ服務不是一個集羣,那麼當網絡故障或其餘緣由致使RabbitMQ服務停掉的時候,咱們怎麼作呢?固然,你能夠在Send方法中加入try/catch,根據catch信息返回你的狀態。但有個更好的思路,能夠結合使用。 在建立RabbitMQ服務鏈接的時候,咱們要配置一個Bean,CachingConnectionFactory它有個方法addConnectionListener,咱們能夠利用它來監聽服務器的鏈接狀態。

public class RabbitMQConnectionListener implements ConnectionListener{
	public void onCreate(Connection connection) {
		System.out.println("服務器已啓動...");
	}
	public void onClose(Connection connection) {
		System.out.println("服務器已關閉...");
	}
}
複製代碼

並在合適的位置,好比Spring容器初始化方法裏,加入這麼一句rabbitConnectionFactory.addConnectionListener(new RabbitMQConnectionListener());

這樣,咱們就能夠掌握RabbitMQ服務器的鏈接狀態了,那麼咱們就能夠根據此狀態,在生產者方調用send方法的時候,判斷此狀態。若是未鏈接,能夠先將消息保存到數據庫或者緩存中。當鏈接到RabbitMQ,咱們先把緩存的消息拿出來發送,再將此狀態重置爲已鏈接。

6、總結

本文簡單介紹了AMQP協議標準中的相關概念,以及RabbitMQ在Spring中如何正確配置使用持久化消息、發送方模式和返回回調等機制。並在最後,介紹了在Spring中如何監聽RabbitMQ的服務器鏈接狀態。總而言之一句話,咱們將要怎樣使用RabbitMQ,才能保證消息不會丟失。但願本文對你使用RabbitMQ有所幫助!

相關文章
相關標籤/搜索