net core 使用 rabbitmq

目錄(?)[+]html

 

windows環境安裝:
http://www.javashuo.com/article/p-hwqddnpd-gp.html
.NET Core 使用RabbitMQ
http://www.javashuo.com/article/p-yplqyfft-o.htmlwindows

安裝

"C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.9\sbin\rabbitmq-plugins.bat" enable rabbitmq_managementmarkdown

net stop RabbitMQ && net start RabbitMQpost

建立用戶,密碼,綁定角色網站

查看已有用戶及用戶的角色:
rabbitmqctl.bat list_usersspa

新增一個用戶:
rabbitmqctl.bat add_user username password
示例:
rabbitmqctl.bat add_user tangsansan 1234563d

rabbitmqctl.bat set_user_tags username administratorcode

示例:
rabbitmqctl.bat set_user_tags tangsansan administratorserver

基本用法

引入:RabbitMQ.Clienthtm

消費者

//建立鏈接工廠 var factory = new ConnectionFactory() { UserName = "tangsansan",//用戶名 Password = "123456",//密碼 HostName = "localhost"//rabbitmq ip }; //建立鏈接 var connection = factory.CreateConnection(); //建立通道 var channel = connection.CreateModel(); //定義一個隊列 channel.QueueDeclare("hello", false, false, false, null); Console.WriteLine("\nRabbitMQ鏈接成功,請輸入消息,輸入exit退出!"); string input; do { input = Console.ReadLine(); var sendBytes = Encoding.UTF8.GetBytes(input); //發佈消息 channel.BasicPublish("", "hello", null, sendBytes); } while (input.Trim().ToLower() != "exit"); channel.Close(); connection.Close();

生產者

//建立鏈接工廠 var factory = new ConnectionFactory() { UserName = "tangsansan",//用戶名 Password = "123456",//密碼 HostName = "localhost"//rabbitmq ip }; //建立鏈接 var connection = factory.CreateConnection(); //建立通道 var channel = connection.CreateModel(); //定義一個隊列 channel.QueueDeclare("hello", false, false, false, null); Console.WriteLine("\nRabbitMQ鏈接成功,請輸入消息,輸入exit退出!"); string input; do { input = Console.ReadLine(); var sendBytes = Encoding.UTF8.GetBytes(input); //發佈消息 channel.BasicPublish("", "hello", null, sendBytes); } while (input.Trim().ToLower() != "exit"); channel.Close(); connection.Close();

啓動了一個生產者,兩個消費者,能夠看見兩個消費者都能收到消息,消息投遞到哪一個消費者是由RabbitMQ決定的。

RabbitMQ消費失敗的處理

RabbitMQ採用消息應答機制,即消費者收到一個消息以後,須要發送一個應答,而後RabbitMQ纔會將這個消息從隊列中刪除,若是消費者在消費過程當中出現異常,斷開鏈接切沒有發送應答,那麼RabbitMQ會將這個消息從新投遞

//接收到消息事件 consumer.Received += (ch, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"收到消息: {message}"); Console.WriteLine($"收到該消息[{ea.DeliveryTag}] 延遲10s發送回執"); Thread.Sleep(10000); //確認該消息已被消費 channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine($"已發送回執[{ea.DeliveryTag}]"); };

使用RabbitMQ的Exchange

前面咱們能夠看到生產者將消息投遞到Queue中,實際上這在RabbitMQ中這種事情永遠都不會發生。實際的狀況是,生產者將消息發送到Exchange(交換器),由Exchange將消息路由到一個或多個Queue中(或者丟棄)
生產者將消息發送到Exchange(交換器),由Exchange將消息路由到一個或多個Queue中(或者丟棄)

Direct Exchange

string exchangeName = "TestChange"; string queueName = "hello"; string routeKey = "helloRouteKey"; //建立鏈接工廠 var factory = new ConnectionFactory() { UserName = "tangsansan",//用戶名 Password = "123456",//密碼 HostName = "localhost"//rabbitmq ip }; //建立鏈接 var connection = factory.CreateConnection(); //建立通道 var channel = connection.CreateModel(); //定義一個Direct類型交換機 channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null); //定義一個隊列 channel.QueueDeclare(queueName, false, false, false, null); //將隊列綁定到交換機 channel.QueueBind(queueName, exchangeName, routeKey, null); Console.WriteLine($"\nRabbitMQ鏈接成功,Exchange:{exchangeName},Queue:{queueName},Route:{routeKey},\n\n請輸入消息,輸入exit退出!"); string input; do { input = Console.ReadLine(); var sendBytes = Encoding.UTF8.GetBytes(input); //發佈消息 channel.BasicPublish(exchangeName, routeKey, null, sendBytes); } while (input.Trim().ToLower() != "exit"); channel.Close(); connection.Close();

Fanout Exchange


全部發送到Fanout Exchange的消息都會被轉發到與該Exchange 綁定(Binding)的全部Queue上。

Fanout Exchange 不須要處理RouteKey 。只須要簡單的將隊列綁定到exchange 上。這樣發送到exchange的消息都會被轉發到與該交換機綁定的全部隊列上。相似子網廣播,每臺子網內的主機都得到了一份複製的消息。

因此,Fanout Exchange 轉發消息是最快的。

static void Main(string[] args) { string exchangeName = "TestFanoutChange"; string queueName1 = "hello1"; string queueName2 = "hello"; string routeKey = ""; //建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory { UserName = "tangsansan",//用戶名 Password = "123456",//密碼 HostName = "localhost"//rabbitmq ip }; //建立鏈接 var connection = factory.CreateConnection(); //建立通道 var channel = connection.CreateModel(); //定義一個Direct類型交換機 channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null); //定義隊列1 channel.QueueDeclare(queueName1, false, false, false, null); //定義隊列2 channel.QueueDeclare(queueName2, false, false, false, null); //將隊列綁定到交換機 channel.QueueBind(queueName1, exchangeName, routeKey, null); channel.QueueBind(queueName2, exchangeName, routeKey, null); //生成兩個隊列的消費者 ConsumerGenerator(queueName1); ConsumerGenerator(queueName2); Console.WriteLine($"\nRabbitMQ鏈接成功,\n\n請輸入消息,輸入exit退出!"); string input; do { input = Console.ReadLine(); var sendBytes = Encoding.UTF8.GetBytes(input); //發佈消息 channel.BasicPublish(exchangeName, routeKey, null, sendBytes); } while (input.Trim().ToLower() != "exit"); channel.Close(); connection.Close(); } /// <summary> /// 根據隊列名稱生成消費者 /// </summary> /// <param name="queueName"></param> static void ConsumerGenerator(string queueName) { //建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory { UserName = "tangsansan",//用戶名 Password = "123456",//密碼 HostName = "localhost"//rabbitmq ip }; //建立鏈接 var connection = factory.CreateConnection(); //建立通道 var channel = connection.CreateModel(); //事件基本消費者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //接收到消息事件 consumer.Received += (ch, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"Queue:{queueName}收到消息: {message}"); //確認該消息已被消費 channel.BasicAck(ea.DeliveryTag, false); }; //啓動消費者 設置爲手動應答消息 channel.BasicConsume(queueName, false, consumer); Console.WriteLine($"Queue:{queueName},消費者已啓動"); }

Topic Exchange


全部發送到Topic Exchange的消息被轉發到能和Topic匹配的Queue上,

Exchange 將路由進行模糊匹配。可使用通配符進行模糊匹配,符號「#」匹配一個或多個詞,符號「」匹配很少很多一個詞。所以「XiaoChen.#」可以匹配到「XiaoChen.pets.cat」,可是「XiaoChen.」 只會匹配到「XiaoChen.money」。

因此,Topic Exchange 使用很是靈活。

static void Main(string[] args) { string exchangeName = "TestTopicChange"; string queueName = "hello"; string routeKey = "TestRouteKey.*"; //建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory { UserName = "tangsansan",//用戶名 Password = "123456",//密碼 HostName = "localhost"//rabbitmq ip }; //建立鏈接 var connection = factory.CreateConnection(); //建立通道 var channel = connection.CreateModel(); //定義一個Direct類型交換機 channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, false, false, null); //定義隊列1 channel.QueueDeclare(queueName, false, false, false, null); //將隊列綁定到交換機 channel.QueueBind(queueName, exchangeName, routeKey, null); Console.WriteLine($"\nRabbitMQ鏈接成功,\n\n請輸入消息,輸入exit退出!"); string input; do { input = Console.ReadLine(); var sendBytes = Encoding.UTF8.GetBytes(input); //發佈消息 channel.BasicPublish(exchangeName, "TestRouteKey.one", null, sendBytes); } while (input.Trim().ToLower() != "exit"); channel.Close(); connection.Close(); }

問題:

None of the specified endpoints were reachable

這個異常在建立鏈接時拋出(CreateConnection()),緣由通常是ConnectionFactory參數設置不對,好比HostName、UserName、Password
未設置VirtualHost的權限
設置方法:RabbitmqWeb管理網站-->Admin

 
分類:  RabbitMQ
相關文章
相關標籤/搜索