C#使用RabbitMq隊列(Sample,Work,Fanout,Direct等模式的簡單使用)

1:RabbitMQ是個啥?(專業術語參考自網絡)docker

     RabbitMQ是實現了高級消息隊列協議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。數據庫

  RabbitMQ服務器是用Erlang語言編寫的,Erlang是專門爲高併發而生的語言,而集羣和故障轉移是構建在開發電信平臺框架上的。全部主要的編程語言均有與代理接口通信的客戶端庫
編程

2:使用RabbitMQ有啥好處?後端

RabbitMQ是使用Erlang語言開發的開源消息隊列系統,基於AMQP協議來實現。
AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。
AMQP協議更多用在企業系統內,對數據一致性、穩定性和可靠性要求很高的場景,對性能和吞吐量的要求還在其次。
RabbitMQ的可靠性是很是好的,數據可以保證百分之百的不丟失。能夠使用鏡像隊列,它的穩定性很是好。因此說在咱們互聯網的金融行業。
安全

對數據的穩定性和可靠性要求都很是高的狀況下,咱們都會選擇RabbitMQ。固然沒有kafka性能好,可是要比AvtiveMQ性能要好不少。也能夠本身作一些性能的優化。服務器

RabbitMQ能夠構建異地雙活架構,包括每個節點存儲方式能夠採用磁盤或者內存的方式,網絡

3:RabbitMq的安裝以及環境搭建等:架構

   網絡上有不少關於怎麼搭建配置RabbitMq服務環境的詳細文章,也比較簡單,這裏再也不說明,本人是Docker上面的pull RabbitMq 鏡像來安裝的!併發

3.1:運行容器的命令以下:框架

docker run -d --hostname Log --restart=always --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=log_user -e RABBITMQ_DEFAULT_PASS=331QQFEG123 rabbitmq:3-management

4:RabbitMq的使用場景主要有哪些,啥時候用或者不用?

4.1何時使用MQ?

1)數據驅動的任務依賴

2)上游不關心多下游執行結果

3)異步返回執行時間長

4.2何時不使用MQ?

須要實時關注執行結果 (eg:同步調用)

5:具體C#怎麼使用RabbitMq?下面直接上code和測試截圖了(Demo環境是.NetCore3.1控制檯+Docker上的RabbitMQ容器來進行的)

6:sample模式,就是簡單地隊列模式,一進一出的效果差很少,測試截圖:

 

Code:

 1 //簡單生產端 ui調用者
 2 
 3 using System;
 4 namespace RabbitMqPublishDemo
 5 {
 6     using MyRabbitMqService;
 7     using System.Runtime.CompilerServices;
 8 
 9     class Program
10     {
11         static void Main(string[] args)
12         {
13                 //就是簡單的隊列,生產者
14                 Console.WriteLine("====RabbitMqPublishDemo====");
15                 for (int i = 0; i < 500; i++)
16                 {
17                     ZrfRabbitMqHelper.PublishSampleMsg("smapleMsg", $"nihaifengge:{i}");
18                 }
19                 Console.WriteLine("生成完畢!");
20                 Console.ReadLine();
21         }
22     }
23 }
24 
25 /// <summary>
26 /// 簡單生產者 邏輯
27 /// </summary>
28 /// <param name="queueName"></param>
29 /// <param name="msg"></param>
30 public static void PublishSampleMsg(string queueName, string msg)
31 {
32 
33     using (IConnection conn = connectionFactory.CreateConnection())
34     {
35         using (IModel channel = conn.CreateModel())
36         {
37             channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
38             var msgBody = Encoding.UTF8.GetBytes(msg);
39             channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: msgBody);
40         }
41     }
42 }
43         
44 
45 //簡單消費端
46 using System;
47 
48 namespace RabbitMqConsumerDemo
49 {
50     using MyRabbitMqService;
51     using System.Runtime.InteropServices;
52 
53     class Program
54     {
55         static void Main(string[] args)
56         {
57             Console.WriteLine("====RabbitMqConsumerDemo====");
58             ZrfRabbitMqHelper.ConsumeSampleMsg("smapleMsg", isBasicNack: true, handleMsgStr: handleMsgStr =>
59             {
60                 Console.WriteLine($"訂閱到消息:{DateTime.Now}:{handleMsgStr}");
61             });
62             Console.ReadLine();
63         }
64     }
65 }
66 
67      #region 簡單生產者後端邏輯
68         /// <summary>
69         /// 簡單消費者
70         /// </summary>
71         /// <param name="queueName">隊列名稱</param>
72         /// <param name="isBasicNack">失敗後是否自動放到隊列</param>
73         /// <param name="handleMsgStr">有就本身對字符串的處理,若是要存儲到數據庫請自行擴展</param>
74         public static void ConsumeSampleMsg(string queueName, bool isBasicNack = false, Action<string> handleMsgStr = null)// bool ifBasicReject = false, 
75         {
76             Console.WriteLine("ConsumeSampleMsg Waiting for messages....");
77             IConnection conn = connectionFactory.CreateConnection();
78             IModel channel = conn.CreateModel();
79             channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
80             var consumer = new EventingBasicConsumer(channel);
81             consumer.Received += (sender, ea) =>
82             {
83                 byte[] bymsg = ea.Body.ToArray();
84                 string msg = Encoding.UTF8.GetString(bymsg);
85                 if (handleMsgStr != null)
86                 {
87                     handleMsgStr.Invoke(msg);
88                 }
89                 else
90                 {
91                     Console.WriteLine($"{DateTime.Now}->收到消息:{msg}");
92                 }
93             };
94             channel.BasicConsume(queueName, autoAck: true, consumer);
95         }
96         #endregion
97         
98         

7:Work模式

 1 //就以下的code, 屢次生產,3個消費者均可以自動開始消費
 2 
 3 //生產者
 4 using System;
 5 namespace RabbitMqPublishDemo
 6 {
 7     using MyRabbitMqService;
 8     using System.Runtime.CompilerServices;
 9     class Program
10     {
11         static void Main(string[] args)
12         {
13             for (int i = 0; i < 500; i++)
14             {
15                 ZrfRabbitMqHelper.PublishWorkQueueModel("workqueue", $" :發佈消息成功{i}");
16             }
17             Console.WriteLine("工做隊列模式 生成完畢......!");          
18             Console.ReadLine();
19         }
20     }
21 }
22 
23 //生產者後端邏輯
24 public static void PublishWorkQueueModel(string queueName, string msg)
25         {
26             using (var connection = connectionFactory.CreateConnection())
27             using (var channel = connection.CreateModel())
28             {
29                 channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
30                 var body = Encoding.UTF8.GetBytes(msg);
31                 var properties = channel.CreateBasicProperties();
32                 properties.Persistent = true;
33 
34                 channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: properties, body: body);
35                 Console.WriteLine($"{DateTime.Now},SentMsg: {msg}");
36             }
37         }
38 
39 //work消費端
40 using System;
41 
42 namespace RabbitMqConsumerDemo
43 {
44     using MyRabbitMqService;
45     using System.Runtime.InteropServices;
46     class Program
47     {
48         static void Main(string[] args)
49         {
50             Console.WriteLine("====Work模式開啓了====");
51             ZrfRabbitMqHelper.ConsumeWorkQueueModel("workqueue", handserMsg: msg =>
52             {
53                 Console.WriteLine($"work模式獲取到消息{msg}");
54             });
55             Console.ReadLine();
56         }
57     }
58 }
59 
60 //work後端邏輯
61        public static void ConsumeWorkQueueModel(string queueName, int sleepHmao = 90, Action<string> handserMsg = null)
62         {
63             var connection = connectionFactory.CreateConnection();
64             var channel = connection.CreateModel();
65 
66             channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
67             channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
68 
69             var consumer = new EventingBasicConsumer(channel);
70             Console.WriteLine(" ConsumeWorkQueueModel Waiting for messages....");
71 
72             consumer.Received += (sender, ea) =>
73             {
74                 var body = ea.Body.ToArray();
75                 var message = Encoding.UTF8.GetString(body);
76                 if (handserMsg != null)
77                 {
78                     if (!string.IsNullOrEmpty(message))
79                     {
80                         handserMsg.Invoke(message);
81                     }
82                 }
83                 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
84             };
85             channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
86         }

8:Fanout

 

 

 Code:

 1 //同一個消息會被多個訂閱者消費
 2 
 3 //發佈者
 4 using System;
 5 
 6 namespace RabbitMqPublishDemo
 7 {
 8     using MyRabbitMqService;
 9     using System.Runtime.CompilerServices;
10 
11     class Program
12     {
13         static void Main(string[] args)
14         {
15 
16             #region 發佈訂閱模式,帶上了exchange
17             for (int i = 0; i < 500; i++)
18             {
19                 ZrfRabbitMqHelper.PublishExchangeModel("exchangemodel", $"發佈的消息是:{i}");
20             }
21             Console.WriteLine("發佈ok!");
22             #endregion
23             Console.ReadLine();
24         }
25     }
26 }
27 //發佈者的後端邏輯 我在這裏選擇了扇形: ExchangeType.Fanout
28    public static void PublishExchangeModel(string exchangeName, string message)
29         {
30             using (var connection = connectionFactory.CreateConnection())
31             using (var channel = connection.CreateModel())
32             {
33                 channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);
34                 var body = Encoding.UTF8.GetBytes(message);
35                 channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body);
36                 Console.WriteLine($" Sent {message}");
37             }
38         }
39 
40 
41 //訂閱者
42 using System;
43 namespace RabbitMqConsumerDemo
44 {
45     using MyRabbitMqService;
46     using System.Runtime.InteropServices;
47     class Program
48     {
49         static void Main(string[] args)
50         {
51 
52             #region 發佈訂閱模式 Exchange
53             ZrfRabbitMqHelper.SubscriberExchangeModel("exchangemodel", msg =>
54             {
55                 Console.WriteLine($"訂閱到消息:{msg}");
56             });
57             #endregion
58             Console.ReadLine();
59         }
60     }
61 }
62 
63 //訂閱者後端的邏輯
64  public static void SubscriberExchangeModel(string exchangeName, Action<string> handlerMsg = null)
65         {
66             var connection = connectionFactory.CreateConnection();
67             var channel = connection.CreateModel();
68 
69             channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);//Fanout 扇形分叉
70 
71             var queueName = channel.QueueDeclare().QueueName;
72             channel.QueueBind(queue: queueName,
73                               exchange: exchangeName,
74                               routingKey: "");
75 
76             Console.WriteLine(" Waiting for msg....");
77 
78             var consumer = new EventingBasicConsumer(channel);
79             consumer.Received += (model, ea) =>
80             {
81                 var body = ea.Body.ToArray();
82                 var message = Encoding.UTF8.GetString(body);
83                 if (handlerMsg != null)
84                 {
85                     if (!string.IsNullOrEmpty(message))
86                     {
87                         handlerMsg.Invoke(message);
88                     }
89                 }
90                 else
91                 {
92                     Console.WriteLine($"訂閱到消息:{message}");
93                 }
94             };
95             channel.BasicConsume(queue: queueName,  autoAck: true, consumer: consumer);
96         }

9:Direct

 

 

 

Code:

  1 //發佈者
  2 using System;
  3 
  4 namespace RabbitMqPublishDemo
  5 {
  6     using MyRabbitMqService;
  7     using System.Runtime.CompilerServices;
  8 
  9     class Program
 10     {
 11         static void Main(string[] args)
 12         {
 13             #region 發佈訂閱 交換機路由模式 Direct
 14             string routerKeyValue = args[0].Split("=")[1];//如 abc.exe --name='qq'
 15             Console.WriteLine("開始發佈中。。。");
 16             for (int i = 0; i < 20; i++)
 17             {
 18                 string msg = $"小明有{i}只寶劍";
 19                 ZrfRabbitMqHelper.ExchangeRoutersByDirectModelPublish(msg, routerKey: routerKeyValue);
 20 
 21                 //下面的爲固定的寫法
 22                 //ZrfRabbitMqHelper.ExchangeRoutersByDirectModelPublish(msg);
 23                 //ZrfRabbitMqHelper.ExchangeRoutersByDirectModelPublish($"你好我好你們好{i}", routerKey:"onlylog");
 24             }
 25             Console.WriteLine("此次發佈完畢。。。");
 26             #endregion
 27             Console.ReadLine();
 28         }
 29     }
 30 }
 31 
 32 //發佈者後端邏輯 發佈訂閱的路由模式 Direct
 33         /// <summary>
 34         /// 發佈 Direct 路由模式 Direct
 35         /// </summary>
 36         /// <param name="message"></param>
 37         /// <param name="exchangeName"></param>
 38         /// <param name="routerKey"></param>
 39         public static void ExchangeRoutersByDirectModelPublish(string message, string exchangeName = "qqai", string routerKey = "insertToStudent")
 40         {
 41             using (IConnection connection = connectionFactory.CreateConnection())
 42             {
 43                 using (IModel channelmodel = connection.CreateModel())
 44                 {
 45                     channelmodel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct);
 46                     byte[] bymsg = Encoding.UTF8.GetBytes(message);
 47                     channelmodel.BasicPublish(exchange: exchangeName, routingKey: routerKey, body: bymsg);
 48 
 49                     // ​byte[] bytemsg = Encoding.UTF8.GetBytes(message);
 50                     //  ​channelmodel.BasicPublish(exchange: exchangeName,routingKey: routerKey,basicProperties: null,body: bytemsg);
 51                 }
 52             }
 53         }
 54         
 55 //訂閱者 Exchange Router路由 Director
 56 using System;
 57 
 58 namespace RabbitMqConsumerDemo
 59 {
 60     using MyRabbitMqService;
 61     using System.Runtime.InteropServices;
 62 
 63     class Program
 64     {
 65         static void Main(string[] args)
 66         {
 67             Console.WriteLine("開始消費中。。!");
 68             if (args.Length > 0)
 69             {
 70                 string routerKeyValue = args[0].Split("=")[1];
 71                 Console.WriteLine($"routerKey=>{routerKeyValue}");
 72                 if (!string.IsNullOrEmpty(routerKeyValue))
 73                     ZrfRabbitMqHelper.ExchangeRoutersByDirectModelConsumer(routerKey: routerKeyValue, handler: msg =>
 74                     {
 75                         Console.WriteLine($"拿到消息:{msg}");
 76                     });
 77                 else
 78                     Console.WriteLine("沒有獲取到routerKey !");
 79             }
 80             //else
 81             //{
 82             //    ZrfRabbitMqHelper.ExchangeRoutersByDirectModelConsumer(handler: msg =>
 83             //    {
 84             //        Console.WriteLine($"拿到消息:{msg}");
 85             //    });
 86             //}
 87             Console.ReadLine();
 88         }
 89     }
 90 }
 91 
 92 //訂閱者 Exchange Router路由 Director 後端邏輯
 93        public static void ExchangeRoutersByDirectModelConsumer(string exchangeName = "qqai", string routerKey = "insertToStudent", Action<string> handler = null)
 94         {
 95             var connection = connectionFactory.CreateConnection();
 96             var channel = connection.CreateModel();
 97             channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct);
 98             var queueName = channel.QueueDeclare().QueueName;
 99             channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routerKey);
100 
101             Console.WriteLine("wating for message...!");
102             var consumer = new EventingBasicConsumer(channel);
103             //(object sender, BasicDeliverEventArgs e)
104             consumer.Received += (sender, e) =>
105             {
106                 var bytedata = e.Body.ToArray();
107                 var getRoutekey = e.RoutingKey;
108                 string msg = Encoding.UTF8.GetString(bytedata);
109                 if (handler != null)
110                     handler.Invoke(msg);
111                 else
112                     Console.WriteLine($"路由{getRoutekey},訂閱到消息{msg}!");
113             };
114             channel.BasicConsume(queue: queueName, autoAck: true, consumer);
115         }
116 

 須要完整的code,能夠留言獲取!

相關文章
相關標籤/搜索