RabbitMQ學習筆記

AMQP(高級消息隊列協議)是一個網絡協議。它支持符合要求的客戶端應用(application)和消息中間件代理(messaging middleware broker)之間進行通訊。RabbitMQ是實現了高級消息隊列協議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。RabbitMQ服務器是用Erlang語言編寫的,因此安裝RabbitMq服務器端前須要先安裝Erlang。如下記錄windows 7環境下的學習筆記。html

1、RabbitMQ下載安裝

Erlang下載地址:https://www.erlang.org/downloads正則表達式

RabbitMQ服務端下載地址:https://www.rabbitmq.com/install-windows.htmlwindows

安裝好Erlang和RabbitMQ以後都須要新建系統變量,計算機-->屬性-->高級系統設置-->環境變量-->系統變量新建服務器

1、新建ErlangRabbitMQ系統變量

 

 

建系統變量時值爲安裝目錄,個人安裝目錄:網絡

Erlang:C:\Program Files\erl10.4app

RabbitMQ:C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.15學習

新建系統變量後,還須要在已有系統變量PATH中增長Erlang和RabbitMQ的配置(注:是追加,PATH原有的值不要刪除),以分號分隔。測試

Erlang:%ERLANG_HOME%\binfetch

RabbitMQ:%RABBITMQ_SERVER%\sbinspa

 

2、啓動RabbitMQ的管理界面

cmd中運行以下命令:

"C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.15\sbin\rabbitmq-plugins.bat" enable rabbitmq_management

默認會生成一個帳號guest,密碼也是guest。

查看已有用戶及用戶的角色命令

rabbitmqctl.bat list_users

 

RabbitMQ安裝後是默認啓動服務的,若要手動啓動、中止服務,執行以下命令:

啓動服務:

net start rabbitmq

中止服務:

net stop rabbitmq

3、登陸RabbitMQ的管理界面

默認地址是本地端口15672,地址:http://127.0.0.1:15672guest帳號登陸。

 

2、Rabbit介紹

消息(message)被髮布者(publisher)發送給交換機(exchange),交換機經常被比喻成郵局或者郵箱。而後交換機將收到的消息根據路由規則分發給綁定的隊列(queue)。最後AMQP代理會將消息投遞給訂閱了此隊列的消費者,或者消費者按照需求自行獲取。

交換機和交換機類型

Default exchange默認交換機):一個由消息代理預先聲明好的沒有名字(名字爲空字符串)的直連交換機(direct exchange)。它有一個特殊的屬性使得它對於簡單應用特別有用處:那就是每一個新建隊列(queue)都會自動綁定到默認交換機上,綁定的路由鍵(routing key)名稱與隊列名稱相同。

Direct exchange(直連交換機)根據消息攜帶的路由鍵(routing key)將消息投遞給對應隊列的。

Fanout exchange(扇型交換機)將消息路由給綁定到它身上的全部隊列,而不理會綁定的路由鍵。若是N個隊列綁定到某個扇型交換機上,當有消息發送給此扇型交換機時,交換機會將消息的拷貝分別發送給這全部的N個隊列。扇型用來交換機處理消息的廣播路由(broadcast routing)。

Topic exchange(主題交換機)經過對消息的路由鍵和隊列到交換機的綁定模式之間的匹配,將消息路由給一個或多個隊列。主題交換機常常用來實現各類分發/訂閱模式及其變種。主題交換機一般用來實現消息的多播路由(multicast routing)。

routing_key格式是以點號「."分割的字符表對於routing_key,有兩個特殊字符(在正則表達式裏叫元字符):

*表明任意一個單詞

#表明0個或者多個單詞

因爲有"*""#"Topic exchange 很是強大而且能夠轉化爲其餘的exchange:

若是binding_key是"#"它會接收全部的Message,無論routing_key是什麼,就像是Fanout exchange

若是"*"和"#"都沒有被使用,那麼topic exchange就變成了Direct exchange

Headers exchange(頭交換機)有時消息的路由操做會涉及到多個屬性,此時使用消息頭就比用路由鍵更容易表達,頭交換機(headers exchange)就是爲此而生的。頭交換機使用多個消息屬性來代替路由鍵創建路由規則。經過判斷消息頭的值可否與指定的綁定相匹配來確立路由規則。

3、Hello Word

建立2個項目,1個發送端Send,1個接收端Receive,經過Nuget安裝RabbitMQ.Client。

發送端代碼:

            var message ="Hello Word ";

            var factory = new ConnectionFactory() { HostName = "localhost" };

            using (var connection = factory.CreateConnection())

            using (var channel = connection.CreateModel())

            {

                channel.QueueDeclare(queue: "hello",

                    durable: false,

                    exclusive: false,

                    autoDelete: false,

                    arguments: null);

                for (int i = 0; i < 10; i++)

                {

                    var body = Encoding.UTF8.GetBytes(message + i);

                    channel.BasicPublish(exchange: "",//使用默認交換機

                        routingKey: "hello",

                        basicProperties: null, //new BasicProperties() { DeliveryMode = 2 },

                        body: body);

                }

            }

接收端代碼:

            var factory = new ConnectionFactory() { HostName = "localhost" };

            var connection = factory.CreateConnection();

            var channel = connection.CreateModel();

            channel.QueueDeclare(queue: "hello",

                durable: false,

                exclusive: false,

                autoDelete: false,

                arguments: null);

            channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);//限制流量,向消費者一條條發送消息

            var consumer = new EventingBasicConsumer(channel);

            consumer.Received += (model, ea) =>

            {

                var body = ea.Body;

                var message = Encoding.UTF8.GetString(body);

                var directory = AppContext.BaseDirectory + "/Log";

                if (!Directory.Exists(directory))

                {

                    Directory.CreateDirectory(directory);

                }

                using (FileStream fs = System.IO.File.Open(directory + $"/{DateTime.Now:yyyyMMdd}{ea.ConsumerTag}.txt", FileMode.Append, FileAccess.Write, FileShare.ReadWrite))

                {

                    var sr = new StreamWriter(fs);

                    sr.WriteLine($"[{DateTime.Now:yyyyMMddHHmmssfff}]");

                    sr.WriteLine(ea.ConsumerTag + ":" + message);

                    sr.Flush();

                    sr.Close();

                    Thread.Sleep(100);//可複製一份代碼,將等待的時間設置成200,測試多消費者

                }

                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);//手動消息確認

            };

            channel.BasicConsume(queue: "hello",

                autoAck: false,//消息確認設置成手動

                consumer: consumer);

 

更多的內容能夠看如下文章:

https://blog.csdn.net/anzhsoft2008/column/info/rabbitmq

http://rabbitmq.mr-ping.com/AMQP/AMQP_0-9-1_Model_Explained.html

相關文章
相關標籤/搜索