小編是菜鳥一枚,最近想試試MQ相關的技術,因此本身看了下RabbitMQ官網,試着寫下本身的理解與操做的過程。html
剛開始的第一篇,原理只介紹 生產者、消費者、隊列,至於其餘的內容,會在後續中陸續補齊。windows
可能不少人有疑惑:MQ究竟是什麼?哪些場景下要使用MQ?
前段時間安裝了RabbitMQ,如今就記錄下本身的學習心得吧。
首先看段程序:數組
class Program { static void Main(string[] args) { new Thread(Write).Start(); new Thread(Write).Start(); new Thread(Write).Start(); new Thread(Write).Start(); } public static void WriteLog(int i) { using (FileStream f = new FileStream(@"d:\\test.txt", FileMode.Append)) { using (StreamWriter sw = new StreamWriter(f, Encoding.Default)) { sw.Write(i); } } } public static void Write() { for (int i = 0; i < 10000; i++) { WriteLog(i); } } }
僅僅從代碼上看,沒有以爲任何問題對吧?編譯也是經過的,可是執行時,出現一個問題:服務器
固然,這僅僅是一個小的案例,相似這種多線程寫文件形成的問題, 就應該使用MQ了。多線程
MQ的使用場景大概包括解耦,提升峯值處理能力,送達和排序保證,緩衝等。socket
消息隊列技術是分佈式應用間交換信息的一種技術。分佈式
消息隊列可駐留在內存或磁盤上,隊列存儲消息直到它們被應用程序讀走。ide
經過消息隊列,應用程序可獨立地執行--它們不須要知道彼此的位置、或在繼續執行前不須要等待接收程序接收此消息。post
MQ主要做用是接受和轉發消息。你能夠想一想在生活中的一種場景:當你把信件的投進郵筒,郵遞員確定最終會將信件送給收件人。咱們能夠把MQ比做 郵局和郵遞員。學習
MQ和郵局的主要區別是,它不處理消息,可是,它會接受數據、存儲消息數據、轉發消息。
生產者:
消息發送者,在MQ中被稱爲生產者(producer),一個發送消息的應用也被叫作生產者,用P表示
生產者「生產」出消息後,最終由誰消費呢?等待接受消息的應用程序,咱們稱之爲消費者(Consuming ),用C表示
消息只能存儲在隊列(queue )中。儘管消息在rabbitMQ和應用程序間流通,可是隊列倒是存在於RabbitMQ內部。
一個隊列不受任何限制,它能夠存儲你想要存儲的消息量,它本質上是一個無限的緩衝區。
多個生產者能夠向同一個隊列發送消息,多個消費者能夠嘗試從同一個消息隊列中接收數據。
一個隊列像下面這樣(上面是它的隊列名稱)
生產者、消費者、中間件沒必要在一臺機器上,實際應用中也是絕大多數不在一塊兒的。咱們能夠用一張圖表示RabbitMQ的構造:
注:此圖片摘自於百度百科RabbitMQ。
多線程寫入,產生消息的也就是一個程序(一個生產者P),消費消息的也是一個消息,它的模型應該是:
程序包管理控制檯命令:
PM> Install-Package RabbitMQ.Client
首先,建立一個 connection 經過socket鏈接 去和服務器鏈接起來(須要傳目的服務器的IP、用戶名、密碼等)。
接着 建立一個 channel ,這是大部分的要作的事情所在。
要發送消息,咱們必須聲明一個隊列,,而後咱們能夠向隊列發佈消息。
執行一次BasicPublish方法,推送一個消息。
class Program { static void Main(string[] args) { new Thread(Write).Start(); new Thread(Write).Start(); new Thread(Write).Start(); } public static void Write() { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "writeLog", durable: false, exclusive: false, autoDelete: false, arguments: null); for (int i = 0; i < 8000; i++) { string message = i.ToString(); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "writeLog", basicProperties: null, body: body); Console.WriteLine("Program Sent {0}", message); } } } }
聲明的隊列,在服務器中若是不存在了,會自動建立。而消息的內容是字節數組,在使用時,注意編碼問題。
當隊列裏有消息時,消費者要隨時可以從隊列裏獲取消息,因此我須要一直運行它,讓它監聽消息。
就像咱們打籃球進行傳球,須要事先確認要傳給的那個隊友位置同樣,生產者要發送消息,必定要事先知道消費消息的程序的對列是哪一個。因此,在運行生產者程序前,須要先啓動消費者程序。
由此,聲明對列,就應該在消費者程序中完成。
class Program { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", VirtualHost ="/"}; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "writeLog", durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); ExcuateWriteFile(message); Console.WriteLine(" Receiver Received {0}", message); }; channel.BasicConsume(queue: "writeLog", noAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } public static void ExcuateWriteFile(string i) { using (FileStream fs = new FileStream(@"d:\\test.txt", FileMode.Append)) { using (StreamWriter sw = new StreamWriter(fs, Encoding.Unicode)) { sw.Write(i); } } } }
先執行 消費者程序,讓它一直保持監聽。
執行時VS報錯:
「RabbitMQ.Client.Exceptions.BrokerUnreachableException」類型的未經處理的異常在 RabbitMQ.Client.dll 中發生 其餘信息: None of the specified endpoints were reachable。
進入查看詳細的內部異常:
innerEception:{"The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=530, text=\"NOT_ALLOWED - access to vhost '/' refused for user 'eric'\", classId=10, methodId=40, cause="}
此時,咱們打開在http://localhost:15672/#/users 能夠看到eric 下 的Can access virtual hosts 爲 NoAccess
解決辦法:
rabbitmqctl控制檯輸入
rabbitmqctl set_permissions -p / userName "." "." ".*"
再次執行時,能夠看到:
而後運行 生產者程序。
咱們先開着 Receive ,當生產者運行時
消費者的自動觸發執行 :
直到全部的 指定的 queue 裏面的消息徹底消費完爲止。(此時消費者程序仍然在監聽中)
對於須要安裝和設置用戶的同窗,請參考 windows下 安裝 rabbitMQ 及操做經常使用命令
本文參考: