利用Exchange的Direct類型,實現對隊列的過濾,消費者啓動之後,輸入相應的key值,攻取該key值對應的在隊列中的消息 。c#
從一節知道Exchange有四種類型this
Direct,Topic,headers,fanout
前面咱們說了fanout類型,能夠把消息發送給全部的消費者,3d
在用Fanout類型的時候,咱們綁定的時候是沒有指定Routing key的【空值】code
channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);
此次咱們說一下Direct類型blog
Exchange的Direct類型將與隊列中的routing key進行精確的匹配。
static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "directType1", type: "direct"); for (var i = 0; i < 5; i++) { string message = "Hello World!this rk1 message " + i; var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "directType1", routingKey: "rk1", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0},id={1}", message,i); Thread.Sleep(1000); } for (var i = 0; i < 5; i++) { string message = "Hello World!this rk2 message " + i; var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "directType1", routingKey: "rk2", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0},id={1}", message, i); Thread.Sleep(1000); } } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); }
static void Main(string[] args) { bool flag = true; string level = ""; while (flag) { Console.WriteLine("請選擇要查看的消息類型"); level = Console.ReadLine(); if (level == "rk1" || level == "rk2" ) flag = false; else Console.Write("僅支持rk1與rk2"); } var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "directType1", type: "direct"); var queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queue: queueName, exchange: "directType1", routingKey: level); //如下是區別生產者的 var consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, e) => { var body = e.Body; var message = Encoding.UTF8.GetString(body); var rk = e.RoutingKey; Console.WriteLine("Received {0},routingKey:{1}", message, rk); Thread.Sleep(3000);//模擬耗時任務 , Console.WriteLine("Received over"); channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); Console.WriteLine(""); Console.ReadLine(); } }
咱們看到生產者分別生產了五條rk1和五條rk2的消息隊列
消費者1輸入只查看rk1的消息,成功得到了rk1的消息ip
一樣的string
消費者2輸入只查看rk2的消息,成功得到了rk2的消息it
要注意的是先把先消費者啓動起來io