RabbitMQ入門學習系列(五) Exchange的Direct類型

快速閱讀

利用Exchange的Direct類型,實現對隊列的過濾,消費者啓動之後,輸入相應的key值,攻取該key值對應的在隊列中的消息 。c#

從一節知道Exchange有四種類型this

Direct,Topic,headers,fanout

前面咱們說了fanout類型,能夠把消息發送給全部的消費者,3d

在用Fanout類型的時候,咱們綁定的時候是沒有指定Routing key的【空值】code

channel.BasicPublish(exchange: "logs",
                                         routingKey: "",
                                         basicProperties: null,
                                         body: body);

此次咱們說一下Direct類型blog

Exchange的Direct類型將與隊列中的routing key進行精確的匹配。

生產者代碼

  1. 建立鏈接和信道
  2. 聲明交換器名字和指定類型爲direct
  3. 發送routingkey=rk1 和rk2的消息各五次
static void Main(string[] args)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
    {
        channel.ExchangeDeclare(exchange: "directType1", type: "direct");
        for (var i = 0; i < 5; i++)
        {
            string message = "Hello World!this rk1 message " + i;
            var body = Encoding.UTF8.GetBytes(message);
            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;

            channel.BasicPublish(exchange: "directType1",
                                 routingKey: "rk1",
                                 basicProperties: null,
                                 body: body);

            Console.WriteLine(" [x] Sent {0},id={1}", message,i);
            Thread.Sleep(1000);
        }

        for (var i = 0; i < 5; i++)
        {
            string message = "Hello World!this rk2 message " + i;
            var body = Encoding.UTF8.GetBytes(message);
            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;

            channel.BasicPublish(exchange: "directType1",
                                 routingKey: "rk2",
                                 basicProperties: null,
                                 body: body);

            Console.WriteLine(" [x] Sent {0},id={1}", message, i);
            Thread.Sleep(1000);
        }

    }

    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
}

消費者代碼

  1. 輸入要查看的消息類型,支持rk1 和rk2
  2. 建立鏈接和信道
  3. 聲明交換器名字和指定類型爲direct
  4. 指定隊列名稱,而且把routingkey的值賦值給控制檯手動須要輸入的rk1或者rk2
  5. 接收消息並回饋,和fanout類型同樣的代碼了。
static void Main(string[] args)
{
    bool flag = true;
    string level = "";
    while (flag)
    {
        Console.WriteLine("請選擇要查看的消息類型");
        level = Console.ReadLine();
        if (level == "rk1" || level == "rk2" )
            flag = false;
        else
            Console.Write("僅支持rk1與rk2");
    }

    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "directType1", type: "direct");
            var queueName = channel.QueueDeclare().QueueName;
            channel.QueueBind(queue: queueName, exchange: "directType1", routingKey: level);
            //如下是區別生產者的
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, e) =>
            {
                var body = e.Body;
                var message = Encoding.UTF8.GetString(body);
                var rk = e.RoutingKey;
                Console.WriteLine("Received {0},routingKey:{1}", message, rk);
                Thread.Sleep(3000);//模擬耗時任務 ,
                Console.WriteLine("Received over");
                channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);
            };
            channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
            Console.WriteLine("");
            Console.ReadLine();
        }

    }

查看結果

咱們看到生產者分別生產了五條rk1和五條rk2的消息隊列

消費者1輸入只查看rk1的消息,成功得到了rk1的消息ip

一樣的string

消費者2輸入只查看rk2的消息,成功得到了rk2的消息it

要注意的是先把先消費者啓動起來io

相關文章
相關標籤/搜索