本文源於朱忠華的《RabbitMQ實戰指南》java
消息隊列中間件有兩種傳遞模式:點對點 和 發佈/訂閱
點對點依靠隊列的原理;發佈/訂閱則能夠用於一對多的廣播node
消息中間件的做用:解耦、冗餘(存儲)、擴展性、削峯、可恢復性、順序保證、緩衝、異步通訊python
RabbitMQ的具體特色:
可靠性,持久化、傳輸確認、發佈確認
靈活的路由,多個交換器能夠綁定
擴展性,能夠組成集羣
高可用性,隊列鏡像
多種協議,AMPQ,STOMP,MQTT
多語言客戶端,支持全部經常使用語言
管理界面
插件機制linux
編譯安裝Erlangc++
下載頁面:http://www.erlang.org/downloads
下載地址:http://erlang.org/download/otp_src_20.3.tar.gz [84M]
安裝Erlang到/opt/erlang
yum install gcc perl ncurses-devel
wget http://erlang.org/download/otp_src_20.3.tar.gz
tar zxf otp_src_20.3.tar.gz
cd otp_src_20.3
./configure --prefix=/opt/erlang
##APPLICATIONS DISABLED:
crypto : No usable OpenSSL found
jinterface : No Java compiler found
odbc : ODBC library - link check failed
orber : No C++ compiler found
ssh : No usable OpenSSL found
ssl : No usable OpenSSL found
##APPLICATIONS INFORMATION:
wx : wxWidgets not found, wx will NOT be usable
##DOCUMENTATION INFORMATION:
documentation :
xsltproc is missing.
fop is missing.
The documentation can not be built
能夠看到,還缺乏一些包,需要補齊
yum install openssl-devel gcc-c++ unixODBC-devel
./configure --prefix=/opt/erlang --without-javac
make
make install添加環境變量
vi /etc/profile 在末尾添加
ERLANG_HOME=/opt/erlang
export PATH=$PATH:$ERLANG_HOME/bin
export ERLANG_HOME
讓配置生效
source /etc/profile驗證安裝
erlgit
安裝RabbitMQgithub
直接將下載的安裝包解壓到相應的目錄下便可
下載頁面:http://www.rabbitmq.com/releases/rabbitmq-server/
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.15/rabbitmq-server-generic-unix-3.6.15.tar.xz [4.6M]
tar xJf rabbitmq-server-generic-unix-3.6.15.tar.xz -C /opt
cd /opt/
mv rabbitmq_server-3.6.15/ rabbitmq添加環境變量
添加環境變量
vi /etc/profile 在末尾添加
export PATH=$PATH:/opt/rabbitmq/sbin
export RABBITMQ_HOME=/opt/rabbitmq
讓配置生效
source /etc/profileweb
運行RabbitMQ正則表達式
rabbitmq-server -detached #detached以守護進程方式在後臺運行
rabbitmqctl status #查看服務狀態
rabbitmqctl cluster_status #查看集羣信息算法
用戶和權限
默認用戶用戶名及密碼均爲guest 該賬戶只能經過本地網絡訪問,遠程網絡訪問受限
#添加新用戶root:root123
rabbitmqctl add_user root root123
#爲root用戶配置全部權限
rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
#提高root爲管理員
rabbitmqctl set_user_tags root administrator
消息發送:客戶端與MQ服務器創建一個鏈接connection->在鏈接上建立一個信道channel->建立一個交換器exchange和一個隊列queue,並經過路由鍵進行綁定->發送消息->關閉資源
生產者/消費者模型,相似於交換機
發佈到MQ的消息包含兩部分:消息體(payload)和標籤(label),消息體是實際負載,而標籤是MQ處理該消息的依據,如一個交換器的名稱和一個路由鍵
broker:消息中間件的服務節點,能夠當作是一個MQ服務器
多個消費者能夠訂閱同一個隊列,這時消息會平均分攤,輪詢給多個消費者,這樣一個消費者不會獲得所有消息。RabbitMQ不支持隊列層面的廣播消費
Exchange交換器,共有四種類型,不一樣的類型對應不一樣的路由策略
RoutingKey路由鍵,生產者將消息發給交換器時,會指定RoutingKey,用來指定該消息的路由規則,這個RoutingKeyf須要與交換器類型和綁定鍵BindingKey聯合使用才能最終生效
在交換器類型和綁定鍵固定的狀況下,生產者在發送消息時,經過指定RoutingKey來決定消息流向哪裏
Binding綁定,經過綁定將交換器與隊列關聯起來,在綁定時通常會指定綁定鍵BindingKey,這樣,MQ就能夠正確將消息路由到隊列
生產者將消息發送給交換器時,須要一個RoutingKey,當BindingKey和RoutingKey相匹配時,消息會被路由到對應的隊列中。綁定多個隊列到同一個交換器時,容許使用相同的BindingKey。BindingKey並非在全部狀況都生效,它依賴於交換器類型,fanout類型的交換器會無視BindingKey,fanout會將消息發送到全部與本身綁定的隊列中。
交換器至關於投遞包裹的郵箱,RoutingKey至關於填寫在包裹上的地址,BindingKey至關於包裹的目的地,當RoutingKey與BindingKey相匹配時,消息就會正確送達
大多數狀況下,習慣性地將BindingKey寫成RoutingKey,二者合稱爲路由鍵,特別是在使用direct類型的交換機時
RabbitMQ經常使用的交換器類型有fanout、direct、topic、header這四種
fanout 廣播
發送到該交換器的消息將路由到全部與該交換器綁定的隊列中
direct 直投
將消息路由到BindingKey和RoutingKey徹底匹配的隊列中
headers 比對headers屬性
不依賴路由鍵的匹配來路由消息,而是根據發送的消息內容中的headers屬性(鍵值對)進行匹配,在綁定隊列和交換器時也指定一組鍵值對,二者匹配時,即路由
headers類型性能會不好,也不實用,不多用到
topic 模式匹配
將消息路由到BindingKey和RoutingKey相匹配的隊列中,可使用模式匹配規則
匹配規則:
一、RoutingKey和BindingKey均爲一個點號.分隔的字符,分隔開的叫單詞
二、BindingKey能夠有*#用於模糊匹配,*匹配一個單詞,#匹配0個或多個單詞
三、沒有匹配上的將丟棄,或返回發送者(需設置mandatory參數)
生產者發送消息流程:
一、生產者鏈接到RabbitMQ Broker,創建一個鏈接connection,開啓一個信道channel
二、生產者聲明一個交換器,並設置相關屬性,如交換機類型、是否持久化等
三、生產者聲明一個隊列並設置相關屬性,如是否排他、是否持久化、是否自動刪除等
四、生產者經過路由鍵將交換器和隊列綁定起來
五、生產者發送消息至RabbitMQ Broker,其中包含路由鍵、交換器等信息
六、相應的交換器根據接收到的路由鍵查找相匹配的隊列
七、若是找到,將消息存入相應的隊列中
八、若是沒有找到,根據生產者配置的屬性選擇丟棄或者退回給生產者
九、關閉信道
十、關閉鏈接
消費者接收消息的過程
一、消費者鏈接到RabbitMQ Broker,創建一個鏈接connection,開啓一個信道channel
二、消費者向RabbitMQ Broker請求消費相應隊列中的消息,可能會設置相應的回調函數,以及作一些準備工做
三、等待RabbitMQ Broker迴應並投遞相應隊列中的消息,消費者接收消息
四、消費者確認ack接收到的消息
五、RabbitMQ從隊列中刪除相應已經被確認的消息
六、關閉信道
七、關閉鏈接
客戶端與MQ Broker創建的connection是一條tcp鏈接,amqp信道channel是connection上的虛擬鏈接。引入channel主要是爲了複用tcp鏈接。當每一個channel流量不是很大時,複用connection能夠有效節省tcp鏈接資源;當每一個channel流量很大時,複用connection會產生性能瓶頸,此時須要開闢多個connection
消費者接收到消息並正確消費後,會向Broker發送確認,即Basic.Ack命令
amqp://userName:password@ipAddress:portNumber/virtualHost
channel.exchangeDeclare(exchangeName,"direct",true);
#聲明一個名爲exchangeName,類型爲direct,持久化非自動刪除的交換器
channel.queueDeclare(queueName,true,false,false,null);
#聲明一個名爲queueName,持久化,非排他、非自動刪除的隊列
channel.queueBind(queueName,exchangeName,routingKey)
#將隊列與交換器使用routingKey綁定
聲名交換器exchangeDeclare的參數說明:
exchange,交換器名稱
type,交換器類型,如fanout,direct,topic
durable,是否持久化,持久化能夠將交換器存盤,在服務器重啓後不會丟失相關信息
autoDelete,是否自動刪除。自動刪除的前提是至少有一個隊列或交換器與這個交換器綁定,以後全部與這個交換器綁定的隊列或交換器都與此解綁,並非與此鏈接的客戶端都斷開
internal,是不是內置的,內置的交換器客戶端沒法直接發送消息到這個交換器中,只能經過交換器路由到交換器的方式
argument,其餘一些結構化參數
交換器刪除
exchangeDelete(交換器名稱,isUnused):isUnused用來設置在交換器沒有被使用時刪除,爲false時,強制刪除
隊列queue聲明參數說明
不帶任何參數時,queueDeclare方法默認建立一個由RabbitMQ命名的隊列,屬性爲排他、自動刪除、非持久化
帶參數時:
queue,隊列名稱
durable,是否持久化。持久化的隊列會存盤,在服務器重啓時能夠保證不丟失相關信息
exclusive,是否排他,若是一個隊列聲明爲排他,該隊列僅對首聲聲明它的鏈接可見,並在鏈接斷開後自動刪除
#排他隊列基於鏈接可見,同一個鏈接的不一樣信道能夠同時訪問同一鏈接建立的排他隊列;若是一個鏈接已聲明一個排他隊列,其餘鏈接不容許創建同名排他隊列;即便該隊列是持久化的,一旦鏈接關閉或客戶端退出,該排他隊列都會被自動刪除;該隊列適用於同一個客戶端同時發送和讀取消息的場景
autoDelete,是否自動刪除。自動刪除前提是:至少有一個消費者鏈接到這個隊列,以後全部與這個隊列鏈接的消費者都斷開時,才自動刪除
arguments,其餘參數,如x-message-ttl,x-expires,x-max-length,x-max-length-bytes,x-dead-letter-exchange,x-dead-letter-routing-key,x-max-priority等
隊列刪除
queueDelete(queue,ifUnused,ifEmpty)
清空隊列
queuePurge
隊列與交換器綁定
queueBind(queue,exchange,routingKey)
解綁
queueUnbind(queue,exchange,routingKey)
交換器綁定交換器
exchangeBind(destination,source,routingKey) 綁定後,消息從source交換器轉發到destination交換器
RabbitMQ的消息存儲在隊列中,交換器的使用並不會耗費服務器性能,可是隊列會。衡量MQ的QPS看隊列的便可,性能指標:流量、內存、網卡佔用
消息的投遞模式delivery mode爲2時,消息會被持久化(存入磁盤);消息的優化級priority;消息的類型content-type:text/plain;消息還能夠帶headers頭;帶過時時間expiration
發送消息的參數:
exchange,交換器名稱,若設置爲空,則消息會發送至默認交換器中
routingKey,路由鍵,交換器根據路由鍵將消息存儲到相應隊列中
props,消息基本屬性集,14個屬性:contentType、contentEncodeing、headers、deliveryMode、priority、correlationId、replyTo、expiration、messageId、timestamp、type、userId、appId、clusterId
body,消息體
mandatory和immediate
消費消息:推模式Push、拉模式Pull
推模式中,能夠經過持續訂閱方式消費消息。不一樣的訂閱採用不一樣的消費者標籤consumerTag來區分,同一個channel中的消費者也須要經過消費者標籤做區分
basicConsume參數:
queue,隊列名稱
qutoAck,是否自動確認,建議爲false,不自動確認
consumerTag,消費者標籤,區分多個消費者
noLocal,爲true時表示不能將同一個connection中生產者發送的消息傳送給這個connection中的消費者
exclusive,是否排他
arguments,其餘參數
callback,消費者的回調函數
拉模式中,參數僅須要queue和autoAck兩個參數
若是隻想從隊列得到單條消息而不是持續訂閱,建議使用拉模式進行消費,但不能將拉模式放入一個循環裏替代推模式,由於這樣作會嚴重影響RabbitMQ的性能
爲保證消息從隊列可靠達到消費者,MQ提供了消息確認機制。消費者在訂閱隊列時,能夠指定autoAck參數,當autoAck爲false時,MQ會等待消費者顯式回覆確認信號後才從內存中移去消息。當autoAck設置爲true時,MQ會自動把發送出去的消息置爲確認,而後從內存中刪除,而無論消費者是否真的消費到了這些消息
採用消息確認機制後,只要設置了autoAck爲false,消費者就有足夠時間處理消息,不用擔憂處理過程當中消費者進程掛掉後消息的丟失,由於MQ會一直等待直到消費者顯式調用Basic.Ack命令爲止
查看消息狀態命令:rabbitmqctl list_queues name messages_ready messages_unacknowledged
消息的拒絕:在消費者接收到消息後,若是想拒絕,能夠調用channel.basickReject方法,basicReject(deliveryTag,requeue),deliveryTag能夠看做是消息的編號,是64位長整型值;requeue爲true時,MQ會將該消息存入隊列,爲false時,消息會從隊列中移除
mandatory和immediate以及備份交換器Alternate Exchange
當mandatory爲true時,交換器沒法根據自身類型和路由鍵打開符合條件的隊列時,會調用Basic.Return將消息返回給生產者;爲false時將不會返回給生產者。生產者獲取被退回的消息可使用channel.addReturnListener添加ReturnListener監聽器實現
當immediate參數設置爲true時,若是交換器將消息路由到隊列時,發現隊列上並不存在任何消費者,那麼這條消息將不會存入隊列中。當於路由鍵匹配的全部隊列都沒有消費者時,消息會經過Basic.Return返回至生產者
總結:mandatory告訴服務器至少將消息路由到一個隊列中,不然將消息返回生產者;immediate告訴服務器,若是該消息關聯的隊列上有消費者,當即投遞,沒有消費者,則返回消息給生產者,不用將消息存入隊列等待(immediate參數在3.0版本中已去掉,建議採用TTL和DLX的方法替代)
備份交換器Alternate Exchange,簡稱AE
在設置了madatory參數後,未被路由的消息會存入備份交換器,這樣能夠簡化生產者代碼邏輯,不用添加ReturnListener,而是在須要的時候處理這些消息。在聲明交換器時添加alternate-exchange參數啓用,也能夠經過策略policy的方法實現
若是備份交換器不存在,客戶端和服務端都不會有異常出現,消息丟失
若是備份交換器沒有綁定隊列,客戶端和服務端都不會有異常出現,消息丟失
若是備份交換器沒有任何匹配隊列,客戶端和服務端都不會有異常出現,消息丟失
若是備份交換器和mandatory參數一塊兒使用,那麼mandatory參數無效
過時時間TTL
設置方法一:經過隊列屬性設置,隊列中全部消息都有相同過時時間
設置方法二:對消息自己單獨設置
若二者同時設置,以TTL較小者的值爲準。時間超過TTL值時,會變成死信Dead Message
經過隊列屬性設置TTL的方法是,channel。queueDeclare方法中加入x-message-ttl參數實現,單位毫秒
也能夠經過Policy的方式設置TTL
rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues
還能夠經過調用HTTP API接口設置
curl -i -u root:root -H "content-type:application/json" -X PUT -d'{"auto_delete":false,"durable":true,"arguments":{"x-message-ttl":60000}}' http://localhost:15672/api/queues/{vhost}/{queuename}
若不設置TTL,則消息永不過時;若設置爲0,表示除非能夠直接將消息投遞到消費者,不然消息當即丟棄
針對單條消息的TTL方法爲,在channel.basicPublish中加入expiration屬性,單位毫秒
也能夠用HTTP API接口
curl -i -u root:root -H "content-type:application/json" -X POST -d '{"properties":{"expiration":"60000"},"routing_key":"routingKey","payload":"my body","payload_encoding":"string"}' http://localhost:15672/api/exchanges/{vhost}/{exchangename}/publish
第一種設置方法的消息到期後,會直接從隊列中抹去;第二種方法設置的,在投遞前進行判斷
隊列的TTL
隊列聲明時經過x-expires參數設置,當隊列未使用(沒有任何消費者、沒有被從新聲明、過時時間段內未調用過Basic.Get命令)時,會被刪除。服務器重啓後,持久化的隊列過時時間會從新計算,x-expires單位爲毫秒,不能設置爲0
死信隊列DLX
DLX,Dead-Letter-Exchange,死信交換器,死信郵箱。當消息在一個隊列中變成死信後,能被從新發送到另外一個交換器中,這個交換器就是DLX,綁定DLX的隊列就是死信隊列
聲明隊列時經過x-dead-letter-exchange參數爲該隊列添加DLX
也能夠經過Policy設置
rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"dlx_exchange"}' --apply-to queues
消息變成死信的狀況:消息被拒,且設置requeue參數爲false、消息過時、隊列達到最大長度
DLX能夠用來分析異常,或者配合TTL實現延遲隊列的功能
延遲隊列
延遲隊列存儲的是延遲消息,MQ自己沒有直接支持延遲隊列的功能,但能夠經過DLX和TTL模擬
優化級隊列
優化級高的消息具有優先被消費的特權。經過設置隊列的x-max-priority參數實現,配置一個隊列的最大優先級,而後在發送消息時,爲每條消息指定當前的優先級priority
RPC實現
遠程過程調用,客戶端發送請求消息,服務端回覆響應消息。爲了接收響應消息,須要在請求中發送一個回調隊列(replyTo)
replyTo,設置一個回調隊列
correlationId,關聯請求request和其調用RPC以後的回覆response
處理流程:
一、客戶端啓動時,建立一個匿名的回調隊列,名稱由MQ自動建立
二、客戶端爲RPC請求設置2個屬性,replyTo用來告知RPC服務端回覆請求時的目的隊列,即回調隊列;correlationId用來標記一個請求
三、請求發送到rpc_queue隊列中
四、RPC服務端監聽rpc_queue隊列中的請求,當請求到來時,處理請求且把帶有結果的消息發送給客戶端,接收隊列就是replyTo設定的回調隊列
五、客戶端監聽回調隊列,當有消息時,檢查correlationId屬性,若與請求匹配,即說明爲結果
持久化
提升可靠性,在異常(重啓、關閉、宕機)時的數據防丟失。MQ持久化包括:交換器持久化、隊列持久化、消息持久化
交換器持久化經過聲明時將durable置爲true實現,能夠防止重啓後交換器元數據的丟失,但消息不會丟失,只是不能再將消息發送到該交換器中
隊列持久化經過聲明時將durable置爲true實現,若不設置持久化,重啓後,隊列會丟失,隊列中的消息也會丟失。隊列持久化能夠保證自己元數據不丟失,但並不保證內部存儲的消息不丟失
消息持久化經過將投遞模式BasicProperties中deliveryMode屬性設置爲2實現
設置了隊列和消息持久化,重啓後,消息依舊存在
單單隻設置隊列持久化,重啓後,消息會丟失
單單隻設置消息持久化,重啓後,隊列消失,繼而消息也丟失
因此,單單設置消息持久化而不設置隊列持久化毫無心義
消息持久化會嚴重影響RabbitMQ的性能
持久化還須要autoAck配合,同時,緩存落盤也有風險,爲真正確保消息不丟失,可使用RabbitMQ的鏡像隊列機制,實際生產環境中關鍵業務隊列通常都會設置鏡像隊列
生產者確認
事務機制確認;發送方確認publisher confirm
事務機制:channel.txSelect用於將當前信道設置成事務模式;channel.txCommit用於提交事務;channel.txRollback用於事務回滾
事務機制可以解決消息發送方和MQ之間消息確認的問題,只有消息成功被MQ接收,事務才能提交成功
採用事務機制會嚴重下降MQ消息吞吐量
發送方確認機制:生產者將信道設置成confirm模式,消息被投遞到匹配的隊列後,會發送確認給生產者,包含消息的惟一ID;若消息和隊列是可持久化的,那麼確認消息會在消息寫入磁盤後發出,要提升效率,須要使用批量confirm或異步conifrm,不然QPS只有2000左右(低7倍)
事務機制和publisher confirm機制是互斥的;事務機制和publisher confirm機制確認的是消息可以正確的發送至MQ,若是找不到匹配的隊列,消息依然會被丟棄
消息分發
隊列有多個消費者時,消息將以輪詢roud-robin分發方式發送給消費者。channel.basicQos容許限制信道上的消費者所能保持的最大未確認消息數量,當達到設置的值時,MQ將再也不發送消息給對應的客戶端,直到消費者確認值小於Qos設置的值時
Basic.Qos的使用對拉模式的消費方式無效
無特殊須要,最好只使用global爲false的設置,默認
消息的順序性
嚴格意義上講,消息不能保證順序性。若要保證消息順序性,須要業務方作進一步處理,如在消息體內添加全局有序標識ID來實現
目前大多數主流消息中間件都沒有消息去重機制,也不保證"剛好一次",去重處理一般是在業務端實現,如引入GUID
提高數據可靠性的途徑:設置mandatory參數或者備份交換器(immediate參數已被淘汰);設置publisher confirm機制或者事務機制;設置交換器、隊列和消息都爲持久化;設置消費端對應的autoAck參數爲false,並在消費完消息後再進行消息確認
多租戶與權限
虛擬主機virtual host,簡稱vhost
vhost間是絕對隔離的
建立vhost
rabbitmqctl add_vhost {vhost}
rabbitmqctl add_vhost dong顯示vhost
rabbitmqctl list_vhosts [vhostinfoitem...] #可選值爲name tracing
rabbitmqctl list_vhosts
rabbitmqctl list_vhosts name tracing
rabbitmqctl trace_on #開啓/的trace刪除vhost
#刪除一個vhost同時會刪除其下的全部隊列、交換器、綁定關係、用戶權限、參數和策略
rabbitmqctl delete_vhost vhost1
MQ中權限以vhost爲單位
受權命令
rabbitmqctl set_permissions [-p vhost] {user} {conf} {write} {read}
#conf,write,read分別爲匹配用戶可配置,可寫,可讀權限的正則表達式
#可配置:隊列和交換器的建立、刪除
#可寫:發佈消息
#可讀:與消息有關的操做,讀取消息、清空隊列等
rabbitmqctl list_users
rabbitmqctl set_permissions -p dong root "^queue.*" ".*" ".*"顯示用戶權限
rabbitmqctl list_permissions [-p vhost]
rabbitmqctl list_user_permissions {username}
rabbitmqctl -p dong list_permissions
rabbitmqctl list_permissions -p dong清除權限
rabbitmqctl clear_permissions [-p vhost] {username}
rabbitmqctl clear_permissions -p dong root
rabbitmqctl工具標準語法:[]表示可選參數 {}表示必選參數
rabbitmqctl [-n node] [-t timeout] [-q] {command} [command options...]
[-n node],默認節點是rabbit@hostname,hostname是主機名
[-q],啓用quiet模式
[-t timeout],超時時間,秒,只適用list_xxx類型的命令
用戶管理
單個用戶能夠跨多個vhost進行受權
建立用戶
rabbitmqctl add_user root root123更新密碼
rabbitmqctl change_password {username} {newpassword}清除密碼
rabbitmqctl clear_password {username} #清除後用戶不能使用密碼登陸經過密碼驗證用戶
rabbitmqctl authenticate_user {username} {password}刪除用戶
rabbitmqctl delete_user {username}查詢用戶
rabbitmqctl list_users
#會顯示用戶角色tags
#none,無任何角色,新建立的用戶默認爲none
#management,能夠訪問web管理頁面
#policymaker,包含management權限,並能夠管理策略policy和參數parameter
#monitoring,包含management權限,並可看到全部鏈接、信道及節點相關信息
#administrator,最高權限,能夠管理用戶、虛擬主機、權限、策略、參數等設置用戶角色
rabbitmqctl set_user_tags {username} {tag ...} #設置後任何以前的身份都會被刪除
rabbitmqctl set_user_tags admin policymaket monitoring
Web端管理
啓用RabbitMQ management插件,RabbitMQ提供不少插件,默認存放於$RABBITMQ_HOME/plugins目錄下
rabbitmq-plugins語法:
rabbitmq-plugins [-n node] {command} [command options...]
啓用插件:
rabbitmq-plugins enable [plugin-name]
關閉插件:
rabbitmq-plugins disable [plugin-name]啓用management插件
rabbitmq-plugins enable rabbitmq_management查看插件啓用狀況
rabbitmq-plugins list
#[E*]爲顯示啓動 [e*]爲隱式啓動
應用與集羣管理
應用管理
rabbitmqctl stop [pid_file]
若使用rabbitmq-server -detach這個命令啓動RabbitMQ,則不會生成pid_file
rabbitmqctl shutdown #阻塞直到erlang虛擬機進程退出
rabbitmqctl stop_app 中止MQ服務,但不中止Erlang虛擬機
rabbitmqctl reset #重置MQ,刪除全部,恢復到初始狀態
#從原來所在集羣中刪除此節點,從管理數據庫中刪除全部的配置數據,如用戶、vhost,刪除全部持久化消息
#執行reset前密碼中止MQ應用
rabbitmqctl force_reset 強制重置還原到最初狀態,執行前須要中止MQ應用
rabbitmqctl start_app
rabbitmqctl wait [pid_file] 等待MQ應用的啓動,會等到pid_file的建立,而後等待pid_file中所表明的進程啓動
rabbitmqctl rotate_logs {suffix} 指示輪換日誌文件,suffix爲日誌文件後綴
集羣管理
#將節點加入指定集羣,執行前須要中止MQ應用並重置節點
rabbitmqctl join_cluster {cluster_node} [--ram]
#顯示集羣狀態
rabbitmqctl cluster_status
#修改集羣節點類型,執行前須要中止MQ應用
rabbitmqctl change_cluster_node_type {disc|ram}
#將節點從集羣中刪除,容許離線執行
rabbitmqctl forget_cluster_node [--offline]
#在集羣節點應用啓動前諮詢clusternode節點的最新信息,並更新相應的集羣信息,但不加入集羣
rabbitmqctl update_cluster_nodes {clusternode}
#無條件的啓動節點
rabbitmqctl force_boot
#指示未同步隊列queue的slave鏡像能夠同步master鏡像的內容,同步期間隊列會被阻塞,直到同步完成
rabbitmqctl sync_queue [-p vhost] {queue}
#取消隊列queue同步鏡像的操做
rabbitmqctl cancel_sync_queue [-p vhost] {queue}
#設置集羣名稱
rabbitmqctl set_cluster_name {name}
服務端狀態
#顯示隊列詳細信息
rabbitmqctl list_queues [-p vhost] [queueinfoitem...]
其中queueinfoitem包含的值以下:
name,隊列名稱
durable,隊列是否持久化
auto_delete,是否自動刪除
arguments,隊列參數
policy,應用到隊列上的策略名稱
pid,隊列關聯的Erlang進程ID
owner_pid,處理排他隊列鏈接的Erlang進程ID,若此隊列是非排他的,則值爲空
exclusive,隊列是否排他
exclusive_sonsumer_pid,訂閱到此排他隊列的消費者相關的信道關聯的Erlang進程ID,若爲非排他,此值爲空
exclusive_consumer_tag,訂閱到此排他隊列的消費者的consumerTag,若爲非排他,此值爲空
messages_ready,準備發送給客戶端的消息個數
messages_unacknowledged,發送給客戶端但還沒有應答的消息個數
messages,準備發送給客戶端和末應答消息的總和
messages_ready_ram,駐留內存的messages_ready消息個數
messages_unacknowledged_ram,駐留內存的messages_unacknowledged消息個數
messages_ram,駐留內存中的消息總數
messages_persistent,隊列中持久化消息的個數,若爲非持久化隊列,則爲0
messages_bytes,隊列中全部消息大小總和(不包括消息屬性和其餘開銷)
messages_bytes_ready messages_bytes_unacknowledged messages_bytes_ram messages_bytes_persistent
disk_reads,從隊列啓動開始,已從磁盤中讀取該隊列消息總次數
disk_writes,從隊列啓動開始,已向磁盤寫入消息總次數
consumer,消費者數目
consumer_utilisation,能馬上投遞給消費者的比率
memory,與隊列相關的Erlang進程所消耗內存字節數,包括棧、堆及內部結構
slave_pids,鏡像隊列,列出全部slave鏡像的pid
synchronised_slave_pids,鏡像隊列,列出全部已同步的slave鏡像pid
state,隊列狀態
默認無參數顯示隊列名稱和消息個數#顯示交換器詳細信息
rabbitmqctl list_exchanges [-p vhost] [exchangeinfoitem...]
name,交換器名稱
type,類型
durable,是否持久化
auto_delete,是否自動刪除
internal,是不是內置的
arguments,其餘一些結構化參數,如alternate-exchange
policy,應用到交換器上的策略#顯示綁定關係
rabbitmqctl list_bindings [-p vhost] [bindinginfoitem...]
source_name,綁定中消息來源
source_kind,來源的類型
destination_name,目的名稱
destination_kind,目的類型
routing_key,綁定的路由鍵,默認沒有名稱的交換器會綁定全部隊列,且路由鍵爲隊列名稱
arguments,綁定的參數
#顯示鏈接信息
rabbitmqctl list_connections [connectioninfoitem...]
返回TCP/IP鏈接統計信息
pid,與鏈接相關的Erlang進程ID
name,鏈接的名稱
port,服務器的端口
host,反向DNS獲取的服務器主機名稱或者IP地址,或者未啓用
peer_port,對端端口,客戶端的端口
peer_host,客戶端IP,或者未啓用
ssl,是否啓用SSL
ssl_protocol,SSL協議,如tlsv1
ssl_key_exchange,SSL密鑰交換算法,如rsa
ssl_cipher,SSL加密算法,如aes_256_cbc
ssl_hash,SSL哈希算法,如sha
peer_cert_subject,對端SSL安全證書主題,基於RFC4514形式
peer_cert_issuer,對端SSL證書發行者,基於RFC4514
peer_cert_validity,對端SSL證書有效期
state,鏈接狀態,包括starting/tuning/opening/running/flow/blocking/blocked/closing/closed
channels,該鏈接的信道個數
protocol,使用的AMQP協議版本,如{0,9,1}
auth_mechanism,使用的SASL認證機制,如PLAIN/AMQPLAIN/EXTERNAL/BABBIT-CR-DEMO等
user,與鏈接相關的用戶
vhost,與鏈接相關的vhost
timeout,鏈接超時/協商的心跳間隔,秒
frame_max,最大傳輸的幀大小,單位B
channel_max,此鏈接上信道的最大數量,若爲0表示無上限
client_properties,在創建鏈接期間由客戶端發送的信息屬性
recv_oct,收到的字節數
recv_cnt,收到的數據包個數
send_oct,發送的字節數
send_cnt,發送的數據包個數
send_pend,發送隊列大小
connected_at,鏈接創建的時間戳
默認顯示user、peer_host、peer_port、state這幾項
#顯示信道信息
rabbitmqctl list_channels [channelinfiitem...]
pid,與鏈接相關的Erlang進程ID
connection,信道所屬的鏈接的Erlang進程ID
name,信道名稱
number,信道序號
user,與信道相關的用戶名稱
vhost,與信道相關的vhost
transactional,信道是否處於事務模式
consumer_count,信道中消費者個數
messages_unacknowledged,已投遞但未ack的消息個數
messages_uncommitted,已接收但還未提交事務的消息個數
acks_uncommitted,已ack收到,但還未提交事務的消息個數
messages_unconfirmed,已發送但未確認消息個數,若信道未處於publisher confirm模式下,此值爲0
perfetch_count,新消費者的Qos個數限制,0表示不限
global_prefetch_count,整個信道的Qos個數限制,0表示不限
默認顯示pid,user,consumer_count、messages_unacknowledged這幾項
#顯示消費者信息
rabbitmqctl list_consumers [-p vhost]
已訂閱隊列名稱、相關信道進程標識、consumerTag、是否須要消費端確認、prefetch_count、參數列表
#顯示Broker狀態
rabbitmqctl status
當前Erlang節點上運行的應用程序、版本、OS名稱、內存及文件描述符等信息
#節點健康檢查
rabbitmqctl node_health_check
#運行環境
rabbitmqctl environment
顯示程序環境中每一個變量的名稱和值
#服務器狀態報告
rabbitmqctl report
爲全部服務器狀態生成狀態報告
#執行任意表達式
rabbitmqctl eval {expr}
執行Erlang表達式,如rabbitmqctl eval 'node().' #返回rabbitmqctl鏈接的節點的名稱
用戶、Parameter、vhost、權限等要以經過rabbitmqctl建立和刪除;交換器、隊列、綁定關係的建立和刪除沒有相關的rabbitmqctl命令
曲線救國方案:
#建立名稱爲exchange2的交換器
rabbitmqctl eval 'rabbit_exchange:declare({resource,<<"/">>,exchange,<<"exchange2">>},direct,true,false,false,[]).'
##declare(XName,Type,Durable,AutoDelete,Internal,Args).
##XName,交換器命名細節,{resource,VHost,exchange,Name}
##Type,類型,direct、headers、topic、fanout
##Durable,持久化;AutoDelete,自動刪除;Internal,是否爲內置交換器;Args,參數
##刪除rabbitmqctl eval 'rabbit_exchange:delete({resource,<<"/">>,exchange,<<"exchange2">>},false).'
#建立名稱爲queue2的隊列
rabbitmqctl eval 'rabbit_amqqueue:declare({resource,<<"/">>,queue,<<"queue2">>},true,false,[],none).'
##declare(QueueName,Durable,AutoDelete,Args,Owner).
##QueueName,隊列命名細節,{resource,VHost,queue,Name}
##Owner,隊列的獨佔模式,通常爲none,排他
##刪除rabbitmqctl eval 'rabbit_amqqueue:internal_delete({resource,<<"/">>,queue,<<"queue2">>}).'
#將交換器exchange2與隊列queue2綁定,路由鍵爲rk2
rabbitmqctl eval 'rabbit_binding:add({binding,{resource,<<"/">>,exchange,<<"exchange2">>},<<"rk2">>,{resource,<<"/">>,queue,<<"queue2">>},[]}).'
##add({binding,Source,Key,Destination,Args}).
##刪除rabbitmqctl eval 'rabbit_binding:remove({binding,{resource,<<"/">>,exchange,<<"exchange2">>},<<"rk2">>,{resource,<<"/">>,queue,<<"queue2">>},[]}).'
若要刪除全部的交換器、隊列及綁定關係,只須要刪除對應的vhost就能夠一鍵搞定
HTTP API接口管理
RabbitMQ Management插件不只提供Web管理界面,還提供了HTTP API接口
POST方法建立的是沒法用具體名稱的資源,如綁定和發佈消息等
建立名爲queue33的隊列
%2F是默認vhost:/的轉義
curl -i -u admin:admin -H"content-type:application/json" -XPUT -d'{"auto_delete":false,"durable":true,"node":"rabbit@localhost"}' http://127.0.0.1:15672/api/queues/%2F/queue33
獲取隊列queue33的信息
curl -i -u admin:admin -XGET http://127.0.0.1:15672/api/queues/%2F/queue33
刪除隊列queue33
curl -i -u admin:admin -XDELETE http://127.0.0.1:15672/api/queues/%2F/queue33
向交換器exchange2發送一條消息
curl -i -u admin:admin -XPOST -d'{"properties":{},"routing_key":"rk2","payload":"my_body","payload_encoding":"string"}' http://127.0.0.1:15672/api/exchanges/%2F/exchange2/publish
清空隊列queue2
curl -i -u admin:admin -XDELETE http://127.0.0.1:15672/api/queues/%2F/queue2/contents
從隊列中消費消息
curl -i -u admin:admin -XPOST -d'{"count":5,"requeue":false,"encoding":"auto","truncate":50000}' http://127.0.0.1:15672/api/queues/%2F/queue2/get
#獲取到的結果[{"payload_bytes":7,"redelivered":false,"exchange":"exchange2","routing_key":"rk2","message_count":0,"properties":[],"payload":"my_body","payload_encoding":"string"}]
#count表示最大能獲取的消息個數
#requeue表示獲取到消息後,消息是否從隊列中刪除,爲true不刪除,但會設置redelivered標識會被設置
#encoding表示編碼格式,auto或base64
#truncate,可選參數,若是消息payload超過指定大小會被截斷
健康檢查
GET /api/healthchecks/node
#正常返回{"status":"ok"},異常返回{"status":"failed","reason":"string"} 返回碼均爲200獲取HTTP API接口列表 GET /api/ 相似幫助文檔
rabbitmqadmin工具
rabbitmqadmin是management插件提供的功能,包裝了HTTP API接口,更易於使用
位置:/opt/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@localhost-plugins-expand/rabbitmq_management-3.6.15/priv/www/cli/rabbitmqadmin
下載:wget http://127.0.0.1:15672/cli/rabbitmqadmin
#rabbitmqadmin須要python的支持
chmod +x rabbitmqadmin
#建立隊列queue1
./rabbitmqadmin -u admin -p admin declare queue name=queue1
#顯示隊列
./rabbitmqadmin list queues
#刪除隊列
./rabbitmqadmin -u admin -p admin delete queue name=queue1
#其它經常使用命令
./rabbitmqadmin -V / list exchanges
#列出全部交換器
./rabbitmqadmin list queues / name queue2 messages message_stats.publish_details.rate
./rabbitmqadmin declare exchange name=my-new-exchange type=fanout
#建立交換器
./rabbitmqadmin publish exchange=exchange2 routing_key=rk2 payload="hello world"
#發送消息
./rabbitmqadmin get queue=queue2 requeue=false
#接收消息
./rabbitmqadmin export rabbit.definitions.json
#導出配置
./rabbitmqadmin -q import rabbit.definitions.json
#導入配置
#關閉全部鏈接
./rabbitmqadmin -f tsv -q list connections name | while read conn ; do ./rabbitmqadmin -q close connection name="${conn}" ; done
一般默認配置就能夠
三種配置方案:
環境變量,能夠配置節點名稱、RabbitMQ配置文件地址、節點內部通訊端口等
配置文件,定義RabbitMQ服務和插件,如TCP監聽端口、網絡相關設置、內存限制、磁盤限制等
運行時參數和策略,集羣層面的服務設置
環境變量
以RABBITMQ_開頭,能夠在環境中設置,也能夠在rabbitmq-env.conf中定義。在非shell環境中配置,須要將RABBITMQ_前綴去除
優先級:Shell環境>rabbitmq-env.conf>默認配置
RABBITMQ_NODENAME=rabbit@node2 rabbitmq-server -detached
rabbitmq-env.conf默認位於$RABBITMQ_HOME/etc/rabbitmq/目錄下
配置示例:
NODENAME=rabbit@node1
NODE_PORT=5672
#rabbitmq.conf不須要添加.conf後綴
CONFIG_FILE=/opt/rabbitmq/etc/rabbitmq/rabbitmq
若是沒有特殊需求,不建議更改RabbitMQ的環境變量,在實際生產環境中,若對配置和日誌的目錄有特殊要求,能夠參考下面的配置
#配置文件的地址
CONFIG_FILE=/apps/conf/rabbitmq/rabbitmq
#環境變量的配置文件地址
CONF_ENV_FILE=/apps/conf/rabbitmq/rabbitmq-env.conf
#服務日誌的地址
LOG_BASE=/apps/logs/rabbitmq
#Mnesia的路徑
MNESIA_BASE=/apps/dbdat/rabbitmq/mnesia
配置文件
能夠從日誌中獲取相關的文件位置信息,以下
=INFO REPORT==== 1-Jun-2018::15:30:12 ===
node : rabbit@localhost
home dir : /root
config file(s) : /opt/rabbitmq/etc/rabbitmq/rabbitmq.config (not found)
cookie hash : EX4yjV7OOErONodYs7ez4w==
log : /opt/rabbitmq/var/log/rabbitmq/rabbit@localhost.log
sasl log : /opt/rabbitmq/var/log/rabbitmq/rabbit@localhost-sasl.log
database dir : /opt/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@localhost也能夠從進程信息中查看相關路徑信息
ps -ef|grep rabbit
官方配置文件示例:https://github.com/rabbitmq/rabbitmq-server/blob/master/docs/rabbitmq.config.example
官方高級配置示例:https://github.com/rabbitmq/rabbitmq-server/blob/master/docs/advanced.config.example
配置加密
配置文件中的敏感配置項能夠被加密,格式爲:{encrypted,加密的值}形式包裹
加密須要設置口令,config_entry_decoder中的passphrase配置就是口令 config_entry_decoder,[{passphrase,<<"zzhpassphrase">>}]
生成加密字符串
rabbitmqctl encode '<<"guest">>' zzhpassphrase
獲得{encrypted,<<"dRrG+PV2Jxu+yoQh6EKzGiv6U6cFxnFWvMx23g8ij17Tp5Ng5fdHFJ4Va754h2iI5DMi2ZN+wgnniPWXHt44XA==">>}
解密:rabbitmqctl encode --decode '{encrypted,<<"dRrG+PV2Jxu+yoQh6EKzGiv6U6cFxnFWvMx23g8ij17Tp5Ng5fdHFJ4Va754h2iI5DMi2ZN+wgnniPWXHt44XA==">>}' zzhpassphrase
獲得:<<"guest">>
默認,加密機制PBKDF2從口令中派生了密鑰,默認的hash算法是SHA512,默認的迭代次數是1000,默認的加密算法爲AES_256_CBC
優化網絡配置
MQ支持的全部協議都是基於TCP層面的,默認MQ會在全部接口的5672端口下監聽
提升吞吐量能夠禁用Nagle算法、增大TCP緩衝區大小。每一個TCP鏈接都有緩衝區,緩衝區越大,吞吐量也越高,但每一個鏈接上消費的內存也就越多,從而使總體服務內存增大。linux中,默認會自動調節TCP緩衝區的大小,通常爲80KB到120KB之間
文件句柄數,每一個節點上鍊接的數目乘以1.5能夠粗略估算,如要支撐10萬個TCP鏈接,文件句柄數城要15萬
參數及策略
運行時參數能夠經過rabbitmqctl工具或者management插件的http api接口進行修改
vhost級別的Parameter由一個組件名稱component name、名稱name、值value組成;global級別的參數,由名稱和值組成。兩者的值都是JSON類型的
vhost級別的參數對應的rabbitmqctl命令有三種:set_parameters、list_parameters、clear_patameter
#設置參數
rabbitmqctl set_parameters [-p vhost] {component_name} {name} {value}
rabbitmqctl list_parameters
rabbitmqctl set_parameters federation-upstream f1 '{"uri":"amqp://root:root123@192.168.0.2:5672\",\"ack-mode\":\"on-confirm\"}'
#顯示參數
rabbitmqctl list_parameters [-p vhost]
#消除參數
rabbitmqctl clear_parameter [-p vhost] {componenet_name} {key}
rabbitmqctl clear_parameter -p / federation-upstream f1
HTTP API接口:
#設置參數
PUT /api/parameters/{component_name}/vhost/name
#清除參數
DELETE /api/parameters/{componenet_name}/vhost/name
#顯示參數
GET /api/parameters
客戶端建立交換器或者隊列時可設置一些參數,如x-message-ttl、x-expires、x-max-length等,這些參數一但建立就不能刪除,只能刪除交換器或隊列從新建立。爲解決這個問題,能夠引入Policy,它是一種特殊的Parameter用法,Policy是vhost級別的,一個Policy能夠匹配一個或多個隊列,或者交換器。Policy支持動態修改一些屬性參數。一般Policy用來配置Federation、鏡像、備份交換器、死信等功能。
pattern,正則匹配,匹配相關的隊列或交換器
apply to,指定policy做用於哪一方,能夠是隊列、交換器、或同時
priority,定義優先級,如有多個policy做用於同一個交換品茶或者隊列,那麼priority最大的那個policy才起做用
#設置policy
rabbitmqctl set_policy [-p vhost] [--priority priority] [--apply-to apply-to] {name} {pattern} {definition}
其中name pattern definition是必填項
#設置默認vhost中全部以^amp.開頭的交換器爲聯邦交換器
rabbitmqctl set_policy --apply-to exchanges --priority 1 p1 "^amp." '{"federation-upstream":"f1"}'
curl -i -u admin:admin -XPUT -d'{"pattern":"^amq\.","definition":{"federation-upstream":"f11"},"priority":1,"apply-to":"exchanges"}' http://127.0.0.1:15672/api/policies/%2F/p12
#顯示policy
rabbitmqctl list_policies [-p vhost]
curl -i -u admin:admin -XGET http://127.0.0.1:15672/api/policies/%2F
#消除policy rabbitmqctl clear_policy [-p vhost] {name} rabbitmqctl clear_policy p12 curl -i -u admin:admin -XDELETE http://127.0.0.1:15672/api/policies/%2F/p12 若兩個或者多個policy都做用到同一交換器或隊列上,且這些policy優先級都同樣,則參數項最多的policy具備決定權;若參數同樣多,則最後添加的policy具備決定權