幾種隊列

 c#中自帶的隊列

 使用C#自帶的隊列,通常會把進行隊列監聽的代碼放於Global.asax之類的文件或寄宿windows服務之中,與應用服務器在同一臺,會搶佔服務器資源。後面會介紹使用其餘分佈式隊列。html

來一個簡單的示例:redis

隊列幫助類c#

namespace queue
{
    public sealed class QueueHelper
    {
        private static QueueHelper queue;
        Queue<People> queueobj = new Queue<People>();

        public static QueueHelper instance
        {
            get
            {
                if (queue==null)
                {
                    queue = new QueueHelper();
                }
                return queue;
            }
        }
        /// <summary>
        /// 向隊列中添加數據
        /// </summary>
        /// <param name="Id"></param>
        /// <param name="title"></param>
        /// <param name="content"></param>
        public void AddQueue(int id,int age, string name)
        {
            queueobj.Enqueue(new People()
            {
                ID=id,
                Age = age,
                Name=name
            });
        }
        /// <summary>
        /// 當前隊列數據量
        /// </summary>
        /// <returns></returns>
        public int Count()
        {
            return queueobj.Count();
        }
        //啓動一個線程去監聽隊列
        public void StartQueue()
        {
            Task t = Task.Run(() => {
                ScanQueue();
            });
        }
        private void ScanQueue()
        {
            while (true)
            {
                if (queueobj.Count > 0)
                {
                    try
                    {
                        //從隊列中取出  
                        People queueinfo = queueobj.Dequeue();
                        Console.WriteLine($"取得隊列:ID={queueinfo.ID},name={queueinfo.Name},age={queueinfo.Age}");
                    }
                    catch (Exception ex)
                    {
                        throw;
                    }
                }
                else
                {
                    Console.WriteLine("沒有數據,先休眠5秒在掃描");
                    Thread.Sleep(5000);
                }               
            }          
        }
    }
}

Main方法windows

    static void Main(string[] args)
        {
            Console.WriteLine("當前隊列數:"+ QueueHelper.instance.Count());
            Console.WriteLine("開啓線程掃描");
            QueueHelper.instance.StartQueue();
            Thread.Sleep(10000);
            QueueHelper.instance.AddQueue(1,25,"小王");
            QueueHelper.instance.AddQueue(2, 28, "小明");
            QueueHelper.instance.AddQueue(3, 99, "大爺");
            Console.ReadLine();
        }

運行結果服務器

 redis隊列

 對可靠性和穩定性要求不高的應用場景,能夠使用redis簡單方便的實現分佈式

關於redis隊列的實現方式有兩種:ide

一、生產者消費者模式(List)。函數

二、發佈者訂閱者模式(pub/sub)。性能

驅動:StackExchange.Redis   fetch

如下爲第一種方式示例

RedisHelper幫助類

public class RedisHelper
    {
        // redis實例
        private static RedisHelper instance = null;
        private IDatabase db;
        private ConnectionMultiplexer redis;
        private IServer redisServer;
        private readonly string _enqueueName = "PersonObj";
        /// <summary>
        /// 靜態單例方法
        /// </summary>
        /// <returns></returns>
        public static RedisHelper Get()
        {
            if (instance == null)
            {
                instance = new RedisHelper();
            }
            return instance;
        }
        /// <summary>
        /// 無參數構造函數
        /// </summary>
        private RedisHelper()
        {
            var redisConnection = "127.0.0.1:6379";
            redis = ConnectionMultiplexer.Connect(redisConnection);
            redisServer = redis.GetServer(redisConnection);
            db = redis.GetDatabase();
        }
        /// <summary>
        /// 入隊
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="enqueueName">隊列名稱</param>
        /// <param name="value"></param>
        public void EnqueueItem<T>( T value)
        {
            //序列化
            var valueString = JsonConvert.SerializeObject(value);
            db.ListLeftPushAsync(_enqueueName, valueString);
        }
        /// <summary>
        /// 出隊
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="enqueueName">隊列名稱</param>
        /// <returns></returns>
        public T DequeueItem<T>()
        {
            var valueString = db.ListRightPopAsync(_enqueueName).Result;
       //反序列化 T obj
= JsonConvert.DeserializeObject<T>(valueString); return obj; } /// <summary> /// 當前隊列數據量 /// </summary> /// <param name="enqueueName"></param> /// <returns></returns> public long Count() { return db.ListLengthAsync(_enqueueName).Result; } //啓動一個線程去監聽隊列 public void StartQueue() { Task t = Task.Run(() => { ScanQueue(); }); } private void ScanQueue() { while (true) { if (this.Count() > 0) { try { //從隊列中取出 Person queueinfo = DequeueItem<Person>(); Console.WriteLine($"取得隊列:name={queueinfo.Name},age={queueinfo.Age}"); } catch (Exception ex) { throw; } } else { Console.WriteLine("沒有數據,先休眠5秒在掃描"); Thread.Sleep(5000); } } } }

 Main方法:

      static void Main(string[] args)
        {
            Console.WriteLine($"當前隊列數{RedisHelper.Get().Count()}");
            Console.WriteLine("開啓線程掃描");
            RedisHelper.Get().StartQueue();
            Thread.Sleep(10000);
            RedisHelper.Get().EnqueueItem<Person>(new Person { Name="小王",Age=20});
            RedisHelper.Get().EnqueueItem<Person>(new Person { Name = "小明", Age = 20 });
            Console.WriteLine($"當前隊列數{RedisHelper.Get().Count()}");
            Console.ReadLine();
        }

運行結果

 

RabbitMQ

 RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java等,且支持AJAX。用於在分佈式系統中存儲轉發消息,具體特色包括易用性、擴展性、高可用性、消息集羣等

關於RabbitMQ的教程能夠參考:https://www.cnblogs.com/julyluo/p/6262553.html

安裝教程能夠參考:https://www.cnblogs.com/ericli-ericli/p/5902270.html

 RabbitMq默認的監聽端口是15672

    public class RabbitmqHelper
    {
        private static RabbitmqHelper instance;
        private readonly ConnectionFactory rabbitMqFactory;
        //交換器名稱
        const string ExchangeName = "myExchange";
        //當前消息隊列名稱
        const string QueueName = "myFirstQueue";

        public RabbitmqHelper()
        {
            //沒有設置帳號密碼端口或服務端沒有先開啓權限會報:None of the specified endpoints were reachable
            rabbitMqFactory = new ConnectionFactory { HostName = "192.168.0.112" };
            rabbitMqFactory.Port = 5672;
            rabbitMqFactory.UserName = "123456";
            rabbitMqFactory.Password = "123456";
        }
        /// <summary>
        /// 靜態單例方法
        /// </summary>
        /// <returns></returns>
        public static RabbitmqHelper Get()
        {
            if (instance == null)
            {              
                instance = new RabbitmqHelper();
            }
            return instance;
        }

        /// <summary>
        /// 生產者-發佈消息
        /// </summary>
        /// <param name="msg">消息</param>
        public void Enqueue(string msg)
        {
            using (IConnection conn = rabbitMqFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    //定義交換器
                    channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);
                    //持久化一個隊列,若是名稱相同不會重複建立
                    channel.QueueDeclare(QueueName, true, false, false, null);
                    //定義exchange到queue的binding
                    channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
                    byte[] bytes = Encoding.UTF8.GetBytes(msg);
                    //設置消息持久化
                    IBasicProperties properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    channel.BasicPublish("", QueueName, properties, bytes);
                }
            }
        }
        /// <summary>
        /// 消費者-接收消息-基於訂閱模式
        /// </summary>
        /// <param name="msg">消息</param>
        public void Dequeue()
        {
            IConnection conn = rabbitMqFactory.CreateConnection();
            IModel channel = conn.CreateModel();

            //持久化一個隊列,若是名稱相同不會重複建立
            channel.QueueDeclare(QueueName, durable: true, autoDelete: false, exclusive: false, arguments: null);

            //告訴broker同一時間只處理一個消息
            channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            //因處理性能問題,已被官方棄用
            //var consumer2 = new QueueingBasicConsumer(channel)
            //var msgResponse = consumer.Queue.Dequeue();

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                try
                {
                    var msgBody = Encoding.UTF8.GetString(ea.Body);
                    Console.WriteLine($"收到消息:{msgBody}");

                    //處理完成,服務端能夠刪除消息了同時分配新的消息
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                }
                catch (Exception e)
                {
                    Console.WriteLine($"出現錯誤:{e.Message}");
                }
            };

            //noAck設置false,發送消息以後,消息不要主動刪除,先等消費者處理完
            channel.BasicConsume(QueueName, false, consumer);
        }
        /// <summary>
        /// 消費者-接收消息-主動拉取
        /// </summary>
        /// <param name="msg">消息</param>
        public void Dequeue2()
        {
            using (IConnection conn = rabbitMqFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    while (true)
                    {
                        BasicGetResult res = channel.BasicGet(QueueName, false/*noAck*/);
                        if (res != null)
                        {
                            var msg = System.Text.UTF8Encoding.UTF8.GetString(res.Body);
                            Console.WriteLine($"獲取到消息:{msg}");
                            channel.BasicAck(res.DeliveryTag, false);
                        }
                    }
                }
            }


        }
    }
View Code
相關文章
相關標籤/搜索