轉 RabbitMQ

轉自:https://blog.thankbabe.com/2017/08/03/rabbitmq-demo/?from=cnblogs

介紹

RabbitMQ是一個由erlang開發的基於AMQP(Advanced Message Queue)協議的開源實現。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面都很是的優秀。是當前最主流的消息中間件之一。html

RabbitMQ的官方git

image

  • 概念:
    • Brocker:消息隊列服務器實體。
    • Exchange:消息交換機,指定消息按什麼規則,路由到哪一個隊列。
    • Queue:消息隊列,每一個消息都會被投入到一個或者多個隊列裏。
    • Binding:綁定,它的做用是把exchange和queue按照路由規則binding起來。
    • Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
    • Vhost:虛擬主機,一個broker裏能夠開設多個vhost,用做不用用戶的權限分離。
    • Producer:消息生產者,就是投遞消息的程序。
    • Consumer:消息消費者,就是接受消息的程序。
    • Channel:消息通道,在客戶端的每一個鏈接裏,可創建多個channel,每一個channel表明一個會話任務。
  • 消息隊列的使用過程大概以下:
    • 消息接收
      • 客戶端鏈接到消息隊列服務器,打開一個channel。
      • 客戶端聲明一個exchange,並設置相關屬性。
      • 客戶端聲明一個queue,並設置相關屬性。
      • 客戶端使用routing key,在exchange和queue之間創建好綁定關係。
    • 消息發佈
      • 客戶端投遞消息到exchange。
      • exchange接收到消息後,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列裏。
  • AMQP 裏主要要說兩個組件:
    • Exchange 和 Queue
    • 綠色的 X 就是 Exchange ,紅色的是 Queue ,這二者都在 Server 端,又稱做 Broker
    • 這部分是 RabbitMQ 實現的,而藍色的則是客戶端,一般有 Producer 和 Consumer 兩種類型。
  • Exchange一般分爲四種:
    • fanout:該類型路由規則很是簡單,會把全部發送到該Exchange的消息路由到全部與它綁定的Queue中,至關於廣播功能
    • direct:該類型路由規則會將消息路由到binding key與routing key徹底匹配的Queue中
    • topic:與direct類型類似,只是規則沒有那麼嚴格,能夠模糊匹配和多條件匹配
    • headers:該類型不依賴於routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配
  • 使用場景

下載與安裝

  • 下載
  • 安裝
    • 先安裝erlang
    • 而後再安裝rabbitmq

管理工具

操做起來很簡單,只須要在DOS下面,進入安裝目錄(安裝路徑\RabbitMQ Server\rabbitmq_server-3.2.2\sbin)執行以下命令就能夠成功安裝。github

rabbitmq-plugins enable rabbitmq_management 

能夠經過訪問:http://localhost:15672進行測試,默認的登錄帳號爲:guest,密碼爲:guest。bash

圖片

其餘配置

1. 安裝完之後erlang須要手動設置ERLANG_HOME 的系統變量。服務器

set ERLANG_HOME=F:\Program Files\erl9.0 #環境變量`path`里加入:%ERLANG_HOME%\bin #環境變量`path`里加入: 安裝路徑\RabbitMQ Server\rabbitmq_server-3.6.10\sbin 

2.激活Rabbit MQ’s Management Plugin分佈式

使用Rabbit MQ 管理插件,能夠更好的可視化方式查看Rabbit MQ 服務器實例的狀態,你能夠在命令行中使用下面的命令激活。ide

rabbitmq-plugins.bat  enable rabbitmq_management 

3.建立管理用戶工具

rabbitmqctl.bat add_user sa 123456

4. 設置管理員測試

rabbitmqctl.bat set_user_tags sa administrator

5.設置權限ui

rabbitmqctl.bat set_permissions -p / sa ".*" ".*" ".*" 

6. 其餘命令

#查詢用戶: rabbitmqctl.bat list_users #查詢vhosts: rabbitmqctl.bat list_vhosts #啓動RabbitMQ服務: net stop RabbitMQ && net start RabbitMQ 

以上這些,帳號、vhost、權限、做用域等基本就設置完了。


基於.net使用

RabbitMQ.Client 是RabbiMQ 官方提供的的客戶端 
EasyNetQ 是基於RabbitMQ.Client 基礎上封裝的開源客戶端,使用很是方便

如下操做RabbitMQ的代碼例子,都是基於EasyNetQ的使用和再封裝,在文章底部有demo例子的源碼下載地址


建立 IBus

/// <summary> /// 消息服務器鏈接器 /// </summary> public class BusBuilder { public static IBus CreateMessageBus() { // 消息服務器鏈接字符串 // var connectionString = ConfigurationManager.ConnectionStrings["RabbitMQ"]; string connString = "host=127.0.0.1:5672;virtualHost=TestQueue;username=sa;password=123456"; if (connString == null || connString == string.Empty) throw new Exception("messageserver connection string is missing or empty"); return RabbitHutch.CreateBus(connString); } } 

Fanout Exchange

image

全部發送到Fanout Exchange的消息都會被轉發到與該Exchange 綁定(Binding)的全部Queue上。 
Fanout Exchange 不須要處理RouteKey 。只須要簡單的將隊列綁定到exchange 上。這樣發送到exchange的消息都會被轉發到與該交換機綁定的全部隊列上。相似子網廣播,每臺子網內的主機都得到了一份複製的消息。 因此,Fanout Exchange 轉發消息是最快的。


/// <summary> /// 消息消耗(fanout) /// </summary> /// <typeparam name="T">消息類型</typeparam> /// <param name="handler">回調</param> /// <param name="exChangeName">交換器名</param> /// <param name="queueName">隊列名</param> /// <param name="routingKey">路由名</param> public static void FanoutConsume<T>(Action<T> handler, string exChangeName = "fanout_mq", string queueName = "fanout_queue_default", string routingKey = "") where T : class { var bus = BusBuilder.CreateMessageBus(); var adbus = bus.Advanced; var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Fanout); var queue = CreateQueue(adbus, queueName); adbus.Bind(exchange, queue, routingKey); adbus.Consume(queue, registration => { registration.Add<T>((message, info) => { handler(message.Body); }); }); } /// <summary> /// 消息上報(fanout) /// </summary> /// <typeparam name="T">消息類型</typeparam> /// <param name="topic">主題名</param> /// <param name="t">消息命名</param> /// <param name="msg">錯誤信息</param> /// <returns></returns> public static bool FanoutPush<T>(T t, out string msg, string exChangeName = "fanout_mq", string routingKey = "") where T : class { msg = string.Empty; try { using (var bus = BusBuilder.CreateMessageBus()) { var adbus = bus.Advanced; var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Fanout); adbus.Publish(exchange, routingKey, false, new Message<T>(t)); return true; } } catch (Exception ex) { msg = ex.ToString(); return false; } } 

image 
全部發送到Direct Exchange的消息被轉發到RouteKey中指定的Queue。 
Direct模式,可使用RabbitMQ自帶的Exchange:default Exchange 。因此不須要將Exchange進行任何綁定(binding)操做 。消息傳遞時,RouteKey必須徹底匹配,纔會被隊列接收,不然該消息會被拋棄。


/// <summary> /// 消息發送(direct) /// </summary> /// <typeparam name="T">消息類型</typeparam> /// <param name="queue">發送到的隊列</param> /// <param name="message">發送內容</param> public static void DirectSend<T>(string queue, T message) where T : class { using (var bus = BusBuilder.CreateMessageBus()) { bus.Send(queue, message); } } /// <summary> /// 消息接收(direct) /// </summary> /// <typeparam name="T">消息類型</typeparam> /// <param name="queue">接收的隊列</param> /// <param name="callback">回調操做</param> /// <param name="msg">錯誤信息</param> /// <returns></returns> public static bool DirectReceive<T>(string queue, Action<T> callback, out string msg) where T : class { msg = string.Empty; try { var bus = BusBuilder.CreateMessageBus(); bus.Receive<T>(queue, callback); } catch (Exception ex) { msg = ex.ToString(); return false; } return true; } /// <summary> /// 消息發送 /// <![CDATA[(direct EasyNetQ高級API)]]> /// </summary> /// <typeparam name="T"></typeparam> /// <param name="t"></param> /// <param name="msg"></param> /// <param name="exChangeName"></param> /// <param name="routingKey"></param> /// <returns></returns> public static bool DirectPush<T>(T t, out string msg, string exChangeName = "direct_mq", string routingKey = "direct_rout_default") where T : class { msg = string.Empty; try { using (var bus = BusBuilder.CreateMessageBus()) { var adbus = bus.Advanced; var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Direct); adbus.Publish(exchange, routingKey, false, new Message<T>(t)); return true; } } catch (Exception ex) { msg = ex.ToString(); return false; } } /// <summary> /// 消息接收 /// <![CDATA[(direct EasyNetQ高級API)]]> /// </summary> /// <typeparam name="T">消息類型</typeparam> /// <param name="handler">回調</param> /// <param name="exChangeName">交換器名</param> /// <param name="queueName">隊列名</param> /// <param name="routingKey">路由名</param> public static bool DirectConsume<T>(Action<T> handler, out string msg, string exChangeName = "direct_mq", string queueName = "direct_queue_default", string routingKey = "direct_rout_default") where T : class { msg = string.Empty; try { var bus = BusBuilder.CreateMessageBus(); var adbus = bus.Advanced; var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Direct); var queue = CreateQueue(adbus, queueName); adbus.Bind(exchange, queue, routingKey); adbus.Consume(queue, registration => { registration.Add<T>((message, info) => { handler(message.Body); }); }); } catch (Exception ex) { msg = ex.ToString(); return false; } return true; } 

Topic Exchange

image

  • 消息發佈(Publish)

要使用主題發佈,只需使用帶有主題的重載的Publish方法:

var bus = RabbitHutch.CreateBus(...); bus.Publish(message, "X.A"); 

訂閱者能夠經過指定要匹配的主題來過濾郵件。

  • 這些能夠包括通配符:
    • *=>匹配一個字。
    • #=>匹配到零個或多個單詞。

因此發佈的主題爲「X.A.2」的消息將匹配「#」,「X.#」,「* .A.*」,而不是「X.B. *」或「A」。

警告,Publish只顧發送消息到隊列,可是無論有沒有消費端訂閱,因此,發佈以後,若是沒有消費者,該消息將不會被消費甚至丟失。

  • 消息訂閱(Subscribe)

EasyNetQ提供了消息訂閱,當調用Subscribe方法時候,EasyNetQ會建立一個用於接收消息的隊列,不過與消息發佈不一樣的是,消息訂閱增長了一個參數,subscribe_id.代碼以下:

bus.Subscribe("my_id", handler, x => x.WithTopic("X.*")); 

警告: 具備相同訂閱者但不一樣主題字符串的兩個單獨訂閱可能不會產生您指望的效果。 subscriberId有效地標識個體AMQP隊列。 具備相同subscriptionId的兩個訂閱者將鏈接到相同的隊列,而且二者都將添加本身的主題綁定。 因此,例如,若是你這樣作:

bus.Subscribe("my_id", handlerOfXDotStar, x => x.WithTopic("X.*")); bus.Subscribe("my_id", handlerOfStarDotB, x => x.WithTopic("*.B")); 

匹配「x.」或「 .B」的全部消息將被傳遞到「XXX_my_id」隊列。 而後,RabbitMQ將向兩個消費者傳遞消息,其中handlerOfXDotStar和handlerOfStarDotB輪流獲取每條消息。

如今,若是你想要匹配多個主題(「X. 」OR「 .B」),你可使用另外一個重載的訂閱方法,它採用多個主題,以下所示:

bus.Subscribe("my_id", handler, x => x.WithTopic("X.*").WithTopic("*.B")); 

/// <summary> /// 獲取主題 /// </summary> /// <typeparam name="T">主題內容類型</typeparam> /// <param name="subscriptionId">訂閱者ID</param> /// <param name="callback">消息接收響應回調</param> /// <param name="topics">訂閱主題集合</param> public static void TopicSubscribe<T>(string subscriptionId, Action<T> callback, params string[] topics) where T : class { var bus = BusBuilder.CreateMessageBus(); bus.Subscribe(subscriptionId, callback, (config) => { foreach (var item in topics) config.WithTopic(item); }); } /// <summary> /// 發佈主題 /// </summary> /// <typeparam name="T">主題內容類型</typeparam> /// <param name="topic">主題名稱</param> /// <param name="message">主題內容</param> /// <param name="msg">錯誤信息</param> /// <returns></returns> public static bool TopicPublish<T>(string topic, T message, out string msg) where T : class { msg = string.Empty; try { using (var bus = BusBuilder.CreateMessageBus()) { bus.Publish(message, topic); return true; } } catch (Exception ex) { msg = ex.ToString(); return false; } } /// <summary> /// 發佈主題 /// </summary> /// <![CDATA[(topic EasyNetQ高級API)]]> /// <typeparam name="T">消息類型</typeparam> /// <param name="t">消息內容</param> /// <param name="topic">主題名</param> /// <param name="msg">錯誤信息</param> /// <param name="exChangeName">交換器名</param> /// <returns></returns> public static bool TopicSub<T>(T t, string topic, out string msg, string exChangeName = "topic_mq") where T : class { msg = string.Empty; try { if (string.IsNullOrWhiteSpace(topic)) throw new Exception("推送主題不能爲空"); using (var bus = BusBuilder.CreateMessageBus()) { var adbus = bus.Advanced; //var queue = adbus.QueueDeclare("user.notice.zhangsan"); var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Topic); adbus.Publish(exchange, topic, false, new Message<T>(t)); return true; } } catch (Exception ex) { msg = ex.ToString(); return false; } } /// <summary> /// 獲取主題 /// </summary> /// <![CDATA[(topic EasyNetQ高級API)]]> /// <typeparam name="T">消息類型</typeparam> /// <param name="subscriptionId">訂閱者ID</param> /// <param name="callback">回調</param> /// <param name="exChangeName">交換器名</param> /// <param name="topics">主題名</param> public static void TopicConsume<T>(Action<T> callback, string exChangeName = "topic_mq",string subscriptionId = "topic_subid", params string[] topics) where T : class { var bus = BusBuilder.CreateMessageBus(); var adbus = bus.Advanced; var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Topic); var queue = adbus.QueueDeclare(subscriptionId); foreach (var item in topics) adbus.Bind(exchange, queue, item); adbus.Consume(queue, registration => { registration.Add<T>((message, info) => { callback(message.Body); }); }); } 

具體發佈/訂閱消息的Demo和相關測試看源碼Demo image爲了方便使用,demo改進版本: image

注意

當在建立訂閱者去消費隊列的時候

/// <summary> /// 獲取主題 /// </summary> /// <param name="topic"></param> public static void GetSub<T>(T topic, Action<T> callback) where T : class { using (var bus = BusBuilder.CreateMessageBus()) { bus.Subscribe<T>(topic.ToString(), callback, x => x.WithTopic(topic.ToString())); } } 

using裏的對象在執行完成後被回收了,致使剛鏈接上去就又斷開了(剛開始寫的時候,習慣性加using,排查了很久才發現,欲哭無淚)

源碼項目運行前的準備與確認:

到RabbitMQ管理後臺添加TestQueueVHost,而且分配用戶權限,而後到RabbitMQHelper.BusBuilder類裏配置RabbitMQ鏈接服務的相關信息host=127.0.0.1:5672;virtualHost=TestQueue;username=sa;password=123456,(根據配置的內容和用戶修改)

image


參考資料(鳴謝):


附:Demo源碼GitHub地址

相關文章
相關標籤/搜索