一個winform帶你玩轉rabbitMQ

源碼已放出 https://github.com/dubing/MaoyaRabbithtml

本章分3部分node

1、安裝部署初探git

2、進階github

3、api相關json


安裝 部署 初探

先上圖windows


一. 安裝部署api

  下載 rabbitMQ :http://www.rabbitmq.com/download.html安全

  安裝rabbitmq須要erlang,下載erlang:http://www.erlang.org/download.html服務器

  按照官網按照步驟,例如windows http://www.rabbitmq.com/install-windows.html app

  安裝完rabbitMQ能夠再啓動插件擴展,其中包含了一個管理後臺

  

  最新版本的後臺地址爲 http://localhost:15672/ 

  

  用戶名和密碼都爲guest,輸入完成進入主菜單

  

  功能很豐富,能夠查看當前服務器的交換機,隊列,消息,鏈接,會話等得使用狀況。

  基本上到這裏服務器的安裝部署環節算是ok,很簡單。


 

二.  簡介

  要了解rabbitMQ 首先要了解AMQP協議 百科上給的很詳細 http://baike.baidu.com/view/4023136.htm?fr=aladdin

  AMQP 有四個很是重要的概念:虛擬機(virtual host),通道(exchange),隊列(queue)和綁定(binding)。

  虛擬機: 一般是應用的外在邊界,咱們能夠爲不一樣的虛擬機分配訪問權限。虛擬機可持有多個交換機、隊列和綁定。
  交換機: 從鏈接通道(Channel)接收消息,並按照特定的路由規則發送給隊列。
  隊列: 消息最終的存儲容器,直到消費客戶端(Consumer)將其取走。
  綁定: 也就是所謂的路由規則,告訴交換機將何種類型的消息發送到某個隊列中。

  這個概念很重要 否則在學習rabbitmq的地方會碰到不少困難。想要進階學習的能夠參考 https://www.rabbitmq.com/tutorials/amqp-concepts.html

  借用官方一個圖來闡述AMQP

  

  RabbitMQ是一個消息代理。它的核心原理很是簡單:接收和發送消息。

  你能夠把它想像成一個郵局:你把信件放入郵箱,郵遞員就會把信件投遞到你的收件人處。在這個比喻中,RabbitMQ是一個郵箱、郵局、郵遞員。RabbitMQ和郵局的主要區別是,它處理的不是紙,而是接收、存儲和發送二進制的數據——消息。

  對於rabbitMQ自己的特色 參考官網 http://www.rabbitmq.com/features.html

  一、可靠性(Reliability)
  RabbitMQ提供不少特性供咱們能夠在性能和可靠性做出折中的選擇,包括持久化、發送確認、發佈者確認和高可用性等。
  二、彈性選路(Flexible Routing)
  消息在到達隊列前經過交換(exchanges)來被選路。RabbitMQ爲典型的選路邏輯設計了幾個內置的交換類型。對於更加複雜的選路,咱們能夠將exchanges綁定在一塊兒或者寫屬於本身的exchange類型插件。
  三、集羣化(Clustering)
  在一個局域網內的幾個RabbitMQ服務器能夠集羣起來,組成一個邏輯的代理人。
  四、聯盟(Federation)
  對於那些須要比集羣更加鬆散和非可靠鏈接的服務器來講,RabbitMQ提供一個聯盟模型(Federation Model)
  五、高可用隊列(High Available Queue)
  能夠在一個集羣裏的幾個機器裏對隊列作鏡像,確保即時發生了硬件失效,你的消息也是安全的。
  六、多客戶端(Many Clients)
  有各類語言的RabbitMQ客戶端
  七、管理UI(Management UI)
  RabbitMQ提供一個易用的管理UI來監控和控制消息代理人的各個方面。
  八、跟蹤(Tracing)
  若是你的消息系統行爲異常,RabbitMQ提供跟蹤支持來找出錯誤的根源。
  九、插件系統(Plugin System)
  RabbitMQ提供各類方式的插件擴展,咱們能夠實現本身的插件。

  使用任務隊列一個優勢是可以輕易地並行處理任務。當處理大量積壓的任務,只要增長工做隊列,經過這個方式,可以實現輕易的縮放。


 

三. 初探

  文中的winform所採起的client爲官方的.net版本 https://github.com/rabbitmq/rabbitmq-dotnet-client

  首先是Connection和Channel的概念

  Connection 創建與rabbitmq server的一個鏈接,由ConnectionFactory建立,Channel創建在connection基礎上的一個頻道,相對於connection來講,它是輕量級的。能夠理解成一次會話。

  代碼示例 本機環境

                using (IConnection connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
			//do something
                    }
                }

  exchange經常使用有三種類型:

  Direct :處理路由鍵。須要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵徹底匹配。這是一個完整的匹配。
  Fanout :不處理路由鍵。你只須要簡單的將隊列綁定到交換機上。一個發送到交換機的消息都會被轉發到與該交換機綁定的全部隊列上。
  Topic : 將路由鍵和某模式進行匹配。此時隊列須要綁定要一個模式上。符號「#」匹配一個或多個詞,符號「*」匹配很少很多一個詞。

  還有一種多重屬性的類型headers,咱們在下一章節討論。

  咱們用winform分別形成三種類型的exchange來實際體驗一下

  

  這裏所謂的限定exchange是在咱們安裝rabbitmq server的時候自動生成的一些 咱們的測試不使用這些exchange。

  而後咱們新建3個Queue,這裏咱們會發現一個有趣的現象,rabbitmq server對於新生成的隊列都會默認綁定在一個名稱爲「」的默認exchange上。

  先試試direct類型,下面咱們分別把Q1,Q2,Q3根據路由key爲空,k1,k.#綁定在dEx上(direct exchange)。
    

  而後咱們根據路由key爲空,k,k1,k2,k3來發送消息m1,m2,m3,m4,m5
    

  再用3個隊列接收消息試一下結果
  

  由於發送確認標記ack,因此隊列上讀取過的消息會被刪除,爲了進一步認證,我在結尾又添加了一個routingkey爲k.#的消息(對應綁定Q3),由圖可見direct 模式下隊列之收取他們徹底對應的routingkey消息。

  下面咱們再試一下fanout類型,把Q1,Q2,Q3根據路由key爲空,k1,k.#綁定在fEx上(fanout exchange)。

  
  同上步驟創建綁定關係

  

  生產消息,而後看下隊列接受消息的狀況
  

  效果很明顯,fanout爲廣播模式。

  再試試topic類型 把Q1,Q2,Q3根據路由key爲空,k1,k.#綁定在tEx上(topic exchange)。

  

  推送消息
  

  接收消息
  
  經過3種模式 3個隊列的消息讀取 你們應該瞭解了這3中模式的區別。


進階

一.  exchange屬性

  Type

  前一章咱們說了exchange的類型分爲fanout,direct,topic.還有一種不經常使用的headers。
  headers這種類型的exchange綁定的時候會忽略掉routingkey,Headers是一個鍵值對,能夠定義成成字典等。發送者在發送的時候定義一些鍵值對,接收者也能夠再綁定時候傳入一些鍵值對,二者匹配的話,則對應的隊列就能夠收到消息。匹配有兩種方式all和any。這兩種方式是在接收端必需要用鍵值"x-mactch"來定義。all表明定義的多個鍵值對都要知足,而any則代碼只要知足一個就能夠了。以前的幾種exchange的routingKey都須要要字符串形式的,而headers exchange則沒有這個要求,由於鍵值對的值能夠是任何類型
  舉個例子,發送端定義2個鍵值{k1,1},{k2,2},接收端綁定隊列的時候定義{"x-match", "any"},那麼接收端的鍵值屬性裏只要存在{k1,1}或{k2,2}均可以獲取到消息。
  這樣的類型擴展的程度很大,適合很是複雜的業務場景。

  Durability

  持久性,這是exchange的可選屬性,若是你Durability設置爲false,那些當前會話結束的時候,該exchange也會被銷燬。 
  新建一個transient exchange 
  
  關閉當前鏈接再查看一下
  

  

  剛纔咱們新建的transient已經銷燬了。

  Auto delete

  當沒有隊列或者其餘exchange綁定到此exchange的時候,該exchange被銷燬。這個很簡單就不示例了。

  Internal (比較簡單 也不展現了)

  表示這個exchange不能夠被client用來推送消息,僅用來進行exchange和exchange之間的綁定。

  PS: 沒法聲明2個名稱相同 可是類型卻不一樣的exchange

  


二.  Queue屬性  

  Durability 和exchange相同,未持久化的隊列,服務重啓後銷燬。

  Auto delete 當沒有消費者鏈接到該隊列的時候,隊列自動銷燬。

  Exclusive 使隊列成爲私有隊列,只有當前應用程序可用,當你須要限制隊列只有一個消費者,這是頗有用的。

  擴展屬性以下對應源程序 RabbitMQ.Client.IModel.QueueDeclare(string, bool, bool, bool, System.Collections.Generic.IDictionary<string,object>)最後的參數

  Message TTL 當一個消息被推送在該隊列的時候 能夠存在的時間 單位爲ms,(對應擴展參數argument "x-message-ttl" )

  Auto expire 在隊列自動刪除以前能夠保留多長時間(對應擴展參數argument "x-expires")

  Max length 一個隊列能夠容納的已準備消息的數量(對應擴展參數argument "x-max-length")

  ... 更多參考 http://www.rabbitmq.com/extensions.html

  ps:一旦建立了隊列和交換機,就不能修改其標誌了。例如,若是建立了一個non-durable的隊列,而後想把它改變成durable的,惟一的辦法就是刪除這個隊列而後重現建立。


三.  Message屬性

  Durability 

  消息的持久在代碼中設置的方法與exchange和queue不一樣,有2種方法

  1.

  IBasicProperties properties = channel.CreateBasicProperties();
  properties.SetPersistent(true);
  byte[] payload = Encoding.ASCII.GetBytes(message);
  channel.BasicPublish(exchange.name, txtMessageRoutingKey.Text.Trim(), properties, payload);

  2.

  IBasicProperties properties = channel.CreateBasicProperties();
  properties.DeliveryMode = 2;
  byte[] payload = Encoding.ASCII.GetBytes(message);
  channel.BasicPublish(exchange.name, txtMessageRoutingKey.Text.Trim(), properties, payload);

  contentType: 標識消息內容的MIME,例如JSON用application/json

  replayTo: 標識回調的queue的地址

  correlationId:用於request和response的關聯,確保消息的請求和響應的同一性

  Message的2種狀態:

  Ready

  此狀態的消息存在於隊列中待處理。

  Unacknowledged

  此狀態的消息表示已經在處理未確認。

  說到Unacknowledged,這裏須要瞭解一個ack的概念。當Consumer接收到消息、處理任務完成以後,會發送帶有這個消息標示符的ack,來告訴server這個消息接收到並處理完成。RabbitMQ會一直等處處理某個消息的Consumer的連接失去以後,才肯定這個消息沒有正確處理,從而RabbitMQ重發這個消息。
  Message acknowledgment是默認關閉的。初始化Consumer時有個noAck參數,若是設置爲true,這個Consumer在收到消息以後會立刻返回ack。

  string BasicConsume(string queue, bool noAck, RabbitMQ.Client.IBasicConsumer consumer)

  通常來講,經常使用的場景noack通常就是設置成true,可是對於風險要求比較高的項目,例如支付。對於每一條消息咱們都須要保證他的完整性和正確性。就須要獲取消息後確認執行完正確的業務邏輯後再主動返回一個ack給server。能夠經過rabbitmqctl list_queues name message_rady message_unacknowleded 命令來查看隊列中的消息狀況,也能夠經過後臺管理界面。

  咱們先hold住一條消息

  

  而後咱們再關閉連接或者重啓服務

  
  數據仍是完整的。 

  ps:message的消費還分爲consume和baseget 下面講到集羣的時候再介紹。


四.  binding相關

  若是你綁定了一個durable的隊列和一個durable的交換機,RabbitMQ會自動保留這個綁定。相似的,若是刪除了某個隊列或交換機(不管是否是 durable),依賴它的綁定都會自動刪除。

  在聲明一個隊列的同時,server會默認讓此隊列綁定在默認的exchange上,這個exchange的名稱爲空。

   


 五.  發佈訂閱

  咱們上一章的demo中實際上已經使用了發佈訂閱模式。

  RabbitMQ消息模型的核心理念是:發佈者(producer)不會直接發送任何消息給隊列。事實上,發佈者(producer)甚至不知道消息是否已經被投遞到隊列。發佈者(producer)只須要把消息發送給一個exchange。exchange很是簡單,它一邊從發佈者方接收消息,一邊把消息推入隊列。exchange必須知道如何處理它接收到的消息,是應該推送到指定的隊列仍是是多個隊列,或者是直接忽略消息。這些規則是經過exchange type來定義的。

  

  發佈訂閱其實很簡單,例如上章我所示例,假設咱們一開始沒有任何消息,如今有一個生產者P1,他是一個天氣預報播放者。而後咱們有2個消費者來訂閱他的消息。
  P1經過廣播類型的交換機fEx來發布他的天氣消息,c1,c2分別創建一個隊列爲Q1,Q2. 而且訂閱P1的fEx.

  基本能夠如圖所示
  
  咱們P1利用fEx生成一條消息的時候,c1,c2經過Q1,Q2均可以獲取到p1所發佈的消息

  咱們發佈3條消息
  
  查看隊列狀況
  Q1:
  
  Q2:

  

  Q1,Q2都拿到了廣播的消息,至於C1,C2如何消費這些消息,互相之間徹底沒有干擾。

  ps:簡單一句話 發佈訂閱中發佈者所產生的消息經過exchange對全部綁定他的隊列隊形消息推送,每一個隊列獲取綁定所對應的消息


六.  WorkQueue (可用於消費者集羣)

  區分於發佈訂閱,消費者集羣主要解決橫向服務器擴展問題,若是一個隊列積壓太多,如何均與的讓不一樣的消費者來承擔。

  

  默認來講,RabbitMQ會按順序得把消息發送給每一個消費者(consumer)。平均每一個消費者都會收到同等數量得消息。這種發送消息得方式叫作——輪詢(round-robin)。

  咱們開3個程序,1個生產 2個消費。

  如圖所示綁定關係以下

  

  2個消費者用一樣的程序,這裏記錄進程pid以區分,實際項目中能夠用不一樣服務器來區分

  

   啓動消息消費,使消費者處理work狀態

  

  而後咱們不停的經過生產者這發佈消息

  

  而後咱們看下2個消費者的消費狀況

  1.

  

  2.
  

  3.
  

  4.
  

  5.
  
  

  默認地,RabbitMQ會逐一地向下一個Consumer發放消息,每個Consumer會獲得數目相同的消息。文中所示之因此是按照1條一條的輪詢,是由於程序中控制了一個隊列單次消費的數量。

  void BasicQos(uint prefetchSize, ushort prefetchCount, bool global)


API CommandLine 以及其餘功能

RabbitMQ API

  RabbitMQ Server提供了豐富的http api。

  舉個列子

  

  須要HTTP基自己份驗證。默認的用戶名/密碼爲guest/guest。

  這些返回值得意義我從官網搬來解釋,爲了不翻譯的問題致使你們理解的偏差這裏直接給出原文

cluster_name The name of the entire cluster, as set with rabbitmqctl set_cluster_name.
erlang_full_version A string with extended detail about the Erlang VM and how it was compiled, for the node connected to.
erlang_version A string with the Erlang version of the node connected to. As clusters should all run the same version this can be taken as representing the cluster.
exchange_types A list of all exchange types available.
listeners All (non-HTTP) network listeners for all nodes in the cluster. (See contexts in /api/nodes for HTTP).
management_version Version of the management plugin in use.
message_stats A message_stats object for everything the user can see - for all vhosts regardless of permissions in the case of monitoring and administrator users, and for all vhosts the user has access to for other users.
node The name of the cluster node this management plugin instance is running on.
object_totals An object containing global counts of all connections, channels, exchanges, queues and consumers, subject to the same visibility rules as for message_stats.
queue_totals An object containing sums of the messagesmessages_ready and messages_unacknowledged fields for all queues, again subject to the same visibility rules as for message_stats.
rabbitmq_version Version of RabbitMQ on the node which processed this request.
statistics_db_node Name of the cluster node hosting the management statistics database.
statistics_level Whether the node is running fine or coarse statistics.

  又或者經過api查詢虛擬主機
  
  許多api的URI須要一個虛擬主機路徑的一部分的名字,由於名字只有惟一在一個虛擬主機識別物體。做爲默認的虛擬主機稱爲「/」,這​​將須要被編碼爲「%2F」。

  在個人demo程序中對應的api功能能夠經過這裏的功能來實現

  

  其更豐富的功能能夠參考官網說明文檔 http://hg.rabbitmq.com/rabbitmq-management/raw-file/3646dee55e02/priv/www-api/help.html

  以及 http://hg.rabbitmq.com/rabbitmq-management/raw-file/rabbitmq_v3_3_5/priv/www/api/index.html

  通常來講咱們經常使用的我在應用程序中已經給出 例如查看全部隊列等

  


 RabbitMQ CommandLine

  除了豐富的http api,rabbitmq server天然也有其很全面命令行。

  例如查詢全部exchange。

  

  查詢全部隊列以及他們包含的消息數目

   

  rabbitmqctl更多的命令說明參考 http://www.rabbitmq.com/man/rabbitmqctl.1.man.html


Message的BasicGet於consume的區別

   consume的功能上一張介紹過,basicget更偏向於咱們平時用過的其餘類型的MessageQueue,它就是最基本的接受消息,consume的消費針對basicget來講屬於一個長鏈接於短鏈接的區別。

消費者關係一旦肯定,基本上默認它就是在偵聽通道的消息是否在生產。而basicget則是由客戶端手動來控制。

  在demo中在下圖所示處區分

  

  若是你選擇了消費消息,那麼基本上代碼層面是這樣來完成的

                    var consumer = new QueueingBasicConsumer(channel);
                    channel.BasicQos(0, 1, false);
                    channel.BasicConsume(queue.name, rbAckTrue.Checked, consumer);
                    while (true)
                    {
                        var e = consumer.Queue.Dequeue();
                        MessageBox.Show(string.Format("隊列{0}獲取消息{1},線程id爲{2}", queue.name, Encoding.ASCII.GetString(e.Body), Process.GetCurrentProcess().Id));
                        Thread.Sleep(1000);
                    }

本篇先到此,但願對你們有幫助  

相關文章
相關標籤/搜索