RabbitMQ就是一個消息代理(message broker),能夠用來接收和發送消息。html
消息隊列有一些黑話,咱們來看下:python
注意,producer 和 consumer 和 queue 能夠在同一臺主機,也能夠不在同一臺主機。一般不在api
如圖,P表示producer , C 表示consumer . 中間的盒子表示queue異步
發送message的producer函數
Send.csspa
using System;using RabbitMQ.Client;using System.Text; class Send { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) //創建鏈接,若是咱們想鏈接到一個不一樣機器的broker,咱們能夠指定一個名字或者ip using(var channel = connection.CreateModel()) //創建通道,大多數api獲取數據都在這裏 { channel.QueueDeclare(queue: "hello", //聲明一個queue(用來把message發送過去) durable: false, exclusive: false, autoDelete: false, arguments: null); string message = "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", //實際發送的動做 routingKey: "hello", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } }
若是你第一次使用RabbitMQ,而且發送message失敗。那麼有多是the broker(代理:指RabbitMQ 或者說queue)啓動的時候沒有足夠的硬盤空間(默認須要至少50M)所以拒絕接受請求。代理
你能夠檢查the broker 的日誌文件來確認而且減小限制。詳細 configuration file documentation 日誌
對於consumer ,它用來從RabbitMQ監聽message.因此,它會持續監聽message.code
Receive.cshtm
using RabbitMQ.Client;using RabbitMQ.Client.Events;using System;using System.Text; class Receive { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) //創建鏈接 using(var channel = connection.CreateModel()) //創建通道 { channel.QueueDeclare(queue: "hello", //聲明queue durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => //回調函數 { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); }; channel.BasicConsume(queue: "hello", //接受消息動做 autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
注意,咱們在這裏也聲明瞭queue.由於咱們可能在啓動publisher以前先啓動consumer,因此咱們在consume messages以前須要確保queue存在。
咱們在接收queue中的message時是異步的,因此咱們提供了一個回調函數。即 EventingBasicConsumer.Received
代碼結構以下
本示例直接在一個解決方案中創建了兩個控制檯程序,用來send和receive 隊列queue中的message.
代碼使用上面講述的代碼,
效果以下:
上面的代碼直接在程序啓動後就發送了message.使看起來不是很直觀,下面是對send.cs修改後的代碼。
send.cs
public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); while (true) { var message = Console.ReadLine(); //string message = "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); //Console.WriteLine(" Press [exit] to exit."); } } //Console.WriteLine(" Press [enter] to exit."); //Console.ReadLine(); }
如上代碼,使用while循環稍做修改,是能夠持續手動輸入message
效果以下
如上,能夠看出,當想要持續發送時,咱們在發送端加了while循環。
可是,接收端卻不須要作任何修改,由於接收端自己是在持續監聽queue裏的message。
參考網址:RabbitMQ