在以前異常處理部分就已經寫了,對於consumer的異常退出致使消息丟失,能夠時候consumer的消息確認機制。重複的就不說了,這裏說一些不同的。性能
當一個消費者收到一個快遞,可是這個包裹是破損的,這時候通常會有如下選擇測試
拒收快遞,讓快遞員把快遞寄回。 (若是有多個consumer可能這條消息會到其它的consumer中,若是隻有一個,那麼下次獲取仍是能夠拿到)spa
簽收快遞,而後偷偷的扔了(錢多任性)3d
拒收快遞,聯繫商家再給我補發一個code
下面是具體的方法,BasicReject同時承擔了扔掉消息與退回。區別在第二個參數。BasicNack則是批量進行上面兩個操做,DeliveryTag小於或等於當前消息的都會進行該操做,固然是否批量是由第二個參數來決定的blog
//扔掉消息 channel.BasicReject(result.DeliveryTag, false); //退回消息 channel.BasicReject(result.DeliveryTag, true); //批量退回或刪除,中間的參數 是否批量 true是/false否 (也就是隻一條) channel.BasicNack(result.DeliveryTag, true, true);
BasicRecover方法則是進行補發操做,其中的參數若是爲true是把消息退回到queue可是有可能被其它的consumer接收到,設置爲false是隻補發給當前的consumerrabbitmq
//補發消息 true退回到queue中/false只補發給當前的consumer channel.BasicRecover(true);
消息不僅是在consumer處理的時候出問題,在發佈的時候也可能會出問題。不是說把消息向方法裏一丟就必定會成功的。因此發佈的確認機制也是爲高可靠性保駕護航。事務
rabbitmq爲咱們提供了兩種方式get
確認方式 (confirm)string
事務控制方式 (tx)
可是要知道這些都是耗費性能的,其中事務的性能消耗最大,confirm其次
第一種confirm方式,發佈後等待rabbitmq返回消息發佈狀態
//建立返回一個新的頻道 using (var channel = RabbitMqHelper.GetConnection().CreateModel()) { for (var i = 0; i < 6; i++) { channel.BasicPublish(string.Empty, "testqueue", null, Encoding.UTF8.GetBytes($"這是{i}個消息")); } //等待發布成功並返回發佈狀態 bool isok = channel.WaitForConfirms(); Console.ReadKey(); }
第二種事務控制方式
//建立返回一個新的頻道 using (var channel = RabbitMqHelper.GetConnection().CreateModel()) { try { //鎖往 channel.TxSelect(); for (var i = 0; i < 6; i++) { channel.BasicPublish(string.Empty, "testqueue", null, Encoding.UTF8.GetBytes($"這是{i}個消息")); } //提交 channel.TxCommit(); Console.ReadKey(); } catch (Exception e) { //回退 channel.TxRollback(); } }
下面是針對速度測試圖,發佈一萬條消息。 第一次我使用了事務控制方式。耗時915毫秒
第二次使用了confirm方式,耗時374毫秒
第三次沒有使用消息確認機制,耗時317毫秒