[Open Source] RabbitMQ 安裝與使用

前言

吃多了拉就是隊列,吃飽了吐就是棧html

  • 使用場景
    • 對操做的實時性要求不高,而須要執行的任務極爲耗時;(發送短信,郵件提醒,更新文章閱讀計數,記錄用戶操做日誌)
    • 存在異構系統間的整合;

安裝

  • 下載 Erlang
    • 安裝完肯定ERLANG_HOME環境變量是否添加,不然:Setx ERLANG_HOME 「D:\Program Files\erl8.2″
  • 下載安裝包
    • 安裝完經過rabbitmqctl status肯定rabbitmq狀態
  • 管理服務
    • 默認安裝成功會自動啓動服務
    • 經過開始菜單能夠啓動,中止,卸載服務
  • 佔用端口
    • 4369(集羣、Erlang)
    • 5671,5672(應用層標準高級消息隊列協議)
    • 25672(Erlang分發,CLI通訊)
    • 15672(若是管理插件啓用)
    • 61613,61614(若是消息文本協議STOMP已啓用)
    • 1883,8883(若是erl實時通訊已啓用)
  • 支持的平臺
    • 基於Ubuntu和Debian的Linux發行版
    • 基於Fedora,CentOS和RPM的Linux發行版
    • Mac OS X
    • Windows XP及更高版本

概念

  • Connections:客戶端鏈接,建立該資源很是耗時,應儘可能避免屢次建立。
  • Channel:消息通道,在客戶端的每一個鏈接裏,可創建多個channel,每一個channel表明一個會話任務。
  • Exchange:消息交換機,它指定消息按什麼規則,路由到哪一個隊列。
  • Queue:消息隊列載體,每一個消息都會被投入到一個或多個隊列。c#

  • Broker:簡單來講就是消息隊列服務器實體。
  • Binding:綁定,它的做用就是把exchange和queue按照路由規則綁定起來。
  • Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
  • vhost:虛擬主機,一個broker裏能夠開設多個vhost,用做不一樣用戶的權限分離。windows

  • producer:消息生產者,就是投遞消息的程序。
  • consumer:消息消費者,就是接收消息的程序。服務器

消息隊列的發送過程大概以下:

  1. 客戶端建立Connection,鏈接到消息隊列服務器,打開一個channel。
  2. 客戶端聲明一個Exchange,並設置相關屬性。
  3. 客戶端聲明一個Queue,並設置相關屬性。
  4. 客戶端使用routing key,在exchange和queue之間創建好綁定關係。
  5. 客戶端發送消息首先到exchange
  6. exchange根據type路由到對應的隊列(能夠是多個隊列)中.

Exchange Type

  • direct(直連)
    • routing key 與 binding key相同
  • fanout
    • 給全部綁定隊列發送消息
  • topic
    • routing key:audit.irs.corporate => binding key:audit.#
    • routing key:audit.irs => binding key:audit.*
  • default
    • direct
    • binding key爲queue名稱

經常使用命令

  • 管理插件
    • rabbitmq-plugins enable rabbitmq_management // 啓用
    • rabbitmq-plugins disable rabbitmq_management // 禁用
  • 管理隊列
    • rabbitmqctl list_queues // 查看隊列
  • 管理用戶及權限
    • rabbitmqctl list_users // 查看全部用戶
    • rabbitmqctl add_user user_admin passwd_admin // 添加用戶
    • rabbitmqctl set_user_tags user_admin administrator // 添加權限
    • rabbitmqctl delete_user guest // 刪除用戶
    • rabbitmqctl change_password {username} {newpassowrd} // 修改密碼
  • 管理虛擬主機vhost
    • rabbitmqctl add_vhost vhostpath // 建立虛擬主機
    • rabbitmqctl delete_vhost vhostpath // 刪除虛擬主機
    • rabbitmqctl list_vhosts // 列出全部虛擬主機

使用

  • 發送消息(以持久化代碼爲例)
var factory = new ConnectionFactory
{
    HostName = hostName,                // rabbit server
    UserName = "admin",
    Password = "admin",
    Port = 5672,                        // Broker端口
    VirtualHost = "/"                   // 虛擬Host,需提早配置
};

using (var connection = factory.CreateConnection()) // 建立與RabbitMQ服務器的鏈接
{
    using (var channel = connection.CreateModel())  // 建立1個Channel(大部分API在該Channel中)
    {
        // 定義1個隊列,自動會和默認的exchange 作direct類型綁定
        channel.QueueDeclare(
            queue: "hello",                     // 隊列名稱
            durable: true,                      // 隊列是否持久化
            exclusive: false,                   // 排他隊列:若是一個隊列被聲明爲排他隊列,該隊列僅對首次聲明它的鏈接可見,並在鏈接斷開時自動刪除。(活動在一次鏈接內)
            autoDelete: false,                  // 自動刪除:當最後一個消費者取消訂閱時,隊列自動刪除。若是您須要僅由一個使用者使用的臨時隊列,請將自動刪除與排除。當消費者斷​​開鏈接時,隊列將被刪除。(至少消費者能連一次)
            arguments: null);                   // 配置參數

        var randomQueue = channel.QueueDeclare();                   // 定義隨機的隊列 該隊列爲臨時隊列(排他隊列 + 自動刪除)


        // 定義Exchange(通常而言,不須要定義exchange,rabbitmq默認建立了全部類型的exchange)
        //channel.ExchangeDeclare("direct-demo", ExchangeType.Direct);    // 定義direct exchange
        //channel.ExchangeDeclare("fannout-demo", ExchangeType.Fanout);   // 定義fanout exchange
        //channel.ExchangeDeclare("topic-demo", ExchangeType.Topic);      // 定義fanout exchange


        // 定義queue exchange key 關係(在某些業務場景下,會使用該關係作路由功能)
        //channel.QueueBind(queue: "hello", exchange: "amq.direct", routingKey: "hello"); // 默認綁定的關係和該行代碼效果同樣
        //channel.QueueBind("hello", "amq.fanout", "hello");                              // 該類型下的routingKey 實際不須要

        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        while (true)
        {
            string message = "Hello World!" + DateTime.Now;
            var body = Encoding.UTF8.GetBytes(message);

            // 發送消息到隊列中
            channel.BasicPublish(
                exchange: string.Empty,         // 傳遞爲Empty的時候,經過   `(AMQP default)`傳遞
                routingKey: "hello",            // routing key 與 queuebind中的binding key對應
                basicProperties: properties,    // 消息header
                body: body);                    // 消息body:發送的是bytes 能夠任意編碼

            Console.WriteLine(" [x] Sent {0}", message);
        }
    }
}
  • 接收消息(以消息響應爲例)
var factory = new ConnectionFactory
{
    HostName = hostName,                // rabbit server
    UserName = "admin",
    Password = "admin",
    Port = 5672,                        // Broker端口
    VirtualHost = "/"                   // 虛擬Host,需提早配置
};
using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        var consumer = new EventingBasicConsumer(channel);  // 建立Consumer
        consumer.Received += (model, ea) =>         // 經過回調函數異步推送咱們的消息
        {
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body);
            Thread.Sleep(1000);
            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); // 消息響應
            Console.WriteLine(" [x] Received {0}", message);
        };

        channel.BasicQos(0, 1, false);  // 設置perfetchCount=1 。這樣就告訴RabbitMQ 不要在同一時間給一個工做者發送多於1個的消息
        channel.BasicConsume(queue: "hello",
                                noAck: false,      // 須要消息響應(Acknowledgments)機制
                                consumer: consumer);

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}
  • 消息響應(acknowledgments)
    • 爲了防止消息丟失,RabbitMQ提供了消息響應(acknowledgments)機制。消費者會經過一個ack(響應),告訴RabbitMQ已經收到並處理了某條消息,而後RabbitMQ纔會釋放並刪除這條消息。
  • 持久化
    • 新隊列(沒法修改隊列)配置爲可持久化
    • 發送消息配置爲持久化
    • 消息何時刷到磁盤?
      • 寫入文件前會有一個Buffer,大小爲1M,數據在寫入文件時,首先會寫入到這個Buffer,若是Buffer已滿,則會將Buffer寫入到文件(未必刷到磁盤)。
      • 固定的刷盤時間:25ms,也就是無論Buffer滿不滿,每一個25ms,Buffer裏的數據及未刷新到磁盤的文件內容一定會刷到磁盤。
      • 每次消息寫入後,若是沒有後續寫入請求,則會直接將已寫入的消息刷到磁盤:使用Erlang的receive x after 0實現,只要進程的信箱裏沒有消息,則產生一個timeout消息,而timeout會觸發刷盤操做。

常見問題

  • RabbitMQ 管理插件啓動報錯
    • 確認RabbitMQ服務是否啓動
    • C:\Windows目錄下,將.erlang.cookie文件,拷貝到用戶目錄下 C:\Users{用戶名},這是Erlang的Cookie文件,容許與Erlang進行交互
    • 從新安裝erl 和 rabbit,儘可能不要帶空格的路徑
  • 修改配置文件
相關文章
相關標籤/搜索