RabbitMQ入門學習系列(四) 發佈訂閱模式

發佈訂閱模式c#

什麼時發佈訂閱模式

把消息發送給多個訂閱者。也就是有多個消費端都完整的接收生產者的消息測試

換句話說 把消息廣播給多個消費者this

消息模型的核心

RabbitMQ不發送消息給隊列,生產者也不知道消息發送到隊列code

生產者只發送消息到exchange 交換器,blog

exchange一方面從生產者接收消息,另外一方面把消息推送到隊列中。隊列

exchange必須知道如何處理接收到的消息 。是加到特定隊列中,仍是添加到多個隊列中,仍是放棄。這個是由他的類型來決定 。ip

而消息的類型有四種,分別是string

Direct,Topic,headers,fanout

fanout消息類型

fanout消息類型能夠把消息廣播到全部隊列中。it

指定exchange的名稱爲logs,routingkey設置爲空。io

channel.BasicPublish(exchange: "logs",
                                         routingKey: "",
                                         basicProperties: null,
                                         body: body);

爲隊列的名稱指定一個隨機數

var queueName = channel.QueueDeclare().QueueName;
channel.QueueDeclare(queue: queueName,
                     durable: true,
                     exclusive: false,
                     autoDelete: false,
                     arguments: null);

通過對以前代碼的改造

咱們定義了一個能夠廣播類型的exchange和一個隨機名字的隊列 ,

如今咱們須要把他們綁定起來。

channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");

生產者的代碼

  1. 建立鏈接
  2. 建立信道
  3. 聲明類型爲fanout的消息
  4. publish發送消息
static void Main(string[] args)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        channel.ExchangeDeclare(exchange: "logs", type: "fanout");
        for (var i = 0; i < 10; i++)
        {
            string message = "Hello World!this is message "+i;
            var body = Encoding.UTF8.GetBytes(message);
            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;

            channel.BasicPublish(exchange: "logs",
                                 routingKey: "",
                                 basicProperties: null,
                                 body: body);

            Console.WriteLine(" [x] Sent {0},id={1}", message,i);
            Thread.Sleep(1000);
        }
    }

    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
}

消費者代碼

  1. 建立鏈接
  2. 建立信道
  3. 聲明類型爲fanout的消息
  4. 聲明一個隊列
  5. 把隊列而後綁定到信道上
  6. 接收消息
static void Main(string[] args)
{
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    {
        using (var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "logs", type: "fanout");
            var queueName = channel.QueueDeclare().QueueName;
            channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
            //如下是區別生產者的
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, e) =>
            {
                var body = e.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine("Received {0}", message);
                Thread.Sleep(3000);//模擬耗時任務 ,
                Console.WriteLine("Received over");
                channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);
            };
            channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
            Console.WriteLine("");
            Console.ReadLine();
        }

    }

}

測試結果

啓了一個生產者,兩個消費者,生產者發送10條消息 ,兩個消費者都收到了10條消息

友情提示 我對個人文章負責,發現好多網上的文章 沒有實踐,都發出來的,讓人走不少彎路,若是你在個人文章中遇到沒法實現,或者沒法走通的問題。能夠直接在公衆號《愛碼農愛生活 》留言。一定會再次複查緣由。讓每一篇 文章的流程都能順利實現。

相關文章
相關標籤/搜索