RabbitMQ是一個由erlang開發的AMQP(Advanced Message Queue )的開源實現,官網地址:http://www.rabbitmq.com。RabbitMQ做爲一個消息代理,主要負責接收、存儲和轉發消息,它提供了可靠的消息機制和靈活的消息路由,並支持消息集羣和分佈式部署,經常使用於應用解耦,耗時任務隊列,流量削鋒等場景。本系列文章將系統介紹RabbitMQ的工做機制,代碼驅動和集羣配置,本篇主要介紹RabbitMQ中一些基本概念,經常使用的RabbitMQ Control命令,最後寫一個C#驅動的簡單栗子。先看一下RabbitMQ的基本結構:html
上圖是RabbitMQ的一個基本結構,生產者Producer和消費者Consumer都是RabbitMQ的客戶端,Producer負責發送消息,Consumer負責消費消息。node
接下來咱們結合這張圖來理解RabbitMQ的一些概念:web
Broker(Server):接受客戶端鏈接,實現AMQP消息隊列和路由功能的進程,咱們能夠把Broker叫作RabbitMQ服務器。數據庫
Virtual Host:一個虛擬概念,一個Virtual Host裏面能夠有若干個Exchange和Queue,主要用於權限控制,隔離應用。如應用程序A使用VhostA,應用程序B使用VhostB,那麼咱們在VhostA中只存放應用程序A的exchange,queue和消息,應用程序A的用戶只能訪問VhostA,不能訪問VhostB中的數據。vim
Exchange:接受生產者發送的消息,並根據Binding規則將消息路由給服務器中的隊列。ExchangeType決定了Exchange路由消息的行爲,例如,在RabbitMQ中,ExchangeType有Direct、Fanout、Topic和Header四種,不一樣類型的Exchange路由規則是不同的(這些之後會詳細介紹)。windows
Queue:消息隊列,用於存儲還未被消費者消費的消息,隊列是先進先出的,默認狀況下先存儲的消息先被處理。瀏覽器
Message:就是消息,由Header和Body組成,Header是由生產者添加的各類屬性的集合,包括Message是否被持久化、由哪一個Message Queue接受、優先級是多少等,Body是真正傳輸的數據,內容格式爲byte[]。bash
Connection:鏈接,對於RabbitMQ而言,其實就是一個位於客戶端和Broker之間的TCP鏈接。服務器
Channel:信道,僅僅建立了客戶端到Broker之間的鏈接Connection後,客戶端仍是不能發送消息的。須要在Connection的基礎上建立Channel,AMQP協議規定只有經過Channel才能執行AMQP的命令,一個Connection能夠包含多個Channel。之因此須要Channel,是由於TCP鏈接的創建和釋放都是十分昂貴的。app
由於RabbitMQ是用erlang語言開發的,因此咱們在安裝RabbitMQ前必需要安裝erlang支持。
1 安裝erlang
首先下載erlang,直接下載最新版本,當前下載的是 OTP 21.3 Windows 64-bit Binary File ,下載完成後一直下一步安裝便可。
2 安裝RabbitMQ
下載Windows平臺的RabbtMQ,當前下載的是 rabbitmq-server-3.7.14.exe ,下載完成後一直下一步安裝便可。
3 安裝Web管理插件
打開RabbitMQ Command Prompt,執行命令 rabbitmq-plugins enable rabbitmq_management 便可完成Web監控插件的安裝。
安裝完成後,打開瀏覽器輸入 http://127.0.0.1:15672/ 使用默認帳號[ name:guest / password:guest ]登陸後界面以下,使用這個UI插件咱們能夠輕鬆的查看RabbitMQ中的交換機(exchange),隊列(queue)等內容,也能夠對exchange,queue,用戶等進行添加、修改、刪除操做。
到這一步Windows平臺安裝RabbitMQ完成了。 打開服務管理器,RabbitMQ已經在正常運行了,以下:
這裏虛擬機系統爲Centos7,採用的安裝方式是yum安裝,爲了簡單,這裏直接使用官方提供的erlang和RabbitMQ-server的自動安裝腳本(官方安裝文檔),逐行執行下邊的代碼就能夠安裝完成erlang和RabbitMQ。
#安裝socat yum install socat #安裝erlang curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash yum -y install erlang #安裝rabbitmq-server curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash yum -y install rabbitmq-server #啓動rabbitmq服務 systemctl start rabbitmq-server #添加web管理插件 rabbitmq-plugins enable rabbitmq_management
補充:若是安裝完成後,執行RabbitMQ執行命令特別慢,或者出現報錯【rabbitmq unable to perform an operation on node xxx@xxx】,解決方法:
編輯hosts,執行命令 vim /etc/hosts ,添加本機IP(或者虛擬機IP)
命令執行結束後,使用瀏覽器訪問 http://127.0.0.1:15672/ 也會出現web管理界面。經過上邊的安裝步驟安裝的RabbitMQ會生成Unit文件,因此咱們可使用Systemd管理RabbitMQ服務,如下是幾條經常使用的命令:
#啓動RabbitMQ服務 systemctl start rabbitmq-server #中止RabbitMQ服務 systemctl stop rabbitmq-server #查看RabbitMQ運行狀態 systemctl status rabbitmq-server #重啓RabbitMQ服務 systemctl restart rabbitmq-server
使用Web管理界面能夠實現RabbitMQ的大部分經常使用功能,可是有些功能WebUI是作不到的,如:開啓/關閉RabbitMQ應用程序和集羣的管理等。RabbitMQ Control是RabbitMQ的命令行管理工具,能夠調用全部的RabbitMQ內置功能,主命令是rabbitmqctl ,下邊是一個查詢用戶列表的命令,注意須要切換到sbin目錄下執行:
爲了方便的使用RabbitMQ Control工具,咱們最好添加一個環境變量,Windows默認安裝時在PATH中添加一條: C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14\sbin ,不是默認安裝的話找到對應的安裝目錄添加PATH。按照上的安裝方法,Centos能夠直接使用RabbitMQ Control工具,不須要多餘的配置。若是想詳細瞭解RabbitMQ Control工具,能夠參考RabbitMQ Control的官方文檔。
這裏總結了一些最經常使用到的RabbitMQ Controll命令,有興趣的小夥伴能夠試着運行一下這些命令,如在命令行工具中使用命令 rabbitmqctl add_user <username> <password> 添加一個新用戶。
基本控制命令主要用於啓動、中止應用程序、runtime等
#中止rabbitmq和runtime rabbitmqctl shutdown #中止erlang節點 rabbitmqctl stop #啓用rabbitmq rabbitmqctl start_app #中止rabbitmq rabbitmqctl stop_app #查看狀態 rabbitmqctl status #查看環境 rabbitmqctl environment #rabbitmq恢復最初狀態,內部的exchange和queue都清除 rabbitmqctl reset
這些命令主要用於用於查看exchang、channel、binding、queue、consumers:
#返回queue的信息 list_queues [-p <vhostpath>] [<queueinfoitem> ...] #返回exchange的信息 list_exchanges [-p <vhostpath>] [<exchangeinfoitem> ...] #返回綁定信息 list_bindings [-p <vhostpath>] [<bindinginfoitem> ...] #返回連接信息 list_connections [<connectioninfoitem> ...] #返回目前全部的channels list_channels [<channelinfoitem> ...] #返回consumers list_consumers [-p <vhostpath>]
這些命令主要用於添加、修改、刪除用戶及管理用戶權限
#在rabbitmq的內部數據庫添加用戶 add_user <username> <password> #刪除一個用戶 delete_user <username> #改變用戶密碼 change_password <username> <newpassword> #清除用戶密碼,禁止用戶登陸 clear_password <username> #設置用戶tags,就是設置用戶角色 set_user_tags <username> <tag> # 查看用戶列表 list_users #建立一個vhost add_vhost <vhostpath> #刪除一個vhosts delete_vhost <vhostpath> #列出vhosts list_vhosts [<vhostinfoitem> ...] #針對一個vhosts 給用戶賦予相關權限 set_permissions [-p <vhostpath>] <user> <conf> <write> <read> #清除一個用戶對vhost的權限 clear_permissions [-p <vhostpath>] <username> #列出全部用戶對某一vhost的權限 list_permissions [-p <vhostpath>] #列出某用戶的訪問權限 list_user_permissions <username>
#clusternode表示node名稱,--ram表示node以ram node加入集羣中。默認node以disc node加入集羣,在一個node加入cluster以前,必須先中止該node的rabbitmq應用,即先執行stop_app。 join_cluster <clusternode> [--ram] #顯示cluster中的全部node cluster_status #設置集羣名字 set_cluster_name <clustername> #修改集羣名字 rename_cluster_node <oldname> <newname> #改變一個cluster中node的模式,該節點在轉換前必須先中止,不能把一個集羣中惟一的disk node轉化爲ram node change_cluster_node_type <disc | ram> #遠程刪除一個節點,刪除前必須該節點必須先中止 rabbitmqctl forget_cluster_node rabbit@rabbit1 #同步鏡像隊列 sync_queue <queuename> #取消同步隊列 cancel_sync_queue <queuename> #清空隊列中全部消息 purge_queue [-p vhost] <queuename>
這裏列舉的不少命令是現階段用不到的,如集羣控制相關的命令,這些命令的用法會在之後的章節中逐漸理解。
做爲開發者,咱們最在乎的仍是怎麼在代碼中使用RabbitMQ,能夠經過官方RabbitMQ開發文檔來學習RabbitMQ的使用,這裏以.NET爲例演示一下RabbitMQ的最基本用法。建立兩個Console應用,一個做爲發送消息的生產者(Producer),一個做爲接受消息的消費者(Consumer),生產者向隊列寫入消息,消費者接受這條消息,結構以下:
兩個控制檯應用都要添加RabbitMQ.Client包,命令以下:
Install-Package RabbitMQ.Client
生產者(Producer)代碼:
class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這裏就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; //第一步:建立鏈接connection using (var connection = factory.CreateConnection()) { //第二步:建立通道channel using (var channel = connection.CreateModel()) { //第三步:聲明交換機exchang channel.ExchangeDeclare(exchange: "myexchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); //第四步:聲明隊列queue channel.QueueDeclare(queue: "myqueue", durable: true, exclusive: false, autoDelete: false, arguments: null); Console.WriteLine("生產者準備就緒...."); //第五步:綁定隊列到交互機 channel.QueueBind(queue:"myqueue", exchange:"myexchange", routingKey:"mykey"); string message = ""; //第六步:發送消息 //在控制檯輸入消息,按enter鍵發送消息 while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase)) { message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); //基本發佈 channel.BasicPublish(exchange: "myexchange", routingKey: "mykey", basicProperties: null, body: body); Console.WriteLine($"消息【{message}】已發送到隊列"); } } } Console.ReadKey(); } }
消費者(Consumer)代碼:
class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這裏就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; //第一步:建立鏈接connection using (var connection = factory.CreateConnection()) { //第二步:建立通道channel using (var channel = connection.CreateModel()) { //第三步:聲明隊列queue channel.QueueDeclare(queue: "myqueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //第四步:定義消費者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine($"接受到消息【{message}】"); }; Console.WriteLine("消費者準備就緒...."); //第五步:處理消息 channel.BasicConsume(queue: "myqueue", autoAck: true, consumer: consumer); Console.ReadLine(); } } } }
依次運行Producer和Consumer兩個應用程序,運行結果以下:
注意:上邊的代碼在生產者和消費者的代碼中都聲明瞭exchange和queue,這主要是爲了讓這兩個程序能夠按任意順序啓動,如:咱們只在生產者代碼中定義了exchange和queue,卻先啓動消費者,這會讓形成消費者找不到本身須要的exhange和queue(出現404錯誤)。實際開發中建立exchange/queue、綁定隊列以及設置routingKey這些工做,均可以經過WebUI管理界面或者使用Rabbitmq Control工具完成。
QueueDeclare方法用於聲明隊列,ExchangeDeclare用於聲明交換機,咱們在使用這兩個方法聲明時,能夠設置隊列和交換機的屬性,如queue的名字,長度限制,exchange是否持久化、交換機類型等。
在上邊的栗子中咱們使用了聲明隊列的方法 QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments) ,該方法經過參數設置隊列的特性。這裏介紹一下該方法 中幾個參數的做用,先看一個完整的聲明隊列的栗子:
//聲明隊列newsQueue channel.QueueDeclare(queue: "myqueue", durable: false, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object>() { //隊列中消息的過時時間是8s { "x-message-ttl",1000*8 }, //隊列60s沒有被使用,則刪除該隊列 {"x-expires",1000*60 }, //隊列最多保存100條消息 {"x-max-length",100 }, //隊列中ready類型消息總共不能超過1000字節 {"x-max-length-bytes",1000 }, //當隊列消息滿了時,丟棄傳來後續消息 {"x-overflow","reject-publish" }, //丟棄的消息發送到deadExchange交換機 {"x-dead-letter-exchange","deadExchange" }, //丟棄的消息發送到deadExchange交換機時的RoutingKey {"x-dead-letter-routing-key","deadKey" }, //隊列中最大的優先級等級爲10(在Publish消息時對每條消息設置優先級) {"x-max-priority",10 }, //設置隊列默認爲lazy {"x-queue-mode","lazy" } });
QueueDeclare方法的參數以下:
queue:隊列名字;
durable:是否持久化。設置爲true時,隊列信息保存在rabbitmq的內置數據庫中,服務器重啓時隊列也會恢復(注意:重啓後隊列內部的消息不會恢復,怎麼實現消息持久化之後會詳細介紹);
exclusive:是否排外。設置爲true時只有首次聲明該隊列的Connection能夠訪問,其餘Connection不能訪問該隊列;且在Connection斷開時,隊列會被刪除(即便durable設置爲true也會被刪除);
autoDelete:是否自動刪除。設置爲true時,表示在最後一條使用該隊列的鏈接(Connection)斷開時,將自動刪除這個隊列;
arguments:設置隊列的一些其它屬性,爲Dictionary<string,object>類型,下表總結了arguments中能夠設置的經常使用屬性。
參數名 | 做用 | 示例 |
Message TTL | 設置隊列中消息的有效時間 | { "x-message-ttl",1000*8 },設置隊列中的全部消息的有效期爲8s; |
Auto expire | 自動刪除隊列。必定的時間內隊列沒有被使用,則自動刪除隊列 | {"x-expires",1000*60 },設置隊列的過時時長爲60s,若是60s沒有隊列被訪問,則刪除隊列; |
Max length | 隊列能保存消息的最大條數 | {"x-max-length",100 },設置隊列最多保存100條消息; |
Max length bytes | 隊列中ready類型消息的總字節數 | {"x-max-length-bytes",1000 }, 設置隊列中ready類型消息總共不能超過1000字節; |
Overflow behaviour | 當隊列消息滿了時,再接收消息時的處理方法。有兩種處理方案:默認爲"drop-head"模式,表示從隊列頭部丟棄消息;"reject-publish "表示不接收後續的消息 |
{"x-overflow","reject-publish" },設置當隊列消息滿了時,丟棄傳來後續消息; |
Dead letter exchange | 用於存儲被丟棄的消息的交換機名。Overflow behaviour 的兩種處理方案中丟棄的消息都會發送到這個交換機 | {"x-dead-letter-exchange","beiyongExchange" },設置丟棄的消息發送到名字位beiyongExchange的交換機; |
Dead letter routing key | 被丟棄的消息發送到Dead letter exchange時的使用的routing Key | {"x-dead-letter-routing-key","deadKey" },設置丟棄的消息發送到beiyongExchange交換機時的RoutingKey值是"deadKey"; |
Maximum priority | 設置隊列中消息優先級的最大等級,在publish消息時能夠設置單條消息的優先級等級 | {"x-max-priority",10 },設置中消息優先級的最大等級爲10; |
Lazy mode | 設置隊列的模式,若是設置爲Lazy表示隊列中消息儘量存放在磁盤中,以減小內存佔用;不設置時消息都存放在隊列中,用以儘量快的處理消息 | {"x-queue-mode","lazy"},3.6之後版本可用,設置隊列中消息儘量存放在磁盤中,以減小內存佔用。在消息擁堵時和消息持久化配置使用能夠減小內存佔用。 |
聲明交換機的方法 void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments) 能夠設置交換機的特性,這裏簡單介紹一下這個方法的幾個參數:
channel.ExchangeDeclare(exchange: "myexchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: new Dictionary<string, object> {
{"alternate-exchange","BeiyongExchange" }//若是消息不能路由到該交換機,就把消息路由到備用交換機BeiyongExchange上
});
exchange:交換機名字。
type:交換機類型。exchange有direct、fanout、topic、header四種類型,在下一篇會詳細介紹;
durable:是否持久化。設置爲true時,交換機信息保存在rabbitmq的內置數據庫中,服務器重啓時交換機信息也會恢復;
autoDelete:是否自動刪除。設置爲true時,表示在最後一條使用該交換機的鏈接(Connection)斷開時,自動刪除這個exchange;
arguments:其餘的一些參數,類型爲Dictionary<string,object> 。
小結
本節主要介紹了RabbitMQ的基本概念,在Windows和Centos上的安裝方法,及RabbitMQ Control工具的基本使用,最後演示了一個C#驅動RabbitMQ的栗子,並詳細介紹了聲明queue和exchange的方法。經過這一節咱們大概瞭解了RabbitMQ的基本使用。之後的章節會逐漸介紹RabbitMQ的四種exchange、兩種Consumer的特色和使用場景,以及消息確認、優先級、持久化等,最後搭建一個高可用的RabbitMQ集羣,若是文中有錯誤的話,但願你們能夠指出,我會及時修改,謝謝!
參考文章
【1】 https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html
【2】http://www.javashuo.com/article/p-somvodfc-cb.html
原文出處:https://www.cnblogs.com/wyy1234/p/10743567.html