本文版權歸博客園和做者吳雙本人共同全部。歡迎轉載,轉載和爬蟲請註明原文地址:http://www.cnblogs.com/tdws/p/5860668.htmlhtml
想必MQ這兩個字母對於各位前輩們和老司機們並不陌生。本文初探RabbitMQ的簡單分享可能值得學習之處不怎麼多,本人對於RabbitMQ的研究目前也很初級,這個月打算按照好的學習線路提升一下,歡迎新老司機留下大家的看法。多線程
首先提到第一個簡單的場景,文件併發。我先手動實現一下文件併發,引起異常,請看以下代碼。併發
1 static void Main(string[] args) 2 { 3 new Thread(write1).Start(); 4 new Thread(write1).Start(); 5 new Thread(write1).Start(); 6 Console.WriteLine("等待"); 7 Console.ReadKey(); 8 } 9 public static void write1() 10 { 11 for (int i = 0; i < 10000; i++) 12 { 13 WriteLog(i); 14 } 15 //Console.ReadKey(); 16 } 17 public static void WriteLog(int i) 18 { 19 using (FileStream f = new FileStream(@"d:\\A.txt", FileMode.Append)) 20 { 21 using (StreamWriter sw = new StreamWriter(f, Encoding.Default)) 22 { 23 sw.Write(i); 24 } 25 } 26 }
我使用多線程併發向同一個append數據。相信你應該知道接下來運行起來會發生什麼!app
是的,正如你所料,該文件正由另外一個進程使用,所以該進程沒法訪問此文件。也許這個場景,就像你寫應用程序運行日誌,異常日誌。若是你沒使用任何插件或者組件,就只能直接向文件中append。這樣的問題,是你不得不解決的。框架
這個時候就是隊列出場的時候了。固然隊列的組件有不少,.NET框架下也有自帶的隊列Queue,微軟也有獨立的隊列組件MSQueue。Apache有其ActiveMQ,另外知名的消息隊列還有Equeue,ZeroMQ等。消息隊列的使用場景,大概包括解耦,提升峯值處理能力,送達和排序保證,緩衝等。學習
RabbitMQ是一個消息中間件,其主要的觀點很簡單:接受和轉發消息。你能夠把他想象成郵局,當你發送新建到郵遞箱,你很肯定郵遞員最終將會把你的新建傳遞到你的收件人手中。咱們使用郵遞箱和郵遞員來隱喻RabbitMQ。關於RabbitMQ和郵局以前的主要區別在於郵局處理紙質信件,而MQ存儲和轉發二進制數據-message。下面提到RabbitMQ的幾個」行話「。spa
生產者意味着發送消息。一個發送消息的應用程序是一個生產者,咱們稱其爲"P"。插件
隊列queue意味着郵遞箱。他存在於RabbitMQ當中.儘管消息在RabbitMQ和你的應用程序中」流通「,他們能夠被僅存在一個隊列當中。一個隊列不受任何限制,他能夠存儲你想要存儲的消息量,它本質上是一個無限的緩衝區。多個生產者能夠向同一個隊列發送消息,多個消費者能夠嘗試從同一個消息隊列中接收數據。一個隊列像下面這樣,上面是它的隊列名稱。線程
消費者意味着接收,消費者是等待接收處理消息的應用程序。日誌
最後的關係就以下圖:
接下來,我將使用RabbitMQ來解決最開始的文件併發問題。也就是說爲了不文件併發,咱們要將生產者所需append到文件中的內容存入到消息隊列當中,而後取出隊列中的message讓消費者寫入到文件當中。關於RabbitMQ的安裝和配置請先看張善友老師的文章:http://www.cnblogs.com/shanyou/p/4067250.html ,寫的特別詳細。只不過歷經數月RabbitMQ C#客戶端方法上有略微的更新。
下面我將新建兩個ConsoleApp應用程序。一個負責Receive,消費者,寫文件。另外一個Send,生產者,將所需寫入的數據推到RabbitMQ當中。
Receive代碼以下:
static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "127.0.0.1", UserName = "WuShuang", Password = "123456" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); WriteLog(message); }; channel.BasicConsume(queue: "hello", noAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } public static void WriteLog(string i) { using (FileStream f = new FileStream(@"d:\\A.txt", FileMode.Append)) { using (StreamWriter sw = new StreamWriter(f, Encoding.Default)) { sw.Write(i + "\n"); } } }
Send代碼以下:
static void Main(string[] args) { new Thread(write1).Start(); new Thread(write1).Start(); new Thread(write1).Start(); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } public static void write1() { var factory = new ConnectionFactory() { HostName = "127.0.0.1", UserName = "WuShuang", Password = "123456" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); channel.QueueBind("hello", "wsExchange", "hello"); for (int i = 0; i < 10000; i++) { string message = "Hello World ws!" + i; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "wsExchange", routingKey: "hello", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); } } //Console.ReadKey(); }
有了代碼後,首先把Receive項目運行起來。等待生產者Send消息到隊列中,而後Receive訂閱的事件將會取出消息,並將消息寫入到文件當中。
左側的ConsoleApp持續向隊列中寫入消息,右側的Receive的ConsoleApp運行過程當中,會不斷寫文件。生產者承受了多線程併發些數據,固然消費者有序的取出隊列中的數據,也不會發生最開始的文件併發異常。
今天的分享就這麼簡單,關於RabbitMQ有待繼續深刻學習。
若是您以爲個人點滴分享,對您有點滴幫助,歡迎點贊,也爲您本身的進步點贊!
點擊下方關注,咱們共同進步!
分享的過程當中,總會遇到不斷地驚喜。
參考文章:
張善友老師:Windows上安裝RabbitMQ指南
RabbitMQ(開源)官方文檔 Hello world