使用C#自帶的隊列,通常會把進行隊列監聽的代碼放於Global.asax之類的文件或寄宿windows服務之中,與應用服務器在同一臺,會搶佔服務器資源。後面會介紹使用其餘分佈式隊列。html
來一個簡單的示例:redis
隊列幫助類c#
namespace queue { public sealed class QueueHelper { private static QueueHelper queue; Queue<People> queueobj = new Queue<People>(); public static QueueHelper instance { get { if (queue==null) { queue = new QueueHelper(); } return queue; } } /// <summary> /// 向隊列中添加數據 /// </summary> /// <param name="Id"></param> /// <param name="title"></param> /// <param name="content"></param> public void AddQueue(int id,int age, string name) { queueobj.Enqueue(new People() { ID=id, Age = age, Name=name }); } /// <summary> /// 當前隊列數據量 /// </summary> /// <returns></returns> public int Count() { return queueobj.Count(); } //啓動一個線程去監聽隊列 public void StartQueue() { Task t = Task.Run(() => { ScanQueue(); }); } private void ScanQueue() { while (true) { if (queueobj.Count > 0) { try { //從隊列中取出 People queueinfo = queueobj.Dequeue(); Console.WriteLine($"取得隊列:ID={queueinfo.ID},name={queueinfo.Name},age={queueinfo.Age}"); } catch (Exception ex) { throw; } } else { Console.WriteLine("沒有數據,先休眠5秒在掃描"); Thread.Sleep(5000); } } } } }
Main方法windows
static void Main(string[] args) { Console.WriteLine("當前隊列數:"+ QueueHelper.instance.Count()); Console.WriteLine("開啓線程掃描"); QueueHelper.instance.StartQueue(); Thread.Sleep(10000); QueueHelper.instance.AddQueue(1,25,"小王"); QueueHelper.instance.AddQueue(2, 28, "小明"); QueueHelper.instance.AddQueue(3, 99, "大爺"); Console.ReadLine(); }
運行結果服務器
對可靠性和穩定性要求不高的應用場景,能夠使用redis簡單方便的實現分佈式
關於redis隊列的實現方式有兩種:ide
一、生產者消費者模式(List)。函數
二、發佈者訂閱者模式(pub/sub)。性能
驅動:StackExchange.Redis fetch
如下爲第一種方式示例
RedisHelper幫助類
public class RedisHelper { // redis實例 private static RedisHelper instance = null; private IDatabase db; private ConnectionMultiplexer redis; private IServer redisServer; private readonly string _enqueueName = "PersonObj"; /// <summary> /// 靜態單例方法 /// </summary> /// <returns></returns> public static RedisHelper Get() { if (instance == null) { instance = new RedisHelper(); } return instance; } /// <summary> /// 無參數構造函數 /// </summary> private RedisHelper() { var redisConnection = "127.0.0.1:6379"; redis = ConnectionMultiplexer.Connect(redisConnection); redisServer = redis.GetServer(redisConnection); db = redis.GetDatabase(); } /// <summary> /// 入隊 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="enqueueName">隊列名稱</param> /// <param name="value"></param> public void EnqueueItem<T>( T value) { //序列化 var valueString = JsonConvert.SerializeObject(value); db.ListLeftPushAsync(_enqueueName, valueString); } /// <summary> /// 出隊 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="enqueueName">隊列名稱</param> /// <returns></returns> public T DequeueItem<T>() { var valueString = db.ListRightPopAsync(_enqueueName).Result;
//反序列化 T obj = JsonConvert.DeserializeObject<T>(valueString); return obj; } /// <summary> /// 當前隊列數據量 /// </summary> /// <param name="enqueueName"></param> /// <returns></returns> public long Count() { return db.ListLengthAsync(_enqueueName).Result; } //啓動一個線程去監聽隊列 public void StartQueue() { Task t = Task.Run(() => { ScanQueue(); }); } private void ScanQueue() { while (true) { if (this.Count() > 0) { try { //從隊列中取出 Person queueinfo = DequeueItem<Person>(); Console.WriteLine($"取得隊列:name={queueinfo.Name},age={queueinfo.Age}"); } catch (Exception ex) { throw; } } else { Console.WriteLine("沒有數據,先休眠5秒在掃描"); Thread.Sleep(5000); } } } }
Main方法:
static void Main(string[] args) { Console.WriteLine($"當前隊列數{RedisHelper.Get().Count()}"); Console.WriteLine("開啓線程掃描"); RedisHelper.Get().StartQueue(); Thread.Sleep(10000); RedisHelper.Get().EnqueueItem<Person>(new Person { Name="小王",Age=20}); RedisHelper.Get().EnqueueItem<Person>(new Person { Name = "小明", Age = 20 }); Console.WriteLine($"當前隊列數{RedisHelper.Get().Count()}"); Console.ReadLine(); }
運行結果
RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java等,且支持AJAX。用於在分佈式系統中存儲轉發消息,具體特色包括易用性、擴展性、高可用性、消息集羣等
關於RabbitMQ的教程能夠參考:https://www.cnblogs.com/julyluo/p/6262553.html
安裝教程能夠參考:https://www.cnblogs.com/ericli-ericli/p/5902270.html
RabbitMq默認的監聽端口是15672
public class RabbitmqHelper { private static RabbitmqHelper instance; private readonly ConnectionFactory rabbitMqFactory; //交換器名稱 const string ExchangeName = "myExchange"; //當前消息隊列名稱 const string QueueName = "myFirstQueue"; public RabbitmqHelper() { //沒有設置帳號密碼端口或服務端沒有先開啓權限會報:None of the specified endpoints were reachable rabbitMqFactory = new ConnectionFactory { HostName = "192.168.0.112" }; rabbitMqFactory.Port = 5672; rabbitMqFactory.UserName = "123456"; rabbitMqFactory.Password = "123456"; } /// <summary> /// 靜態單例方法 /// </summary> /// <returns></returns> public static RabbitmqHelper Get() { if (instance == null) { instance = new RabbitmqHelper(); } return instance; } /// <summary> /// 生產者-發佈消息 /// </summary> /// <param name="msg">消息</param> public void Enqueue(string msg) { using (IConnection conn = rabbitMqFactory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { //定義交換器 channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null); //持久化一個隊列,若是名稱相同不會重複建立 channel.QueueDeclare(QueueName, true, false, false, null); //定義exchange到queue的binding channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName); byte[] bytes = Encoding.UTF8.GetBytes(msg); //設置消息持久化 IBasicProperties properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish("", QueueName, properties, bytes); } } } /// <summary> /// 消費者-接收消息-基於訂閱模式 /// </summary> /// <param name="msg">消息</param> public void Dequeue() { IConnection conn = rabbitMqFactory.CreateConnection(); IModel channel = conn.CreateModel(); //持久化一個隊列,若是名稱相同不會重複建立 channel.QueueDeclare(QueueName, durable: true, autoDelete: false, exclusive: false, arguments: null); //告訴broker同一時間只處理一個消息 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); //因處理性能問題,已被官方棄用 //var consumer2 = new QueueingBasicConsumer(channel) //var msgResponse = consumer.Queue.Dequeue(); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { try { var msgBody = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"收到消息:{msgBody}"); //處理完成,服務端能夠刪除消息了同時分配新的消息 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } catch (Exception e) { Console.WriteLine($"出現錯誤:{e.Message}"); } }; //noAck設置false,發送消息以後,消息不要主動刪除,先等消費者處理完 channel.BasicConsume(QueueName, false, consumer); } /// <summary> /// 消費者-接收消息-主動拉取 /// </summary> /// <param name="msg">消息</param> public void Dequeue2() { using (IConnection conn = rabbitMqFactory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { while (true) { BasicGetResult res = channel.BasicGet(QueueName, false/*noAck*/); if (res != null) { var msg = System.Text.UTF8Encoding.UTF8.GetString(res.Body); Console.WriteLine($"獲取到消息:{msg}"); channel.BasicAck(res.DeliveryTag, false); } } } } } }