RabbitMQ入門

什麼是RabbitMQ?

AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。java

消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。
AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。正則表達式

RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。安全

做用

能夠將一些無需即時返回且耗時的操做提取出來,進行異步處理(好比咱們項目訂單系統中出票成功後記報表、發短信等操做)服務器

這樣能夠減輕服務器壓力,大大節省服務器的請求響應時間,同時利於系統解耦。架構

系統架構

rabbitmq系統架構圖

生產者發送消息

核心概念exchange、route、queue

rabbitmq_exchange類型

生產者在發送消息時,都須要指定一個RoutingKey和Exchange,Exchange在接到該RoutingKey之後,會判斷該ExchangeType:異步

  1. 若是是Direct類型,則會將消息中的RoutingKey與該Exchange關聯的全部Binding中的BindingKey進行比較,若是相等,則發送到該Binding對應的Queue中。分佈式

  2. 若是是Fanout類型,則會將消息發送給全部與該Exchange定義過Binding的全部Queues中去,實際上是一種廣播行爲。ui

  3. 若是是Topic類型,則會按照正則表達式,對RoutingKey與BindingKey進行匹配,若是匹配成功,則發送到對應的Queue中。spa

發送消息(java)

  1. 創建鏈接設計

    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername(userName);
    factory.setPassword(password);
    factory.setVirtualHost(virtualHost);
    factory.setHost(hostName);
    factory.setPort(portNumber);
    Connection conn = factory.newConnection();
  2. 建立通道

    Channel channel = conn.createChannel();
  3. 申明exchange、queue以及它們之間的綁定關係

    channel.exchangeDeclare(exchangeName, "direct", true);
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, exchangeName, routingKey);
  4. 發送消息

    byte[] messageBodyBytes = "Hello, world!".getBytes();
    channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

消費者消費消息

  1. 一種是經過basic.consume命令,訂閱某一個隊列中的消息,Channel會自動在處理完上一條消息以後,接收下一條消息(同一個Channel消息處理是串行的)。除非關閉Channel或者取消訂閱,不然客戶端將會一直接收隊列的消息。

  2. 另一種方式是經過basic.get命令主動獲取隊列中的消息,可是絕對不能夠經過循環調用basic.get來代替basic.consume,這是由於basic.get在實際執行的時候,是首先Consume某一個隊列,而後檢索第一條消息,而後再取消訂閱。若是是高吞吐率的消費者,最好仍是建議使用basic.consume。

若是有多個消費者同時訂閱同一個隊列的話,RabbitMQ是採用循環的方式分發消息的,每一條消息只能被一個訂閱者接收。例如,有隊列Queue,其中ClientA和ClientB都Consume了該隊列,MessageA到達隊列後,被分派到ClientA,ClientA回覆服務器收到響應,服務器刪除MessageA;再有一條消息MessageB抵達隊列,服務器根據「循環推送」原則,將消息會發給ClientB,而後收到ClientB的確認後,刪除MessageB;等到再下一條消息時,服務器會再將消息發送給ClientA。

這裏咱們能夠看出,消費者再接到消息之後,都須要給服務器發送一條確認命令,這個既能夠在handleDelivery裏顯示的調用basic.ack實現,也能夠在Consume某個隊列的時候,設置autoACK屬性爲true實現。這個ACK僅僅是通知服務器能夠安全的刪除該消息,而不是通知生產者。 若是消費者在接到消息之後還沒來得及返回ACK就斷開了鏈接,消息服務器會重傳該消息給下一個訂閱者,若是沒有訂閱者就會存儲該消息。

既然RabbitMQ提供了ACK某一個消息的命令,固然也提供了Reject某一個消息的命令。當客戶端發生錯誤,調用basic.reject命令拒絕某一個消息時,能夠設置一個requeue的屬性,若是爲true,則消息服務器會重傳該消息給下一個訂閱者;若是爲false,則會直接刪除該消息。固然,也能夠經過ack,讓消息服務器直接刪除該消息而且不會重傳。

相關文章
相關標籤/搜索