RabbitTMQ實戰 高效部署分佈式消息隊列筆記

1、概念部分

1.各類常見MQ比較

ActiveMQ性能和穩定性較差 適用於中小型企業
kaffka不支持事務,對消息的重複、丟失、錯誤沒有嚴格要求 性能最高 內存存儲
RocketMQ java開發的,商業版支持分佈式事務
RabbitMQ Erlang開發的,基於AMQP協議實現,穩定性可靠性很高,性能不錯
RabbitMQ提供可靠性投遞(confirm)、返回模式(return)java

2.AMQP協議

1)AMQP(Advanced Message Queuing Protocal,高級消息隊列協議)
2)Server:又稱Broker,接受客戶端的連接,實現AMQP實體服務
3)Channel:網絡信道,幾乎全部的操做都在Channel中進行,Channel是進行消息讀寫的通道。客戶端能夠創建多Channel,每一個Channel表明一個會話任務。 用程序會和代理服務器之間建立一條TCP鏈接,即AMQP信道。信道是創建在真實TCP鏈接內的虛擬鏈接,每條信道會被指派一個惟一ID。創建和銷燬TCP會話開銷很大,信道方式不會給系統TCP棧形成額外負擔。
4)Messaage:消息,由標籤Properties和承載數據body組成。
5)Virtual host:虛擬主機,用於進行邏輯隔離,是最上層消息路由。
一個Virtual host裏面能夠有若干個Exchange和Queue,同一個Virtual host裏面Exchange和Queue不能同名。
6)Exchange:交換機,接收消息,根據路由鍵發送消息到綁定的隊列。
7)Binding:綁定,Exchange和Queue之間的虛鏈接,能夠設置 routing key。
8)Routing key:路由規則。
9)Queue:消息隊列。數據庫

3.基本操做命令

rabbitmq-server 服務器和應用啓動
rabbitmqctl stop 服務器和應用關閉
rabbitmqctl stop_app 關閉應用
rabbitmqctl start_app 啓動應用
rabbitmqctl status 節點狀態
rabbitmqctl add_vhost vhostname 建立虛擬主機
rabbitmqctl delete_vhost vhostname 刪除虛擬主機
rabbitmqctl list_vhosts 查看虛擬主機服務器

4.Exchange交換機

1)屬性

Type:交換機類型direct、topic、fanout、headers
Durability:是否須要持久化,true爲持久化
Auto Delete:當綁定到Exchanges上的隊列刪除後,自動刪除該Exchanges
Internal:當前Exchanges是否用於RabbitMQ內部使用,默認False
Arguments:擴展參數網絡

Direct Exchange
須要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵徹底匹配。
Fanout Exchange
發送到交換機的消息都會被轉發到與該交換機綁定的全部隊列上。Fanout交換機轉發消息是最快的。
Topic Exchange
對象隊列上須要預先綁定Topic。Exchange將路由鍵和若干Topic進行模糊匹配。符合的topic所關聯的隊列都會收到消息。
符號「#」匹配一個或多個詞,符號「」匹配很少很多一個詞。所以「audit.#」可以匹配到「audit.irs.corporate」,可是「audit.」 只會匹配到「audit.irs」。
Default Exchange
當建立一個隊列時,他會自動綁定到默認交換機,並以隊列名作爲路由鍵。架構

5.Queue 消息隊列

Durability:是否持久化,Durable:是,Transient:否
Auto delete:最後一個監聽移除後,Queue是否自動刪除
exclusive:隊列是否私有併發

6.虛擬主機

虛擬主機本質上是一個mini版的RabbitMQ服務器,不一樣虛擬主機是隔離的app

2、操做部分

1.消費訂閱方式

1)經過AMQP的basic.consume命令訂閱 信道置爲自動接收模式 推模式
2)經過AMQP的basic.get命令 主動拉取一條數據 拉模式分佈式

2.消息確認方式

1)經過AMQP的basic.ack命令,顯式地向RabbitMQ發送確認消息
2)訂閱隊列時設置auto_ack爲true,消費者接到消息就視爲確認
3)經過AMQP的basic.reject命令,拒絕接收消息
當reject的參數requeue=true時,消息會發送給其餘消費者
當reject的參數requeue=false時,消息會被從隊列上刪除性能

若是消費者訂閱了隊列,消息會馬上發送給訂閱的消費者。
若是消息到達了無人訂閱的隊列,消息會在隊列中等待。
若是程序崩潰,未能確認,消息會被髮送給其餘消費者,在消息確認前不會向故障消費者發送消息。
隊列中有多個消費者時,消息會以輪詢的方式發送給消費者,每條消息只會發送給一個消費者。fetch

3.隊列建立

生產者和消費者均可以建立隊列 queue.declare
exclusive=true則隊列爲私有的
auto-delete=true 最後一個監聽移除後,Queue自動刪除

4.持久化

1)RabbitMQ重啓後,交換機和隊列都會清除。須要設置每一個交換機和隊列的Durable屬性爲true,則重啓後不會清除。
2)RabbitMQ重啓後,消息數據都會清除。須要設置投遞模式(delivery mode)爲2,持久化。
即RabbitMQ的消息持久化,須要將交換機/隊列/消息都設置爲持久化才能夠。
3)持久化機制
持久化將致使RabbitMQ的性能至少降低10倍,在集羣中持久化工做的不是很好。
RabbitMQ在收到消息後,會先將消息寫入到日誌文件,再響應客戶端已經收到消息。等收到消費者的確認消費消息後,會在日誌文件中將其標記爲等待垃圾收集。

5.配置文件

默認位於/etc/rabbitmq/rabbitmq.config
使用Mnesia存儲數據

6.雙向通訊

默認狀況下,RabbitMQ時發後即忘的單向流程,經過在AMQP消息頭中設置reply_to字段,能夠實現雙向通訊。
經過這種方式能夠實現RPC過程調用。

3、集羣

主備模式:在併發和數據量不大的狀況下,是一種簡單易用的HA架構。也稱爲Warrent模式。
鏡像模式:最經典的時Mirror鏡像模式,能夠保證數據100%不丟失。
多活模式:數據中心異地部署時的主流模式,通常用federation插件同步數據

1.RabbitMQ內建集羣

RabbitMQ內建集羣不能保證消息的萬無一失。即便已經設置了消息持久化、當一個節點崩潰後,節點上的隊列消息也會消失。
RabbitMQ默認不會將隊列的內容複製到整個集羣,隊列默認僅存在於隊列所屬的那個節點上。
RabbitMQ會記錄交換器/綁定/隊列/虛擬主機/集羣的元數據,元數據在全部節點上都會存在一份,但隊列消息數據只會有一份。
能夠建立內存節點(RAM Node)和磁盤節點(disk Node),可是集羣中至少要有一個磁盤節點。
若是磁盤節點所有崩潰了,集羣仍可工做,但不能建立交換器/隊列/綁定等元數據操做了。

2.鏡像隊列

鏡像節點在集羣中的其餘節點上擁有從隊列,一旦主節點不可用,最老的從節點將成爲新的主節點。
聲明隊列時經過x-ha-policy參數設定鏡像節點。
鏡像的主從節點切換後,會發送<消費者取消通知>,客戶端應該支持該通知,以便從新附加到新的主節點上。

3.保證信息可靠性投遞

1)使用AMQP事務
2)發送方確認模式

4.插件的安裝與使用

1)啓用插件
rabbitmq-plugins enable XXXX
rabbitmqctl stop
rabbitmq-server -detached
2)關閉插件
rabbitmq-plugins disable XXXX
rabbitmqctl stop
rabbitmq-server -detached

5.性能保證

1)qos服務質量保證

在auto_ack爲false的前提下 對consume或channel設置qos值進行限流
咱們能夠經過設置Prefetch count來限制Queue每次發送給每一個消費者的消息數,好比咱們設置prefetchCount=1, 則Queue每次給每一個消費者發送一條消息;消費者處理完這條消息後Queue會再給該消費者發送一條消息。

2)消費端重回隊列

TTL(Time To Live,生存時間) 能夠在消息或隊列上設置過時時間

3)死信隊列DLX(DeadLetterExchange)

死信的產生
a.消息被拒絕(base.reject/basic.nack) 且requeue=false
b.TTL過時
c.隊列達到最大長度
當消息在一個隊列中變成死信,它能被從新publish到另外一個Exchange,這個Exchange就是DLX
死信隊列也是一個正常的交換機、隊列、綁定,原隊列經過設置"x-dead-letter-exchange":"死信隊列名",將另外一個隊列指明爲本身的死信隊列。

4)Return Listener消息機制

Return Listener用於處理一些不可路由的消息!
當咱們發送消息時,當前的exchange不存在或者指定的路由key路由不到,這個時候若是咱們須要監聽這種不可達的消息,就要使用Return Listener.
當Mandatory爲true時,監聽器會接收到路由不可達的消息,並進行後續處理;當爲false時,broker會自動刪除該消息。能夠在不存在的時候自動建立。

3、業務場景

1.迅速消息發送

不進行落庫操做,不作可靠性保障,追求最大吞吐量。

2.確認消息發送

→業務數據與消息數據同時入庫(先業務後消息)
→發送消息給MQ
→MQ發送確認送達ack
→更新消息數據庫狀態
→定時補償重發超時的任務。

3.批量消息發送

能夠進行批量化發送,針對某一原子業務去處理,但不保證可靠性,須要補償機制。

4.延遲消息發送

加載延遲發送插件,設置delayTime屬性。

5.順序消息

→數據入庫 參照確認消息發送
→發送的順序消息必須發送到同一隊列,該隊列只能由一個消費者。
→須要統一提交,(能夠是一個大消息,也能夠拆成多個小消息),全部消息的會話ID保持一致。
→添加消息屬性:順序號,SIZE等。
→消費者獲取消息後,先落庫。
→消費者向本身發送延遲處理消息,預估消息所有到達後,再進行統一順序處理。
→定時補償任務

6.事務消息發送

保證性能的前提下,補償機制最合適。

7.保證消息冪等

經過統一消息ID,保證冪等性

相關文章
相關標籤/搜索