AMQP(高級消息隊列協議)是一個網絡協議。它支持符合要求的客戶端應用(application)和消息中間件代理(messaging middleware broker)之間進行通訊。RabbitMQ是實現了高級消息隊列協議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。RabbitMQ服務器是用Erlang語言編寫的,因此安裝RabbitMq服務器端前須要先安裝Erlang。如下記錄在windows 7環境下的學習筆記。html
Erlang下載地址:https://www.erlang.org/downloads正則表達式
RabbitMQ服務端下載地址:https://www.rabbitmq.com/install-windows.htmlwindows
安裝好Erlang和RabbitMQ以後都須要新建系統變量,計算機-->屬性-->高級系統設置-->環境變量-->系統變量新建服務器
建系統變量時值爲安裝目錄,個人安裝目錄:網絡
Erlang:C:\Program Files\erl10.4app
RabbitMQ:C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.15學習
新建系統變量後,還須要在已有系統變量PATH中增長Erlang和RabbitMQ的配置(注:是追加,PATH原有的值不要刪除),以分號分隔。測試
Erlang:%ERLANG_HOME%\binfetch
RabbitMQ:%RABBITMQ_SERVER%\sbinspa
在cmd中運行以下命令:
"C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.15\sbin\rabbitmq-plugins.bat" enable rabbitmq_management
默認會生成一個帳號guest,密碼也是guest。
查看已有用戶及用戶的角色命令:
rabbitmqctl.bat list_users
RabbitMQ安裝後是默認啓動服務的,若要手動啓動、中止服務,執行以下命令:
啓動服務:
net start rabbitmq
中止服務:
net stop rabbitmq
默認地址是本地端口15672,地址:http://127.0.0.1:15672,用guest帳號登陸。
消息(message)被髮布者(publisher)發送給交換機(exchange),交換機經常被比喻成郵局或者郵箱。而後交換機將收到的消息根據路由規則分發給綁定的隊列(queue)。最後AMQP代理會將消息投遞給訂閱了此隊列的消費者,或者消費者按照需求自行獲取。
Default exchange(默認交換機):一個由消息代理預先聲明好的沒有名字(名字爲空字符串)的直連交換機(direct exchange)。它有一個特殊的屬性使得它對於簡單應用特別有用處:那就是每一個新建隊列(queue)都會自動綁定到默認交換機上,綁定的路由鍵(routing key)名稱與隊列名稱相同。
Direct exchange(直連交換機):根據消息攜帶的路由鍵(routing key)將消息投遞給對應隊列的。
Fanout exchange(扇型交換機):將消息路由給綁定到它身上的全部隊列,而不理會綁定的路由鍵。若是N個隊列綁定到某個扇型交換機上,當有消息發送給此扇型交換機時,交換機會將消息的拷貝分別發送給這全部的N個隊列。扇型用來交換機處理消息的廣播路由(broadcast routing)。
Topic exchange(主題交換機):經過對消息的路由鍵和隊列到交換機的綁定模式之間的匹配,將消息路由給一個或多個隊列。主題交換機常常用來實現各類分發/訂閱模式及其變種。主題交換機一般用來實現消息的多播路由(multicast routing)。
routing_key格式是以點號「."分割的字符表。對於routing_key,有兩個特殊字符(在正則表達式裏叫元字符):
*表明任意一個單詞
#表明0個或者多個單詞
因爲有"*"和"#",Topic exchange 很是強大而且能夠轉化爲其餘的exchange:
若是binding_key是"#",它會接收全部的Message,無論routing_key是什麼,就像是Fanout exchange。
若是"*"和"#"都沒有被使用,那麼topic exchange就變成了Direct exchange。
Headers exchange(頭交換機):有時消息的路由操做會涉及到多個屬性,此時使用消息頭就比用路由鍵更容易表達,頭交換機(headers exchange)就是爲此而生的。頭交換機使用多個消息屬性來代替路由鍵創建路由規則。經過判斷消息頭的值可否與指定的綁定相匹配來確立路由規則。
建立2個項目,1個發送端Send,1個接收端Receive,經過Nuget安裝RabbitMQ.Client。
發送端代碼:
var message ="Hello Word "; var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); for (int i = 0; i < 10; i++) { var body = Encoding.UTF8.GetBytes(message + i); channel.BasicPublish(exchange: "",//使用默認交換機 routingKey: "hello", basicProperties: null, //new BasicProperties() { DeliveryMode = 2 }, body: body); } }
接收端代碼:
var factory = new ConnectionFactory() { HostName = "localhost" }; var connection = factory.CreateConnection(); var channel = connection.CreateModel(); channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);//限制流量,向消費者一條條發送消息 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var directory = AppContext.BaseDirectory + "/Log"; if (!Directory.Exists(directory)) { Directory.CreateDirectory(directory); } using (FileStream fs = System.IO.File.Open(directory + $"/{DateTime.Now:yyyyMMdd}{ea.ConsumerTag}.txt", FileMode.Append, FileAccess.Write, FileShare.ReadWrite)) { var sr = new StreamWriter(fs); sr.WriteLine($"[{DateTime.Now:yyyyMMddHHmmssfff}]"); sr.WriteLine(ea.ConsumerTag + ":" + message); sr.Flush(); sr.Close(); Thread.Sleep(100);//可複製一份代碼,將等待的時間設置成200,測試多消費者 } channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);//手動消息確認 }; channel.BasicConsume(queue: "hello", autoAck: false,//消息確認設置成手動 consumer: consumer);
更多的內容能夠看如下文章:
https://blog.csdn.net/anzhsoft2008/column/info/rabbitmq
http://rabbitmq.mr-ping.com/AMQP/AMQP_0-9-1_Model_Explained.html