RabbitMq應用二

在應用一中,基本的消息隊列使用已經完成了,在實際項目中,必定會出現各類各樣的需求和問題,rabbitmq內置的不少強大機制和功能會幫助咱們解決不少的問題,下面就一個一個的一塊兒學習一下。數組

消息響應機制服務器

應用一的列子,在消費者從指定隊列獲取消息的時候,把通知參數no_ack給設成true了,這樣就不須要給rabbitMq服務發送已經處理完畢的通知,rabbitmq把消息發出去後,就會直接刪除掉,不去管消費者是否處理成功,這樣在實際項目中存在很大的風險,出現代碼的健壯性不好的錯誤。因此必定要把no_ack參數設成false:負載均衡

 channel.BasicConsume("newQueue", false, customer);

在接受邏輯所有處理成功後加上一句代碼,通知rabbitmq,接到通知後纔會刪除學習

 var ea = (BasicDeliverEventArgs)customer.Queue.Dequeue();
                        //將消息二進制轉回字符串
                        var msg = Encoding.UTF8.GetString(ea.Body);
                        //通知隊列,已經處理完成
                        channel.BasicAck(ea.DeliveryTag, false);
                        Console.WriteLine(msg);

 

消息持久化測試

響應保證了消息不會被錯誤刪除,假如rabbitmq掛了,全部消息所有會丟掉,rabbitmq一個普遍使用的機制就是能夠持久化,作持久化要兩步spa

1.隊列持久化code

 //隊列是否持久化
                    bool durable = true;
channel.QueueDeclare("firstQueue",durable,false,false,null);

2.消息持久化,經過設置IBasicProperties.SetPersistent來作對象

 //消息持久化
                    var properties = channel.CreateBasicProperties();
                    properties.SetPersistent(true);
                    properties.DeliveryMode = 2; //消息是持久的,存在並不會受服務器重啓影響  

上面的持久化,大部分時候不會出現問題,可是假如在寫入隊列的時候rabbitmq掛了,仍是不會持久上,這種狀況,咱們就要用到咱們代碼的邏輯來強制進行持久化了。。。。blog

負載均衡分發消息rabbitmq

若是有兩個接收端消費者同時訂閱一個隊列,會出現不固定的分發流程,某個消費者可能會出現過多的消息流入形成壓力,而另外一個空閒的蛋疼。因此,若是能公平的接受消息,處理完一個,接受另外一個,同時保證壓力的均衡。代碼在消費者端設置:

channel.BasicQos(0, 1, false);

上面是幾個rabbitmq比較重要的機制,下面開始是rabbitmq的核心牛逼的東西路由

這裏涉及2個概念:

1.exchange,這是交換機,也叫路由器,在消息生產者發送消息的時候,實際上不是直接發送到queue隊列中,由於他不知道發送到哪一個隊列,他會先發送到路由器中exchange裏,exchange再經過路由匹配把消息發送到匹配的隊列當中。

 

2.routingKey這個是路由的匹配規則,當消息發送到exchange裏後,會根據routingkey來匹配到底發送到哪一個隊列,若是沒匹配到,則消息丟失

 

exchange的四種類型:

 

1.direct:按routingkey的名稱匹配

 

2.fanout:廣播,無需匹配routingkey消息會發送到全部隊列

 

3.topic:這個是貪婪匹配,也是最靈活的匹配方式,有兩種符號#,*.,......*號的意思是

 

 #符號的意思是好比a_#,能夠匹配的隊列能夠是a_a,a_aa,a_aaaaaa,a_a_b.......多詞

 

 *符號的意識是好比a_*,能夠匹配的隊列能夠是a_a,a_b,a_c.......單詞

 

這個是應用一中發送消息給隊列的代碼,

 

 

channel.BasicPublish("", "firstQueue", null, body);

 

 

經過查看這個方法的參數中可看到第一個參數是exchange路由,第二個是routingkey匹配規則,而發送的代碼第一個參數是"",第二個參數是firstQueue,開始覺得是隊列實際並非,緣由是若是用空字符串去申明一個exchange,那麼系統就會使用"amq.direct"這個exchange。咱們在建立一個queue的時候,默認的都會有一個和新建queue同名的routingKey綁定到這個默認的exchange上去,由於在第一個參數選擇了默認的exchange,而咱們申明的隊列叫firstQueue,因此默認的,它在新建一個也叫firstQueue的routingKey,並綁定在默認的exchange上,致使了咱們能夠在第二個參數routingKey中寫firstQueue,這樣它就會找到定義的同名的queue,並把消息放進去。

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

基本概念已經差很少了,我是很擅長排版解釋,往下是各類匹配規則的代碼和運行狀況,直接上代碼:

1.路由類型direct,匹配規則rroutingKey相同,一個生產者,兩個消費者,採用負載均衡方式分發:

生產者

 

 //建立連接工廠,設置目標,用戶,密碼
            var factory = new ConnectionFactory() { 
                HostName = "127.0.0.1",
                UserName = "feiyang",
                Password = "123456",
                AutomaticRecoveryEnabled = true, //自動重連 
                RequestedHeartbeat = UInt16.MaxValue//心跳超時時間
            };
            
            //開啓當前服務設置的用戶的連接
            using (var connection =  factory.CreateConnection())
            {
                //開啓一個頻道
                using (var channel = connection.CreateModel())
                {
                    //建立一個隊列
                    //隊列是否持久化
                    bool durable = true;
                    //已經存在的隊列,不能再定義持久化
                   // channel.QueueDeclare("firstQueue",false,false,false,null);
                    //建立一個新的,持久的交換區  
                    channel.ExchangeDeclare("NewExchange", ExchangeType.Direct, true, false, null);  
                    //持久的隊列, 沒有排他性,與不自動刪除 
                    channel.QueueDeclare("newQueue", durable, false, false, null);
                    // 綁定隊列到交換區  
                    channel.QueueBind("newQueue", "NewExchange", "newRoutingKey"); 
                    //消息持久化
                    var properties = channel.CreateBasicProperties();
                    properties.SetPersistent(true);
                    properties.DeliveryMode = 2; //消息是持久的,存在並不會受服務器重啓影響  
                    byte[] body = null;
                    //消息是以二進制數組的形式傳輸的,因此若是消息是實體對象的話,須要序列化和而後轉化爲二進制數組。
                    for (int i = 0; i < 100; i++)
                    {
                        body = Encoding.UTF8.GetBytes("這是第-----"+i+"-----條消息");
                        //channel.BasicPublish("", "firstQueue", null, body);
                        channel.BasicPublish("NewExchange", "newRoutingKey", properties, body); 
                        Console.Write("成功發送第-----"+i+"-----條消息!");
                    }
                    Console.ReadKey();
                }
            }

 

消費者a:

 

 

static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "127.0.0.1";
            factory.UserName = "feiyang";
            factory.Password = "123456";
            using (var connection  = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    //建立一個新的,持久的交換區  
                    channel.ExchangeDeclare("NewExchange", ExchangeType.Direct, true, false, null);  
                    //仍是鏈接到哪一個隊列
                    channel.QueueDeclare("newQueue",true,false,false,null);
                    // 綁定隊列到交換區  
                    channel.QueueBind("newQueue", "NewExchange", "newRoutingKey"); 
                    //定義消息接受者
                    var customer = new QueueingBasicConsumer(channel);
                    //從指定隊列獲取消息,
                    //中間這個參數實際必須打開,爲false,意思是是否不通知rabbitm已經處理完畢,咱們這裏要設成false,要通知
                    //channel.BasicConsume("firstQueue",true,customer);
                    channel.BasicConsume("newQueue", false, customer);
                    //因爲隊列分發不公平致使一個壓力很大,一個很小,在這設置下,公平q分發,也就是一個消費者處理完通知隊列後,纔會繼續分發一個
                    channel.BasicQos(0, 1, false);
                    //開始不斷循環出隊列的消息
                    while (true)
                    {
                        var ea = (BasicDeliverEventArgs)customer.Queue.Dequeue();
                        //將消息二進制轉回字符串
                        var msg = Encoding.UTF8.GetString(ea.Body);
                        //通知隊列,已經處理完成
                        channel.BasicAck(ea.DeliveryTag, false);
                        Console.WriteLine(msg);
                    }
                    //sw.Stop();
                    //Console.WriteLine("共用時" + sw.ElapsedTicks + "毫秒");
                    //Console.ReadKey();
                }
            }
        }

消費者b:

 var factory = new ConnectionFactory();
            factory.HostName = "127.0.0.1";
            factory.UserName = "feiyang";
            factory.Password = "123456";
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    //建立一個新的,持久的交換區  
                    //channel.ExchangeDeclare("NewExchange", ExchangeType.Direct, true, false, null);
                    //仍是鏈接到哪一個隊列
                    channel.QueueDeclare("newQueue", true, false, false, null);
                    // 綁定隊列到交換區  
                    //channel.QueueBind("newQueue", "NewExchange", "newRoutingKey");
                    //定義消息接受者
                    var customer = new QueueingBasicConsumer(channel);
                    //從指定隊列獲取消息,
                    //中間這個參數實際必須打開,爲false,意思是是否不通知rabbitm已經處理完畢,咱們這裏要設成false,要通知
                    //channel.BasicConsume("firstQueue",true,customer);
                    channel.BasicConsume("newQueue", false, customer);
                    //因爲隊列分發不公平致使一個壓力很大,一個很小,在這設置下,公平q分發,也就是一個消費者處理完通知隊列後,纔會繼續分發一個
                    channel.BasicQos(0, 1, false);
                    //開始不斷循環出隊列的消息
                    while (true)
                    {
                        var ea = (BasicDeliverEventArgs)customer.Queue.Dequeue();
                        //將消息二進制轉回字符串
                        var msg = Encoding.UTF8.GetString(ea.Body);
                        //通知隊列,已經處理完成
                        channel.BasicAck(ea.DeliveryTag, false);
                        Console.WriteLine(msg);
                    }
                    //sw.Stop();
                    //Console.WriteLine("共用時" + sw.ElapsedTicks + "毫秒");
                    //Console.ReadKey();
                }
            }

運行結果

 

2.1個生產者,2個消費者,路由類型direct,匹配規則routingKey相同,匹配不一樣的隊列,一次發送到2個隊列各個消費者取出各自的隊列消息。

生產者,建立一個交換區,建立一個隊列,

 //建立一個隊列
                    //隊列是否持久化
                    bool durable = true;
                    //已經存在的隊列,不能再定義持久化
                   // channel.QueueDeclare("firstQueue",false,false,false,null);
                    //建立一個新的,持久的交換區  
                    channel.ExchangeDeclare("queueExchange", ExchangeType.Direct, true, false, null);  
                    //持久的隊列, 沒有排他性,與不自動刪除 
                    channel.QueueDeclare("queue_a", durable, false, false, null);
                    // 綁定隊列到交換區  
                    channel.QueueBind("queue_a", "queueExchange", "queueRoutingKey"); 
                    //消息持久化
                    var properties = channel.CreateBasicProperties();
                    properties.SetPersistent(true);
                    properties.DeliveryMode = 2; //消息是持久的,存在並不會受服務器重啓影響  
                    byte[] body = null;
                    //消息是以二進制數組的形式傳輸的,因此若是消息是實體對象的話,須要序列化和而後轉化爲二進制數組。
                    for (int i = 0; i < 100; i++)
                    {
                        body = Encoding.UTF8.GetBytes("這是第-----"+i+"-----條消息");
                        //channel.BasicPublish("", "firstQueue", null, body);
                        channel.BasicPublish("queueExchange", "queueRoutingKey", properties, body); 
                        Console.Write("成功發送第-----"+i+"-----條消息!");
                    }

消費者a,建立一個新隊列,綁定到和生產者同一個交換區,讀取剛剛建立的新隊列數據。

 //建立一個新的,持久的交換區  
                    //channel.ExchangeDeclare("NewExchange", ExchangeType.Direct, true, false, null);  
                    //仍是鏈接到哪一個隊列
                    channel.QueueDeclare("queue_a_b", true, false, false, null);
                    // 綁定隊列到交換區  
                    channel.QueueBind("queue_a_b", "queueExchange", "queueRoutingKey"); 
                    //定義消息接受者
                    var customer = new QueueingBasicConsumer(channel);
                    //從指定隊列獲取消息,
                    //中間這個參數實際必須打開,爲false,意思是是否不通知rabbitm已經處理完畢,咱們這裏要設成false,要通知
                    //channel.BasicConsume("firstQueue",true,customer);
                    channel.BasicConsume("queue_a_b", false, customer);
                    //因爲隊列分發不公平致使一個壓力很大,一個很小,在這設置下,公平q分發,也就是一個消費者處理完通知隊列後,纔會繼續分發一個
                    channel.BasicQos(0, 1, false);
                    //開始不斷循環出隊列的消息
                    while (true)
                    {
                        var ea = (BasicDeliverEventArgs)customer.Queue.Dequeue();
                        //將消息二進制轉回字符串
                        var msg = Encoding.UTF8.GetString(ea.Body);
                        //通知隊列,已經處理完成
                        channel.BasicAck(ea.DeliveryTag, false);
                        Console.WriteLine(msg);
                    }

消費b,直接讀取生產者建立的queue_a隊列消息

 //建立一個新的,持久的交換區  
                    //channel.ExchangeDeclare("NewExchange", ExchangeType.Direct, true, false, null);
                    //仍是鏈接到哪一個隊列
                    //channel.QueueDeclare("newQueue", true, false, false, null);
                    // 綁定隊列到交換區  
                    //channel.QueueBind("newQueue", "NewExchange", "newRoutingKey");
                    //定義消息接受者
                    var customer = new QueueingBasicConsumer(channel);
                    //從指定隊列獲取消息,
                    //中間這個參數實際必須打開,爲false,意思是是否不通知rabbitm已經處理完畢,咱們這裏要設成false,要通知
                    //channel.BasicConsume("firstQueue",true,customer);
                    channel.BasicConsume("queue_a", false, customer);
                    //因爲隊列分發不公平致使一個壓力很大,一個很小,在這設置下,公平q分發,也就是一個消費者處理完通知隊列後,纔會繼續分發一個
                    channel.BasicQos(0, 1, false);
                    //開始不斷循環出隊列的消息
                    while (true)
                    {
                        var ea = (BasicDeliverEventArgs)customer.Queue.Dequeue();
                        //將消息二進制轉回字符串
                        var msg = Encoding.UTF8.GetString(ea.Body);
                        //通知隊列,已經處理完成
                        channel.BasicAck(ea.DeliveryTag, false);
                        Console.WriteLine(msg);
                    }

運行結果

能夠看見,經過路由匹配,一次發送消息,發送到匹配到的兩個隊列中,兩個消費者各自讀取各自的隊列。

3.篇幅有限,再來一個路由類型爲Topic的代碼例子。

生產者,因爲已經建立了一個queueexChange類型爲direct的交換區,不能更改類型,因此從新建立一個交換區

  //建立一個隊列
                    //隊列是否持久化
                    bool durable = true;
                    //已經存在的隊列,不能再定義持久化
                   // channel.QueueDeclare("firstQueue",false,false,false,null);
                    //建立一個新的,持久的交換區  
                    channel.ExchangeDeclare("queueTopicExchange", ExchangeType.Topic, true, false, null);  
                    //持久的隊列, 沒有排他性,與不自動刪除 
                    channel.QueueDeclare("queue.a", durable, false, false, null);
                    // 綁定隊列到交換區  
                    channel.QueueBind("queue.a", "queueTopicExchange", "queue.#"); 
                    //消息持久化
                    var properties = channel.CreateBasicProperties();
                    properties.SetPersistent(true);
                    properties.DeliveryMode = 2; //消息是持久的,存在並不會受服務器重啓影響  
                    byte[] body = null;
                    //消息是以二進制數組的形式傳輸的,因此若是消息是實體對象的話,須要序列化和而後轉化爲二進制數組。
                    for (int i = 0; i < 100; i++)
                    {
                        body = Encoding.UTF8.GetBytes("這是第-----"+i+"-----條消息");
                        //channel.BasicPublish("", "firstQueue", null, body);
                        channel.BasicPublish("queueTopicExchange", "queue.#", properties, body); 
                        Console.Write("成功發送第-----"+i+"-----條消息!");
                    }

消費者a:

 //建立一個新的,持久的交換區  
                    //channel.ExchangeDeclare("NewExchange", ExchangeType.Direct, true, false, null);  
                    //仍是鏈接到哪一個隊列
                    channel.QueueDeclare("queue.a.b", true, false, false, null);
                    // 綁定隊列到交換區  
                    channel.QueueBind("queue.a.b", "queueTopicExchange", "queue.#"); 
                    //定義消息接受者
                    var customer = new QueueingBasicConsumer(channel);
                    //從指定隊列獲取消息,
                    //中間這個參數實際必須打開,爲false,意思是是否不通知rabbitm已經處理完畢,咱們這裏要設成false,要通知
                    //channel.BasicConsume("firstQueue",true,customer);
                    channel.BasicConsume("queue.a.b", false, customer);
                    //因爲隊列分發不公平致使一個壓力很大,一個很小,在這設置下,公平q分發,也就是一個消費者處理完通知隊列後,纔會繼續分發一個
                    channel.BasicQos(0, 1, false);
                    //開始不斷循環出隊列的消息
                    while (true)
                    {
                        var ea = (BasicDeliverEventArgs)customer.Queue.Dequeue();
                        //將消息二進制轉回字符串
                        var msg = Encoding.UTF8.GetString(ea.Body);
                        //通知隊列,已經處理完成
                        channel.BasicAck(ea.DeliveryTag, false);
                        Console.WriteLine(msg);
                    }

消費者b:

 //建立一個新的,持久的交換區  
                    //channel.ExchangeDeclare("NewExchange", ExchangeType.Direct, true, false, null);
                    //仍是鏈接到哪一個隊列
                    //channel.QueueDeclare("newQueue", true, false, false, null);
                    // 綁定隊列到交換區  
                    //channel.QueueBind("newQueue", "NewExchange", "newRoutingKey");
                    //定義消息接受者
                    var customer = new QueueingBasicConsumer(channel);
                    //從指定隊列獲取消息,
                    //中間這個參數實際必須打開,爲false,意思是是否不通知rabbitm已經處理完畢,咱們這裏要設成false,要通知
                    //channel.BasicConsume("firstQueue",true,customer);
                    channel.BasicConsume("queue.a", false, customer);
                    //因爲隊列分發不公平致使一個壓力很大,一個很小,在這設置下,公平q分發,也就是一個消費者處理完通知隊列後,纔會繼續分發一個
                    channel.BasicQos(0, 1, false);
                    //開始不斷循環出隊列的消息
                    while (true)
                    {
                        var ea = (BasicDeliverEventArgs)customer.Queue.Dequeue();
                        //將消息二進制轉回字符串
                        var msg = Encoding.UTF8.GetString(ea.Body);
                        //通知隊列,已經處理完成
                        channel.BasicAck(ea.DeliveryTag, false);
                        Console.WriteLine(msg);
                    }

運行結果:

代碼例子就不一一寫出來了,還有不少種狀況,實際項目根據不一樣的需求靈活運用,有興趣的能夠本身搭配測試一下。

相關文章
相關標籤/搜索