RabbitMQ(一)

系列說明

本系列主要講解RabbitMQ,講解其特性,例如消息持久化、消息TTL、消息的優先、延遲消息、消息可靠性、消費模式以及在Spring Boot中使用RabbitMQ,代碼在個人Github上java

RabbitMQ介紹

RabbitMQ使用Erlang語言開發基於AQMP協議的開源消息隊列,RabbitMQ主要有如下特色:git

  • 高可靠性: RabbitMQ依靠消息確認、持久化等實現高可靠性,但其吞吐量不過高
  • 高可用: RabbitMQ支持分佈式部署,多個RabbitMQ服務器組成一個集羣造成一個邏輯Broker
  • 多種協議支持: RabbitMQ基於AQMP協議,可是能夠經過安裝插件支持其它協議,例如STOMP、MQTT協議等
  • 多種客戶端語言支持: RabbitMQ提供Java、C++等多種客戶端語言支持
  • 管理頁面: RabbitMQ提供Web管理頁面以即可視化管理

AQMP

RabbitMQ基於AQMP協議開發的消息隊列,AQMP協議在以前消息隊列(一)中已經簡單的介紹了,這裏就簡單的介紹一下:github

圖來自網絡

  • Broker: Broker指的是代理消息隊列,是一個邏輯概念,指的是RabbitMQ服務器,其能夠有多個Vritual Host組成。
  • Virtual Host: Vritual Host是一個虛擬概念,相似於權限控制組,一個Vritual Host能夠有若干Exchange和Queue,權限控制的最小粒度是Virtual Host
  • Exchange: Exchange叫交換機,其能夠多個Queue根據路由規則(Routing Key)綁定。Exchange接收生產者發送的消息,根據其類型(ExchangeType)和路由規則(Routing Key)把消息發送給隊列。
  • Bingding: Binding聯繫Exchange和Queue
  • Connection: Connection在RabbitMQ中是一個客戶端和Broker之間的TCP鏈接
  • Channel: Channel在RabbitMQ中叫作信道,有Connection建立,而且一個Connection能夠建立多個Channel。在RabbitMQ必須經過Channel才能發送消息。之因此須要Channel,主要由於TCP鏈接過於昂貴。

須要注意的地方:編程

  • 若是消息發送到Exchange後,Exchange不能經過路由規則找到合適的隊列,該消息將會被刪除
  • RabbitMQ建議客戶端線程之間不要共用Channel,而是共用Connection

RabbitMQ使用

RabbitMQ-Java官方提供了簡單的使用教程,這裏就簡單的提一下,具體可見其網友翻譯版本:RabbitMQ入門教程服務器

這裏展現的是RabbitMQ發送消息網絡

public class Sender {

  private static final String EXCHANGE_NAME = "log";

  private static final String ROUTING_KEY = "info";

  private static final String MESSAGE = "hello world!";

  public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("47.100.20.186");
    factory.setPort(5672);
    factory.setUsername("guest");
    factory.setPassword("guest");
    // 經過鏈接工廠建立鏈接
    Connection connection = factory.newConnection();
    // 經過Connection建立Channel
    Channel channel = connection.createChannel();
    // 聲明Exchange:名稱及其類型,該操做一樣是冪等的,如何聲明對隊列同樣
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    // 經過Channel向Exchange發送消息和Routing Key,而且配置了BasicProperties(消息屬性)
    channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
        MessageProperties.PERSISTENT_BASIC, MESSAGE.getBytes(StandardCharsets.UTF_8));
    // 關閉Channel和Connection
    channel.close();
    connection.close();
  }
}

這裏展現使用RabbitMQ接收消息app

public class Receiver {

  private static final String QUEUE_NAME = "log_info_queue";

  private static final String EXCHANGE_NAME = "log";

  private static final String ROUTING_KEY = "info";

  public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("47.100.20.186");
    factory.setPort(5672);
    factory.setUsername("guest");
    factory.setPassword("guest");
    // 經過鏈接工廠建立鏈接
    Connection connection = factory.newConnection();
    // 經過Connection建立Channel
    Channel channel = connection.createChannel();
    // 聲明一個隊列 -- 在RabbitMQ中,隊列聲明是冪等性的
    // 一個冪等操做的特色是其任意屢次執行所產生的影響均與一次執行的影響相同
    // 也就是說,若是不存在,就建立,若是存在,不會對已經存在的隊列產生任何影響
    // 可是若是聲明時修改已存在隊列的屬性,則會拋出異常
    channel.queueDeclare(QUEUE_NAME, false, false , false, null);
    // 把Queue和Exchange經過Routing Key綁定
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
    // 設置該消費者預讀取消息數量:這裏主要是考慮到慢消費的問題,這裏使用PUSH模型,服務器推消息給客戶端,
    // 可能會致使消息堆積,設置預讀取數量後,服務器會發送指定數量消息後等待前面消息ACK後纔會繼續發送消息
    channel.basicQos(1);
    // 接收消息:這裏使用自動ACK,固然也能夠獲取消息後手動ACK
    String s = channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, StandardCharsets.UTF_8);
      }
    });
  }
}

RabbitMQ的消息傳遞模型

RabbitMQ主要經過ExchangeType來設置消息傳遞模型,主要有下面4種模型,其中Header模型用的少:異步

  • Direct模型
  • Fanout模型
  • Topic模型
  • Header模型

Direct模型

Direct模型顧名思義指的是直接鏈接,只有當消息中的Routing Key與Queue綁定到Exchange的Routing Key一致,纔會轉發消息給該Queue
mark分佈式

Fanout模型

Fanout模型相似於訂閱/發佈模型,Exchange會把消息轉發給全部綁定到該Exchange上的Queue
markide

Topic模型

Topic模型類型與Servlet的URL匹配模型,其會匹配消息的Routing Key和Queue綁定到Exchange的Routing Key,使用通配符匹配。有#和兩種通配符,#表明0個或多個字符,表明1個字符
mark

RabbitMQ的持久化

首先RabbitMQ的持久化是異步持久化模型,也就是說在特定狀況下,可能形成消息丟失。好比在RabbitMQ Server回調RabbitMQ Producer Client的接口代表已經接收到該消息,可是因爲是異步持久化可能尚未把消息持久化到磁盤中,這時候MQ-Server斷電就會致使消息的丟失

RabbitMQ中消息的持久化須要保證Exchange、Queue、Message都進行持久化操做。須要注意的是:Exchange、Queue的聲明時冪等的。冪等指說屢次聲明產生的結果都是同樣,也就是說若是其不存在則建立,存在則返回且不會對其產生任何影響,可是若是聲明已存在的隊列,且其屬性不一樣則會拋出異常。

Exchange的持久化

RabbitMQ聲明Exchange有幾種方法,但主要使用下面方法,其中第三個參數表示是否將該Exchange持久化

/**
 * Actively declare a non-autodelete exchange with no extra arguments
 * @param exchange the name of the exchange
 * @param type the exchange type
 * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
 */
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;

Queue的持久化

RabbitMQ聲明Queue與Exchange的方法類型,一樣使用durable參數表示是否將該Queue進行持久化操做,下面是其中一個方法

/**
 * Declare a queue
 * @param queue the name of the queue
 * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
 * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
 * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
 * @param arguments other properties (construction arguments) for the queue
 */
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;

部分參數說明:

  • exclusive:代表該隊列是不是排它隊列。若是一個隊列被聲明爲排他隊列,該隊列僅對首次申明它的鏈接可見,並在鏈接斷開時自動刪除。這裏須要注意三點:1. 排他隊列是基於鏈接可見的,同一鏈接的不一樣信道是能夠同時訪問同一鏈接建立的排他隊列;2.「首次」,若是一個鏈接已經聲明瞭一個排他隊列,其餘鏈接是不容許創建同名的排他隊列的,這個與普通隊列不一樣;3.即便該隊列是持久化的,一旦鏈接關閉或者客戶端退出,該排他隊列都會被自動刪除的,這種隊列適用於一個客戶端發送讀取消息的應用場景。
  • autoDelete:代表該隊列是否自動刪除。自動刪除,若是該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列。

Message的持久化

消息的持久化須要在生產者發送消息時設置消息屬性,以代表該消息時持久化消息。下面是消息發送的一個API

/**
 * Publish a message.
 *
 * Publishing to a non-existent exchange will result in a channel-level
 * protocol exception, which closes the channel.
 *
 * @param exchange the exchange to publish the message to
 * @param routingKey the routing key
 * @param props other properties for the message - routing headers etc
 * @param body the message body
 */
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

部分參數說明:

  • exchange:表示該消息發送到哪一個Exchange
  • routingKey:表示該消息的Routing Key
  • props:表示該消息的屬性
  • body:消息實體

BasicProperties定義以下:

public BasicProperties(
            String contentType,//消息類型如:text/plain
            String contentEncoding,//編碼
            Map<String,Object> headers,
            Integer deliveryMode,//1:nonpersistent 2:persistent
            Integer priority,//優先級
            String correlationId,
            String replyTo,//反饋隊列
            String expiration,//expiration到期時間
            String messageId,
            Date timestamp,
            String type,
            String userId,
            String appId,
            String clusterId)

其中RabbitMQ提供了屬性實現已經更簡單的配置消息屬性:

/** Empty basic properties, with no fields set */
BasicProperties.MINIMAL_BASIC
/** Empty basic properties, with only deliveryMode set to 2 (persistent) */
BasicProperties.MINIMAL_PERSISTENT_BASIC
/** Content-type "application/octet-stream", deliveryMode 1 (nonpersistent), priority zero */
BasicProperties.BASIC
/** Content-type "application/octet-stream", deliveryMode 2 (persistent), priority zero */
BasicProperties.PERSISTENT_BASIC
/** Content-type "text/plain", deliveryMode 1 (nonpersistent), priority zero */
BasicProperties.TEXT_PLAIN
/** Content-type "text/plain", deliveryMode 2 (persistent), priority zero */
BasicProperties.PERSISTENT_TEXT_PLAIN

固然可使用時編程自定義設置消息屬性:

AMQP.BasicProperties.Builder builder = new Builder();
BasicProperties properties = builder
    .deliveryMode(2)
    .build();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, MESSAGE.getBytes(StandardCharsets.UTF_8));

消息何時刷到磁盤(持久化)

寫入文件前會有一個Buffer,大小爲1M,數據在寫入文件時,首先會寫入到這個Buffer,若是Buffer已滿,則會將Buffer寫入到文件(未必刷到磁盤)。
有個固定的刷盤時間:25ms,也就是無論Buffer滿不滿,每一個25ms,Buffer裏的數據及未刷新到磁盤的文件內容一定會刷到磁盤。
每次消息寫入後,若是沒有後續寫入請求,則會直接將已寫入的消息刷到磁盤:使用Erlang的receive x after 0實現,只要進程的信箱裏沒有消息,則產生一個timeout消息,而timeout會觸發刷盤操做。

TTL

TTL(Time To Live)表示存活時間。RabbitMQ中能夠對Queue和Message設置TTL,以控制Queue和Message的存活時間。

Queue TTL

隊列的存活時間指的是Queue在自動刪除前能夠處於未使用狀態的時間。未使用狀態指的是Queue上沒有Consumer、Queue沒有被從新聲明。隊列的存活時間在隊列第一次聲明時經過指定隊列的屬性"x-expires"指定,單位是毫秒,代碼以下:

Map<String, Object> queueArgs = new HashMap<>();
// 設置1分鐘過時
queueArgs.put("x-expires", 60000);
channel.queueDeclare("queue", false, false, false, queueArgs);

Message TTL

消息的存活時間指的是消息在隊列中的存活時間,超過該時間消息將被刪除或者不能傳遞給消費者。消息的存活時間能夠經過設置每條消息的存活時間或者設置某條隊列中的因此存活時間,當二者都有時,時間小的有效。

設置消息屬性
針對每條消息能夠在發送消息時設置消息屬性

// 設置消息屬性-TTL爲30s
BasicProperties properties = new BasicProperties.Builder()
    .expiration("30000").build();
channel.basicPublish("exchange", "kanyuxia", properties,
    "hello".getBytes(StandardCharsets.UTF_8));

設置隊列屬性
經過設置隊列中消息的TTL屬性,而後傳入該隊列的全部消息都有該TTL屬性

Map<String, Object> queueArgs = new HashMap<>();
queueArgs.put("x-message-ttl", 30000);
channel.queueDeclare("queue", false, false, false, queueArgs);

Reference

https://www.jianshu.com/p/64357bf35808?utm_campaign=maleskine&utm_content=note&utm_medium=seo_notes&utm_source=recommendation http://blog.csdn.net/wanbf123/article/details/78052419 http://blog.csdn.net/u013256816/article/details/60875666 http://blog.csdn.net/u013256816/article/details/54916011

相關文章
相關標籤/搜索