首先咱們知道隊列是先進先出的機制,因此在處理併發是個不錯的選擇。而後就寫兩個隊列的簡單應用。windows
命名空間:System.Collections,不在這裏作過多的理論解釋,這個東西很是的好理解。api
能夠看下官方文檔:https://docs.microsoft.com/zh-cn/dotnet/api/system.collections.queue?view=netframework-4.7.2安全
我這裏就是爲了方便記憶作了一個基本的例子,首先建立了QueueTest類:服務器
包含了獲取隊列的數量,入隊和出隊的實現多線程
1 public class QueueTest 2 { 3 public static Queue<string> q = new Queue<string>(); 4 5 #region 獲取隊列數量 6 public int GetCount() 7 { 8 9 return q.Count; 10 } 11 #endregion 12 13 #region 隊列添加數據 14 public void IntoData(string qStr) 15 { 16 string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(); 17 q.Enqueue(qStr); 18 Console.WriteLine($"隊列添加數據: {qStr};當前線程id:{threadId}"); 19 } 20 #endregion 21 22 #region 隊列輸出數據 23 24 public string OutData() 25 { 26 string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(); 27 string str = q.Dequeue(); 28 Console.WriteLine($"隊列輸出數據: {str};當前線程id:{threadId}"); 29 return str; 30 } 31 #endregion 32 33 }
爲了模擬併發狀況下也不會出現重複讀取和插入混亂的問題因此寫了TaskTest類裏面開闢了兩個異步線程進行插入和讀取:併發
這裏只是證實了多線程插入不會形成丟失。無憂證實併發的先進先出異步
1 class TaskTest 2 { 3 4 #region 隊列的操做模擬 5 public static void QueueMian() 6 { 7 QueueA(); 8 QueueB(); 9 } 10 private static async void QueueA() 11 { 12 QueueTest queue = new QueueTest(); 13 var task = Task.Run(() => 14 { 15 for (int i = 0; i < 20; i++) 16 { 17 queue.IntoData("QueueA" + i); 18 } 19 }); 20 await task; 21 Console.WriteLine("QueueAA插入完成,進行輸出:"); 22 23 while (queue.GetCount() > 0) 24 { 25 queue.OutData(); 26 } 27 } 28 29 private static async void QueueB() 30 { 31 QueueTest queue = new QueueTest(); 32 var task = Task.Run(() => 33 { 34 for (int i = 0; i < 20; i++) 35 { 36 queue.IntoData("QueueB" + i); 37 } 38 }); 39 await task; 40 Console.WriteLine("QueueB插入完成,進行輸出:"); 41 42 while (queue.GetCount() > 0) 43 { 44 queue.OutData(); 45 } 46 } 47 #endregion 48 49 }
而後在main函數直接調用便可:async
經過上面的截圖能夠看出插入線程是無前後的。ide
這張圖也是線程無前後。函數
補充:經過園友的提問,我發現我一開始測試的不太仔細,只注意多線程下的插入,沒有注意到輸出其實不是跟插入的順序一致,對不起,這說明queue不是線程安全的,因此這個就當是入隊,出隊的基礎例子並不能說明併發。後面有一個補充的ConcurrentQueue隊列是說明了併發線程的先進先出。
msmq是微軟提供的消息隊列,原本在windows系統中就存在,可是默認沒有開啓。須要開啓。
打開控制面板=>程序和功能=> 啓動或關閉windows功能 => Microsoft Message Queue(MSMQ)服務器=>Microsoft Message Queue(MSMQ)服務器核心
通常選擇:MSMQ Active Directory域服務繼承和MSMQ HTTP支持便可。
點擊肯定等待安裝成功。
須要引用System.Messaging.DLL
命名空間:System.Messaging
官方資料文檔:https://docs.microsoft.com/zh-cn/dotnet/api/system.messaging.messagequeue?view=netframework-4.7.2
與上面queue一樣的示例方式,建立一個MSMQ類,實現建立消息隊列,查詢數據,入列,出列功能:
1 /// <summary> 2 /// MSMQ消息隊列 3 /// </summary> 4 class MSMQ 5 { 6 static string path = ".\\Private$\\myQueue"; 7 static MessageQueue queue; 8 public static void Createqueue(string queuePath) 9 { 10 try 11 { 12 if (MessageQueue.Exists(queuePath)) 13 { 14 Console.WriteLine("消息隊列已經存在"); 15 //獲取這個消息隊列 16 queue = new MessageQueue(queuePath); 17 } 18 else 19 { 20 //不存在,就建立一個新的,並獲取這個消息隊列對象 21 queue = MessageQueue.Create(queuePath); 22 path = queuePath; 23 } 24 } 25 catch (Exception e) 26 { 27 Console.WriteLine(e.Message); 28 } 29 30 } 31 32 33 #region 獲取消息隊列的數量 34 public static int GetMessageCount() 35 { 36 try 37 { 38 if (queue != null) 39 { 40 int count = queue.GetAllMessages().Length; 41 Console.WriteLine($"消息隊列數量:{count}"); 42 return count; 43 } 44 else 45 { 46 return 0; 47 } 48 } 49 catch (MessageQueueException e) 50 { 51 52 Console.WriteLine(e.Message); 53 return 0; 54 } 55 56 57 } 58 #endregion 59 60 #region 發送消息到隊列 61 public static void SendMessage(string qStr) 62 { 63 try 64 { 65 //鏈接到本地隊列 66 67 MessageQueue myQueue = new MessageQueue(path); 68 69 //MessageQueue myQueue = new MessageQueue("FormatName:Direct=TCP:192.168.12.79//Private$//myQueue1"); 70 71 //MessageQueue rmQ = new MessageQueue("FormatName:Direct=TCP:121.0.0.1//private$//queue");--遠程格式 72 73 Message myMessage = new Message(); 74 75 myMessage.Body = qStr; 76 77 myMessage.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) }); 78 79 //發生消息到隊列中 80 81 myQueue.Send(myMessage); 82 83 string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(); 84 Console.WriteLine($"消息發送成功: {qStr};當前線程id:{threadId}"); 85 } 86 catch (MessageQueueException e) 87 { 88 Console.WriteLine(e.Message); 89 } 90 } 91 #endregion 92 93 #region 鏈接消息隊列讀取消息 94 public static void ReceiveMessage() 95 { 96 MessageQueue myQueue = new MessageQueue(path); 97 98 99 myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) }); 100 101 try 102 103 { 104 105 //從隊列中接收消息 106 107 Message myMessage = myQueue.Receive(new TimeSpan(10));// myQueue.Peek();--接收後不消息從隊列中移除 108 myQueue.Close(); 109 110 string context = myMessage.Body.ToString(); 111 string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(); 112 Console.WriteLine($"--------------------------消息內容: {context};當前線程id:{threadId}"); 113 114 } 115 116 catch (System.Messaging.MessageQueueException e) 117 118 { 119 120 Console.WriteLine(e.Message); 121 122 } 123 124 catch (InvalidCastException e) 125 126 { 127 128 Console.WriteLine(e.Message); 129 130 } 131 132 } 133 #endregion 134 }
這裏說明一下path這個字段,這是消息隊列的文件位置和隊列名稱,我這裏寫的「.」(點)就是表明的位置MachineName字段,,表明本機的意思
而後TaskTest類修改爲這個樣子:
1 class TaskTest 2 { 3 4 #region 消息隊列的操做模擬 5 public static void MSMQMian() 6 { 7 MSMQ.Createqueue(".\\Private$\\myQueue"); 8 MSMQA(); 9 MSMQB(); 10 Console.WriteLine("MSMQ結束"); 11 } 12 private static async void MSMQA() 13 { 14 var task = Task.Run(() => 15 { 16 for (int i = 0; i < 20; i++) 17 { 18 MSMQ.SendMessage("MSMQA" + i); 19 } 20 }); 21 await task; 22 Console.WriteLine("MSMQA發送完成,進行讀取:"); 23 24 while (MSMQ.GetMessageCount() > 0) 25 { 26 MSMQ.ReceiveMessage(); 27 } 28 } 29 30 private static async void MSMQB() 31 { 32 var task = Task.Run(() => 33 { 34 for (int i = 0; i < 20; i++) 35 { 36 MSMQ.SendMessage("MSMQB" + i); 37 } 38 }); 39 await task; 40 Console.WriteLine("MSMQB發送完成,進行讀取:"); 41 42 while (MSMQ.GetMessageCount() > 0) 43 { 44 MSMQ.ReceiveMessage(); 45 } 46 } 47 #endregion
建立成功的消息隊列咱們能夠在電腦上查看:個人電腦=>管理 =>計算機管理 =>服務與應用程序 =>消息隊列 =>專用隊列就看到我剛纔建立的消息隊列
感謝 virtual1988 提出的queue不是線程安全這個問題,是我沒搞清楚。線程安全要使用ConcurrentQueue隊列。
謝謝提出的寶貴意見。
因此我有修改了一下寫了個ConcurrentQueue隊列的:
修改代碼以下:
//public static Queue<string> q = new Queue<string>(); public static ConcurrentQueue<string> q = new ConcurrentQueue<string>(); //public static Queue q =Queue.Synchronized(new Queue()); #region 獲取隊列數量 public static int GetCount() { return q.Count; } #endregion #region 隊列添加數據 public static void IntoData(string qStr) { string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(); q.Enqueue(qStr); System.Threading.Thread.Sleep(10); Console.WriteLine($"隊列添加數據: {qStr};當前線程id:{threadId}"); } #endregion #region 隊列輸出數據 public static string OutData2() { string threadId = System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(); foreach (var item in q) { Console.WriteLine($"------隊列輸出數據: {item};當前線程id:{threadId}"); string d=""; q.TryDequeue( out d); } return "211"; } #endregion
task類:
#region 隊列的操做模擬 public static async void QueueMian() { QueueA(); QueueB(); } private static async void QueueA() { var task = Task.Run(() => { for (int i = 0; i < 20; i++) { QueueTest.IntoData("QueueA" + i); } }); await task; Console.WriteLine("QueueA插入完成,進行輸出:"); } private static async void QueueB() { var task = Task.Run(() => { for (int i = 0; i < 20; i++) { QueueTest.IntoData("QueueB" + i); } }); await task; Console.WriteLine("QueueB插入完成,進行輸出:"); } public static void QueueC() { Console.WriteLine("Queue插入完成,進行輸出:"); while (QueueTest.GetCount() > 0) { QueueTest.OutData2(); } } #endregion
Main函數調用:
static void Main(string[] args) { try { Stopwatch stopWatch = new Stopwatch(); TaskTest.QueueMian(); Console.ReadLine(); TaskTest.QueueC(); Console.ReadLine(); } catch (Exception e) { throw; } }
插入效果:
輸出效果: