做爲中間件的槓把子選手,rabbimq
在系統架構中承擔着承上啓下的做用,常問到,大家爲什麼選用rabbimq?則答曰,爲了削峯填谷,爲了系統解耦合,爲了提升系統性能。但這事是絕對的嗎?用了這款軟件就能夠實現這個目的嗎?java
RabbitMQ 是一個由Erlang
語言開發的AMQP
的開源實現。rabbitMQ是一款基於AMQP協議的消息中間件,它可以在應用之間提供可靠的消息傳輸。在易用性,擴展性,高可用性上表現優秀。使用消息中間件利於應用之間的解耦,生產者(客戶端)無需知道消費者(服務端)的存在。並且兩端可使用不一樣的語言編寫,大大提供了靈活性。編程
AMQP,即Advanced Message Queuing Protocol,一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。後端
消息是貫穿這個這款中間件服務的脈絡,咱們不妨經過一條消息來推演窺探整個rabbimq
的設計思想,站在前人的肩膀上,看看這款軟件的先進設計。安全
即信息,生產者產生的數據,這些數據記錄着生產端產生的業務日誌,將會被投遞到後端進行處理。服務器
消息隊列(MQ)
全稱爲Message Queue
,是一種應用程序對應用程序的通訊方法。說人話就是,在應用之間放一個組件組件,以後應用雙方經過這個消息組件進行通訊。網絡
本次將經過消息的生命週期的回顧來窺探這款服務的細節多線程
rabbimq的每一次迭代都離不開三個核心概念,生產者producer
,消費者comsumer
,服務端broker
,隨着系統的迭代增長了集羣的概念,以及後續的容災機制。架構
消息的產生離不開生產者producer
,其發送發生很簡單,僞代碼以下app
# 消息發送方法 # messageBody 消息體 # exchangeName 交換器名稱 # routingKey 路由鍵 publishMsg(messageBody,exchangeName,routingKey){ ...... } # 消息發送 publishMsg("This is a warning log","exchange","log.warning");
至此,咱們一條消息發送成功了,但做爲一名開發人員,咱們理應知道這條消息去到哪了?到底作了什麼操做?後續將會遭遇到了什麼?異步
在生產者和消費者之間其實包括了不少內容,咱們須要把前面的圖進行更加深刻的展開,咱們一層一層撥開mq的心,你會發現,你會流淚
RabbitMQ Server
。namespace
。當多個不一樣的用戶使用同一個RabbitMQ server提供的服務時,能夠劃分出多個vhost
,每一個用戶在本身的vhost
建立exchange/queue
等,他們之間互不影響,互相獨立且隔離。publisher/consumer
和broker
之間的TCP鏈接。斷開鏈接的操做只會在client端進行,Broker
不會斷開鏈接,除非出現網絡故障或broker
服務出現問題。Connection
,在消息量大的時候創建TCP Connection
的開銷會比較大且效率也較低。Channel
是在connection
內部創建的邏輯鏈接,若是應用程序支持多線程,一般每一個thread
建立單獨的channel
進行通信,AMQP method包含了channel id幫助客戶端和message broker識別channel,因此channel之間是徹底隔離的。Channel做爲輕量級的Connection極大減小了操做系統創建TCP connection的開銷。exchange
和queue
之間的虛擬鏈接,binding
中能夠包含routing key。Binding信息被保存到exchange中的查詢表中,用於message的分發依據。看完了這些概念,我再給你們梳理一遍信息流:
當咱們的生產者端往Broker(RabbitMQ)
中推送消息,Broker
會根據其消息的標識送往不一樣的Virtual host
,而後Exchange
會根據消息的路由key
和交換器類型將消息分發到本身所屬的Queue
中去。
而後消費者端會經過Connection
中的Channel
獲取剛剛推送的消息,拉取消息進行消費。
這裏指的是交換機的類型
這裏有三種交換機類型,也就是有三種路由模式
消費者就是消息的處理端,會主動從消息隊列中拉取信息,釋放消息隊列中擠壓的資源
默認消息隊列裏的數據是按照順序被消費者拿走,例如:消費者1 去隊列中獲取奇數序列的任務,消費者1去隊列中獲取偶數序列的任務。
對於消息消費而言,消費者直接指定要消費的隊列便可,好比指定消費隊列A的數據。
須要注意的是,在消費者消費完成數據後,返回給rabbimq ACK消息,rabbimq會刪掉隊列中的該條信息。
萬物抱陽負陰,系統之間忽然加了箇中間件,提升系統複雜度的同時也增長了不少問題:
上面前三個問題其實就是對投遞模式的靈魂發問,也就是消息推送方知不知道數據已經推送,消息服務端在消息被拉取的時候有沒有偏移量記錄,消息消費端有沒有拉取確認機制。固然了校驗機制越複雜對於系統投遞性能損耗就越嚴重,可靠性越強,效率就會相應的打折扣
這個是最簡單的模式,也是效率最高的機制,相似於udp
,RabbitMQ默認發佈消息是不會返回任何結果給生產者的,因此存在發送過程當中丟失數據的風險。
AMQP事務保證RabbitMQ不只收到了消息,併成功將消息路由到了全部匹配的訂閱隊列,AMQP事務將使得生產者和RabbitMQ產生同步。
雖然事務使得生產者能夠肯定消息已經到達RabbitMQ中的對應隊列,可是卻會下降2~10倍的消息吞吐量。
開啓發送方確認模式後,消息會有一個惟一的ID,一旦消息被投遞給全部匹配的隊列後,會回調給發送方應用程序(包含消息的惟一ID),使得生產者知道消息已經安全到達隊列了。
若是消息和隊列是配置成了持久化,這個確認消息只會在隊列將消息寫入磁盤後纔會返回。若是RabbitMQ內部發生了錯誤致使這條消息丟失,那麼RabbitMQ會發送一條nack消息,固然我理解這個是不能保證的。
這種模式因爲不存在事務回滾,同時總體仍然是一個異步過程,因此更加輕量級,對服務器性能的影響很小。
那麼問題來了,rabbimq的消息是以什麼樣的形式存儲。默認條件下消息是存儲在內存中,不止是消息,Exchange路由等元數據信息實際都在內存中。
具體的元數據信息:
隊列元數據:隊列名稱和屬性
交換器元數據:交換器名稱、類型和屬性
綁定元數據:路由信息
內存的優勢是高性能,問題在於故障後沒法恢復。都已經2021年,RabbitMQ必然也支持持久化的存儲,也就是寫磁盤。
實現消息隊列持久化的建議同時知足如下三個條件
效果:
Exchange
,Bindings
和Queue
,同時經過重播持久化日誌來恢復消息消息是否爲持久化那還要看消息的持久化設置。也就是說,重啓服務以前那個queue
裏面尚未發出去的消息的話,重啓以後那隊列裏面是否是還存在原來的消息,這個就要取決於發生着在發送消息時對消息的設置了。
若是要在重啓後保持消息的持久化必須設置消息是持久化的標識。
設置消息的持久化:
channel.basicPublish("exchange.persistent", "persistent", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_test_message".getBytes());
這裏的關鍵是:MessageProperties.PERSISTENT_TEXT_PLAIN
首先看一下basicPublish
的方法:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException; void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException; void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
true
時,若是exchange
根據自身類型和消息routeKey
沒法找到一個符合條件的queue
,那麼會調用basic.return
方法將消息返回給生產者(Basic.Return + Content-Header + Content-Body)
;false
時,出現上述情形broker
會直接將消息扔掉。queue
上有消費者,則立刻將消息投遞給它,若是全部queue
都沒有消費者,直接把消息返還給生產者,不用將消息入隊列等待消費者了。
true
時,若是exchange
在將消息路由到queue(s)
時發現對於的queue
上麼有消費者,那麼這條消息不會放入隊列中。當與消息routeKey
關聯的全部queue(一個或者多個)
都沒有消費者時,該消息會經過basic.return
方法返還給生產者。這裏關鍵的是BasicProperties props
這個參數了,這裏看下BasicProperties
的定義:
public BasicProperties( String contentType,//消息類型如:text/plain String contentEncoding,//編碼 Map<String,Object> headers, Integer deliveryMode,//1:nonpersistent 2:persistent Integer priority,//優先級 String correlationId, String replyTo,//反饋隊列 String expiration,//expiration到期時間 String messageId, Date timestamp, String type, String userId, String appId, String clusterId)
這裏的deliveryMode=1
表明不持久化,deliveryMode=2
表明持久化。
上面的實現代碼使用的是MessageProperties.PERSISTENT_TEXT_PLAIN
,那麼這個又是什麼呢?
public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain", null, null, 2, 0, null, null, null, null, null, null, null null, null);
能夠看到這其實就是講deliveryMode
設置爲2的BasicProperties
的對象,爲了方便編程而出現的一個東東。
換一種實現方式:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.deliveryMode(2); AMQP.BasicProperties properties = builder.build(); channel.basicPublish("exchange.persistent", "persistent",properties, "persistent_test_message".getBytes());
設置了隊列和消息的持久化以後,當broker
服務重啓的以後,消息依舊存在。單隻設置隊列持久化,重啓以後消息會丟失;單隻設置消息的持久化,重啓以後隊列消失,既而消息也丟失。單單設置消息持久化而不設置隊列的持久化顯得毫無心義。
不設置Exchange
的持久化對消息的可靠性來講沒有什麼影響,可是一樣若是Exchange
不設置持久化,那麼當broker
服務重啓以後,Exchange
將不復存在,那麼既而發送方rabbitmq producer就沒法正常發送消息。這裏筆者建議,一樣設置Exchange
的持久化。Exchange
的持久化設置也特別簡單,方法以下:
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException; Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException; Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException; Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException; void exchangeDeclareNoWait(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException; Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;
通常只須要:channel.exchangeDeclare(exchangeName, 「direct/topic/header/fanout」, true);
即在聲明的時候講durable
字段設置爲true
便可。
若是將Queue
的持久化標識durable
設置爲true
,則表明是一個持久的隊列,那麼在服務重啓以後,也會存在,由於服務會把持久化的Queue
存放在硬盤上,當服務重啓的時候,會從新什麼以前被持久化的Queue
。
Queue
的持久化是經過durable=true
來實現的。
通常程序中這麼使用:
Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("queue.persistent.name", true, false, false, null);
關鍵的是第二個參數設置爲true
,即durable=true
.
Channel
類中queueDeclare
的完整定義以下:
/** * Declare a queue * @see com.rabbitmq.client.AMQP.Queue.Declare * @see com.rabbitmq.client.AMQP.Queue.DeclareOk * @param queue the name of the queue * @param durable true if we are declaring a durable queue (the queue will survive a server restart) * @param exclusive true if we are declaring an exclusive queue (restricted to this connection) * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) * @param arguments other properties (construction arguments) for the queue * @return a declaration-confirm method to indicate the queue was successfully declared * @throws java.io.IOException if an error is encountered */ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
參數說明:
Queue.DeclareOk queueDeclare() throws IOException; Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException; void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException; Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
其中須要說明的是queueDeclarePassive(String queue)
能夠用來檢測一個queue
是否已經存在。若是該隊列存在,則會返回true
;若是不存在,就會返回異常,可是不會建立新的隊列。
將queue
,exchange
,message
等都設置了持久化以後就能保證100%保證數據不丟失了嗎?
答案是否認的。
首先,從consumer
端來講,若是這時autoAck=true
,那麼當consumer
接收到相關消息以後,還沒來得及處理就crash
掉了,那麼這樣也算數據丟失,這種狀況也好處理,只需將autoAck
設置爲false
(方法定義以下),而後在正確處理完消息以後進行手動ack(channel.basicAck)
.
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
其次,關鍵的問題是消息在正確存入RabbitMQ以後,還須要有一段時間(這個時間很短,但不可忽視)才能存入磁盤之中,RabbitMQ並非爲每條消息都作fsync的處理,可能僅僅保存到cache中而不是物理磁盤上,在這段時間內RabbitMQ broker發生crash, 消息保存到cache可是還沒來得及落盤,那麼這些消息將會丟失。
那麼這個怎麼解決呢?
首先能夠引入RabbitMQ的mirrored-queue即鏡像隊列,這個至關於配置了副本,當master在此特殊時間內crash掉,能夠自動切換到slave,這樣有效的保障了HA, 除非整個集羣都掛掉,這樣也不能徹底的100%保障RabbitMQ不丟消息,但比沒有mirrored-queue的要好不少,不少現實生產環境下都是配置了mirrored-queue的,關於鏡像隊列的,咱們後續展開討論。還有要在producer引入事務機制或者Confirm機制來確保消息已經正確的發送至broker端,有關RabbitMQ的事務機制或者Confirm機制讀者們有興趣的話,請留言,咱們再詳細展開討論。
幸好本章節的主題是討論RabbitMQ的持久化而不是可靠性,否則就一發不可收拾了。RabbitMQ的可靠性涉及producer端的確認機制、broker端的鏡像隊列的配置以及consumer端的確認機制,要想確保消息的可靠性越高,那麼性能也會隨之而降,魚和熊掌不可兼得,關鍵在於選擇和取捨。
當RabbitMQ出現內存(默認是0.4)或者磁盤資源達到閾值時,會觸發流控機制,阻塞Producer的Connection,讓生產者不能繼續發送消息,直到內存或者磁盤資源獲得釋放。
RabbitMQ基於Erlang/OTP開發,一個消息的生命週期中,會涉及多個進程間的轉發,這些Erlang進程之間不共享內存,每一個進程都有本身獨立的內存空間,若是沒有合適的流控機制,可能會致使某個進程佔用內存過大,致使OOM。所以,要保證各個進程佔用的內容在一個合理的範圍,RabbitMQ的流控採用了一種信用證機制(Credit),爲每一個進程維護了四類鍵值對
A進程當前能夠發送給B的消息有100條,每發一次,值減1,直到爲0,A纔會被Block住。B消費消息後,會給A增長新的Credit,這樣A才能夠持續的發送消息。這裏只畫了兩個進程,多進程串聯的狀況下,這中影響也就是從底向上傳遞的
rabbimq的集羣設計起來,多是歷史緣由致使,我的感受是不夠先進,畢竟當年開發的時候,也沒有相關的業務需求推進啊
RabbitMQ 會將元數據存儲到內存上,若是是磁盤節點,還會存儲到磁盤上。
隊列A的實例實際只在一個RabbitMQ節點上,其它節點實際存儲的是指向該隊列的指針。雖然RabbitMQ的隊列實際只會在一個節點上,但元數據能夠存在各個節點上。舉個例子來講,當建立一個新的交換器時,RabbitMQ會把該信息同步到全部節點上,這個時候客戶端無論鏈接的那個RabbitMQ節點,均可以訪問到這個新的交換器,也就能找到交換器下的隊列
在《RabbitMQ實戰指南》中朱忠華老師的觀點是
Partition
到各個節點上,這樣才能真正達到線性擴容的目的。這個實際上是後續對站着學習kafka
的時候觸發的感受鏡像隊列,本質上就是副本機制
RabbitMQ本身也考慮到了咱們以前分析的單節點長時間故障沒法恢復的問題,因此RabbitMQ 2.6.0以後它也支持了鏡像隊列,除了發送消息,全部的操做實際都在主拷貝上,從拷貝實際只是個冷備(默認的狀況下全部RabbitMQ節點上都會有鏡像隊列的拷貝),若是使用消息確認模式,RabbitMQ會在主拷貝和從拷貝都安全的接受到消息時才通知生產者。
從這個結構上來看,若是從拷貝的節點掛了,實際沒有任何影響,若是主拷貝掛了,那麼會有一個重新選主的過程,這也是鏡像隊列的優勢,除非全部節點都掛了,纔會致使消息丟失。從新選主後,RabbitMQ會給消費者一個消費者取消通知(Consumer Cancellation),讓消費者重連新的主拷貝。
Confirm
消息等AMQQueue
調用的接口,完成消息的存儲和持久化工做,由Q1,Q2,Delta,Q3,Q4五個子隊列構成,在Backing中,消息的生命週期有四個狀態:
這裏以持久化消息爲例(能夠看到非持久化消息的生命週期會簡單不少),從Q1到Q4,消息實際經歷了一個RAM->DISK->RAM
這樣的過程,BackingQueue
這麼設計的目的有點相似於Linux的Swap
,當隊列負載很高時,經過將部分消息放到磁盤上來節省內存空間,當負載下降時,消息又從磁盤迴到內存中,讓整個隊列有很好的彈性。
全部對鏡像隊列主拷貝的操做,都會經過Guarented Multicasting(GM)同步到各個Salve節點,Coodinator負責組播結果的確認。GM是一種可靠的組播通訊協議,保證組組內的存活節點都收到消息。
至於說master和slave之間的關係應該是以下圖所示
GM的組播並非由Master
節點來負責通知全部Slave
的(目的是爲了不Master
壓力過大,同時避免Master
失效致使消息沒法最終Ack
),RabbitMQ把一個鏡像隊列的全部節點組成一個鏈表,由主拷貝發起,由主拷貝最終確認通知到了全部的Slave
,而中間由Slave
接力的方式進行消息傳播。從這個結構來看,消息完成整個鏡像隊列的同步耗時理論上是不低的,可是因爲RabbitMQ消息的消息確認自己是異步的模式,因此總體的吞吐量並不會受到太大影響。
鏡像隊列(副本)的引入其實就是對Rabbimq的高可用性的補充,從實際結果看,RabbitMQ完成設計目標上並不十分出色,主要緣由在於默認的模式下,RabbitMQ的隊列實例只存在在一個節點上(雖而後續也支持了鏡像隊列),既不能保證該節點崩潰的狀況下隊列還能夠繼續運行,也不能線性擴展該隊列的吞吐量。
至此咱們能夠看出rabbimq的一個發展脈絡,在古早時代,其推送消息相似udp送過去就送過去了,以後無論了,以後需求倒逼架構改進,要求有可靠性投遞引入了確認(ack)機制。隨着技術的進步,大規模節點的推廣,引入集羣的,出現副本,然而集羣的並不完美
曾經有位大佬跟我分享過學生時代有個常見的心態:錘子心態即手中有個錘子,看到啥都是釘子,都想錘一下。咱們學習了rabbimq,確實是想用起來但不是啥時候都能用,這個要具體問題具體分析