本系列主要講解RabbitMQ,講解其特性,例如消息持久化、消息TTL、消息的優先、延遲消息、消息可靠性、消費模式以及在Spring Boot中使用RabbitMQ,代碼在個人Github上java
RabbitMQ使用Erlang語言開發基於AQMP協議的開源消息隊列,RabbitMQ主要有如下特色:git
RabbitMQ基於AQMP協議開發的消息隊列,AQMP協議在以前消息隊列(一)中已經簡單的介紹了,這裏就簡單的介紹一下:github
須要注意的地方:編程
RabbitMQ-Java官方提供了簡單的使用教程,這裏就簡單的提一下,具體可見其網友翻譯版本:RabbitMQ入門教程服務器
這裏展現的是RabbitMQ發送消息網絡
public class Sender { private static final String EXCHANGE_NAME = "log"; private static final String ROUTING_KEY = "info"; private static final String MESSAGE = "hello world!"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("47.100.20.186"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); // 經過鏈接工廠建立鏈接 Connection connection = factory.newConnection(); // 經過Connection建立Channel Channel channel = connection.createChannel(); // 聲明Exchange:名稱及其類型,該操做一樣是冪等的,如何聲明對隊列同樣 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 經過Channel向Exchange發送消息和Routing Key,而且配置了BasicProperties(消息屬性) channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_BASIC, MESSAGE.getBytes(StandardCharsets.UTF_8)); // 關閉Channel和Connection channel.close(); connection.close(); } }
這裏展現使用RabbitMQ接收消息app
public class Receiver { private static final String QUEUE_NAME = "log_info_queue"; private static final String EXCHANGE_NAME = "log"; private static final String ROUTING_KEY = "info"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("47.100.20.186"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); // 經過鏈接工廠建立鏈接 Connection connection = factory.newConnection(); // 經過Connection建立Channel Channel channel = connection.createChannel(); // 聲明一個隊列 -- 在RabbitMQ中,隊列聲明是冪等性的 // 一個冪等操做的特色是其任意屢次執行所產生的影響均與一次執行的影響相同 // 也就是說,若是不存在,就建立,若是存在,不會對已經存在的隊列產生任何影響 // 可是若是聲明時修改已存在隊列的屬性,則會拋出異常 channel.queueDeclare(QUEUE_NAME, false, false , false, null); // 把Queue和Exchange經過Routing Key綁定 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); // 設置該消費者預讀取消息數量:這裏主要是考慮到慢消費的問題,這裏使用PUSH模型,服務器推消息給客戶端, // 可能會致使消息堆積,設置預讀取數量後,服務器會發送指定數量消息後等待前面消息ACK後纔會繼續發送消息 channel.basicQos(1); // 接收消息:這裏使用自動ACK,固然也能夠獲取消息後手動ACK String s = channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body, StandardCharsets.UTF_8); } }); } }
RabbitMQ主要經過ExchangeType來設置消息傳遞模型,主要有下面4種模型,其中Header模型用的少:異步
Direct模型顧名思義指的是直接鏈接,只有當消息中的Routing Key與Queue綁定到Exchange的Routing Key一致,纔會轉發消息給該Queue
分佈式
Fanout模型相似於訂閱/發佈模型,Exchange會把消息轉發給全部綁定到該Exchange上的Queue
ide
Topic模型類型與Servlet的URL匹配模型,其會匹配消息的Routing Key和Queue綁定到Exchange的Routing Key,使用通配符匹配。有#和兩種通配符,#表明0個或多個字符,表明1個字符
首先RabbitMQ的持久化是異步持久化模型,也就是說在特定狀況下,可能形成消息丟失。好比在RabbitMQ Server回調RabbitMQ Producer Client的接口代表已經接收到該消息,可是因爲是異步持久化可能尚未把消息持久化到磁盤中,這時候MQ-Server斷電就會致使消息的丟失
RabbitMQ中消息的持久化須要保證Exchange、Queue、Message都進行持久化操做。須要注意的是:Exchange、Queue的聲明時冪等的。冪等指說屢次聲明產生的結果都是同樣,也就是說若是其不存在則建立,存在則返回且不會對其產生任何影響,可是若是聲明已存在的隊列,且其屬性不一樣則會拋出異常。
RabbitMQ聲明Exchange有幾種方法,但主要使用下面方法,其中第三個參數表示是否將該Exchange持久化
/** * Actively declare a non-autodelete exchange with no extra arguments * @param exchange the name of the exchange * @param type the exchange type * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart) */ Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
RabbitMQ聲明Queue與Exchange的方法類型,一樣使用durable參數表示是否將該Queue進行持久化操做,下面是其中一個方法
/** * Declare a queue * @param queue the name of the queue * @param durable true if we are declaring a durable queue (the queue will survive a server restart) * @param exclusive true if we are declaring an exclusive queue (restricted to this connection) * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) * @param arguments other properties (construction arguments) for the queue */ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
部分參數說明:
消息的持久化須要在生產者發送消息時設置消息屬性,以代表該消息時持久化消息。下面是消息發送的一個API
/** * Publish a message. * * Publishing to a non-existent exchange will result in a channel-level * protocol exception, which closes the channel. * * @param exchange the exchange to publish the message to * @param routingKey the routing key * @param props other properties for the message - routing headers etc * @param body the message body */ void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
部分參數說明:
BasicProperties定義以下:
public BasicProperties( String contentType,//消息類型如:text/plain String contentEncoding,//編碼 Map<String,Object> headers, Integer deliveryMode,//1:nonpersistent 2:persistent Integer priority,//優先級 String correlationId, String replyTo,//反饋隊列 String expiration,//expiration到期時間 String messageId, Date timestamp, String type, String userId, String appId, String clusterId)
其中RabbitMQ提供了屬性實現已經更簡單的配置消息屬性:
/** Empty basic properties, with no fields set */ BasicProperties.MINIMAL_BASIC /** Empty basic properties, with only deliveryMode set to 2 (persistent) */ BasicProperties.MINIMAL_PERSISTENT_BASIC /** Content-type "application/octet-stream", deliveryMode 1 (nonpersistent), priority zero */ BasicProperties.BASIC /** Content-type "application/octet-stream", deliveryMode 2 (persistent), priority zero */ BasicProperties.PERSISTENT_BASIC /** Content-type "text/plain", deliveryMode 1 (nonpersistent), priority zero */ BasicProperties.TEXT_PLAIN /** Content-type "text/plain", deliveryMode 2 (persistent), priority zero */ BasicProperties.PERSISTENT_TEXT_PLAIN
固然可使用時編程自定義設置消息屬性:
AMQP.BasicProperties.Builder builder = new Builder(); BasicProperties properties = builder .deliveryMode(2) .build(); channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, MESSAGE.getBytes(StandardCharsets.UTF_8));
寫入文件前會有一個Buffer,大小爲1M,數據在寫入文件時,首先會寫入到這個Buffer,若是Buffer已滿,則會將Buffer寫入到文件(未必刷到磁盤)。
有個固定的刷盤時間:25ms,也就是無論Buffer滿不滿,每一個25ms,Buffer裏的數據及未刷新到磁盤的文件內容一定會刷到磁盤。
每次消息寫入後,若是沒有後續寫入請求,則會直接將已寫入的消息刷到磁盤:使用Erlang的receive x after 0實現,只要進程的信箱裏沒有消息,則產生一個timeout消息,而timeout會觸發刷盤操做。
TTL(Time To Live)表示存活時間。RabbitMQ中能夠對Queue和Message設置TTL,以控制Queue和Message的存活時間。
隊列的存活時間指的是Queue在自動刪除前能夠處於未使用狀態的時間。未使用狀態指的是Queue上沒有Consumer、Queue沒有被從新聲明。隊列的存活時間在隊列第一次聲明時經過指定隊列的屬性"x-expires"指定,單位是毫秒,代碼以下:
Map<String, Object> queueArgs = new HashMap<>(); // 設置1分鐘過時 queueArgs.put("x-expires", 60000); channel.queueDeclare("queue", false, false, false, queueArgs);
消息的存活時間指的是消息在隊列中的存活時間,超過該時間消息將被刪除或者不能傳遞給消費者。消息的存活時間能夠經過設置每條消息的存活時間或者設置某條隊列中的因此存活時間,當二者都有時,時間小的有效。
設置消息屬性
針對每條消息能夠在發送消息時設置消息屬性
// 設置消息屬性-TTL爲30s BasicProperties properties = new BasicProperties.Builder() .expiration("30000").build(); channel.basicPublish("exchange", "kanyuxia", properties, "hello".getBytes(StandardCharsets.UTF_8));
設置隊列屬性
經過設置隊列中消息的TTL屬性,而後傳入該隊列的全部消息都有該TTL屬性
Map<String, Object> queueArgs = new HashMap<>(); queueArgs.put("x-message-ttl", 30000); channel.queueDeclare("queue", false, false, false, queueArgs);
https://www.jianshu.com/p/64357bf35808?utm_campaign=maleskine&utm_content=note&utm_medium=seo_notes&utm_source=recommendation http://blog.csdn.net/wanbf123/article/details/78052419 http://blog.csdn.net/u013256816/article/details/60875666 http://blog.csdn.net/u013256816/article/details/54916011