RabbitMQ原理與相關操做(一)

小編是菜鳥一枚,最近想試試MQ相關的技術,因此本身看了下RabbitMQ官網,試着寫下本身的理解與操做的過程。html

剛開始的第一篇,原理只介紹 生產者、消費者、隊列,至於其餘的內容,會在後續中陸續補齊。windows

引入MQ話題

何時會用到MQ

可能不少人有疑惑:MQ究竟是什麼?哪些場景下要使用MQ?
前段時間安裝了RabbitMQ,如今就記錄下本身的學習心得吧。
首先看段程序:數組

class Program
    {
        static void Main(string[] args)
        {
            new Thread(Write).Start();
            new Thread(Write).Start();
            new Thread(Write).Start();
            new Thread(Write).Start();
        }
        
        public static void WriteLog(int i)
        {
            using (FileStream f = new FileStream(@"d:\\test.txt", FileMode.Append))
            {
                using (StreamWriter sw = new StreamWriter(f, Encoding.Default))
                {
                    sw.Write(i);
                }
            }
        }

        public static void Write()
        {
            for (int i = 0; i < 10000; i++)
            {
                WriteLog(i);
            }
        }
    }
View Code

僅僅從代碼上看,沒有以爲任何問題對吧?編譯也是經過的,可是執行時,出現一個問題:服務器

 固然,這僅僅是一個小的案例,相似這種多線程寫文件形成的問題, 就應該使用MQ了。多線程

MQ的使用場景大概包括解耦,提升峯值處理能力,送達和排序保證,緩衝等。socket

MQ概述

消息隊列技術是分佈式應用間交換信息的一種技術。分佈式

消息隊列可駐留在內存或磁盤上,隊列存儲消息直到它們被應用程序讀走。ide

經過消息隊列,應用程序可獨立地執行--它們不須要知道彼此的位置、或在繼續執行前不須要等待接收程序接收此消息。post

MQ主要做用是接受和轉發消息。你能夠想一想在生活中的一種場景:當你把信件的投進郵筒,郵遞員確定最終會將信件送給收件人。咱們能夠把MQ比做 郵局和郵遞員學習

MQ和郵局的主要區別是,它不處理消息,可是,它會接受數據、存儲消息數據、轉發消息。

RabbitMQ術語

生產者

消息發送者,在MQ中被稱爲生產者(producer),一個發送消息的應用也被叫作生產者,用P表示

消費者:

生產者「生產」出消息後,最終由誰消費呢?等待接受消息的應用程序,咱們稱之爲消費者(Consuming ),用C表示

隊列:

消息只能存儲在隊列(queue )中。儘管消息在rabbitMQ和應用程序間流通,可是隊列倒是存在於RabbitMQ內部。

一個隊列不受任何限制,它能夠存儲你想要存儲的消息量,它本質上是一個無限的緩衝區。

多個生產者能夠向同一個隊列發送消息,多個消費者能夠嘗試從同一個消息隊列中接收數據。

一個隊列像下面這樣(上面是它的隊列名稱)

注意:

生產者、消費者、中間件沒必要在一臺機器上,實際應用中也是絕大多數不在一塊兒的。咱們能夠用一張圖表示RabbitMQ的構造:

 

注:此圖片摘自於百度百科RabbitMQ

使用RabbitMQ解決多線程寫入文件問題

分析

多線程寫入,產生消息的也就是一個程序(一個生產者P),消費消息的也是一個消息,它的模型應該是:

編寫代碼

引入RabbitMQ client DLL

程序包管理控制檯命令:

PM> Install-Package RabbitMQ.Client

生產者

首先,建立一個 connection 經過socket鏈接 去和服務器鏈接起來(須要傳目的服務器的IP、用戶名、密碼等)。

接着 建立一個 channel ,這是大部分的要作的事情所在。

要發送消息,咱們必須聲明一個隊列,,而後咱們能夠向隊列發佈消息。

執行一次BasicPublish方法,推送一個消息。

class Program
    {
        static void Main(string[] args)
        {
            new Thread(Write).Start();
            new Thread(Write).Start();
            new Thread(Write).Start();
        }

        public static void Write()
        {
            var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "writeLog", durable: false, exclusive: false, autoDelete: false, arguments: null);
                for (int i = 0; i < 8000; i++)
                {
                    string message = i.ToString();
                    var body = Encoding.UTF8.GetBytes(message);

                    channel.BasicPublish(exchange: "", routingKey: "writeLog", basicProperties: null, body: body);
                    Console.WriteLine("Program Sent {0}", message);
                }
            }
        }
    }
View Code

聲明的隊列,在服務器中若是不存在了,會自動建立。而消息的內容是字節數組,在使用時,注意編碼問題。

消費者

當隊列裏有消息時,消費者要隨時可以從隊列裏獲取消息,因此我須要一直運行它,讓它監聽消息。

就像咱們打籃球進行傳球,須要事先確認要傳給的那個隊友位置同樣,生產者要發送消息,必定要事先知道消費消息的程序的對列是哪一個。因此,在運行生產者程序前,須要先啓動消費者程序。

由此,聲明對列,就應該在消費者程序中完成。

class Program
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() { HostName = "localhost", UserName = "eric", Password = "123456", VirtualHost ="/"};
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "writeLog",
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    ExcuateWriteFile(message);
                    Console.WriteLine(" Receiver Received {0}", message);
                };
                channel.BasicConsume(queue: "writeLog",
                                     noAck: true,
                                     consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
        public static void ExcuateWriteFile(string i)
        {
            using (FileStream fs = new FileStream(@"d:\\test.txt", FileMode.Append))
            {
                using (StreamWriter sw = new StreamWriter(fs, Encoding.Unicode))
                {
                    sw.Write(i);
                }
            }
        }
    }
View Code

執行程序

先執行 消費者程序,讓它一直保持監聽。

錯誤解決

執行時VS報錯:

「RabbitMQ.Client.Exceptions.BrokerUnreachableException」類型的未經處理的異常在 RabbitMQ.Client.dll 中發生 其餘信息: None of the specified endpoints were reachable。

進入查看詳細的內部異常:

innerEception:{"The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=530, text=\"NOT_ALLOWED - access to vhost '/' refused for user 'eric'\", classId=10, methodId=40, cause="}

此時,咱們打開在http://localhost:15672/#/users 能夠看到eric 下 的Can access virtual hosts 爲 NoAccess

解決辦法:

rabbitmqctl控制檯輸入

rabbitmqctl set_permissions -p / userName "." "." ".*"

再次執行時,能夠看到:

 

 而後運行 生產者程序。

 咱們先開着 Receive ,當生產者運行時

 

消費者的自動觸發執行 :

直到全部的 指定的 queue 裏面的消息徹底消費完爲止。(此時消費者程序仍然在監聽中)

 

對於須要安裝和設置用戶的同窗,請參考 windows下 安裝 rabbitMQ 及操做經常使用命令

 

本文參考:

rabbitMq外文網

百度百科

相關文章
相關標籤/搜索