MQ(消息隊列)做爲現代比較流行的技術,在互聯網應用平臺中做爲中間件,主要解決了應用解耦、異步通訊、流量削鋒、服務總線等問題,爲實現高併發、高可用、高伸縮的企業應用提供了條件。html
目前市面比較流行的消息隊列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等,而每種產品又有着獨特的着重點,可根據業務須要進行選擇。git
這裏有對主流MQ的優缺點的一些描述與比較。github
- 安裝
一、下載Erlangredis
官網下載,安裝完成配置環境變量spring
二、安裝RabbitMqmongodb
官網下載,直接安裝瀏覽器
啓用管理界面:rabbitmq-plugins.bat enable rabbitmq_management服務器
進入%RM%/sbin雙擊 rabbitmq-server.bat,啓動成功後以下:併發
啓動完成瀏覽器訪問http://localhost:15672,用戶名/密碼:guest/guest,進入界面後以下:app
- 介紹
RabbitMq是遵循了AMQP協議的開源消息隊列的代理服務軟件。
- 跨平臺,支持多種語言
- 實現了AMQP協議;
- 知足高併發需求
- 支持集羣部署
- 支持多插件,可視化視圖
- 社區活躍
- 等等
核心元件包括:
ConnectionFactory(鏈接管理器):應用程序與Rabbit之間創建鏈接的管理器,程序代碼中使用;
Channel(信道):消息推送使用的通道;
Exchange(交換器):用於接受、分配消息;
Queue(隊列):用於存儲生產者的消息;
RoutingKey(路由鍵):用於把生成者的數據分配到交換器上;
BindingKey(綁定鍵):用於把交換器的消息綁定到隊列上;
- 使用
先看一段簡單的代碼,來理解下消息隊列的工做原理:
Connection connection = AmqpConnectionFactory.getConnection(new AmqpConfig()); final Channel channel = connection.createChannel(); channel.queueDeclare("q.demo",false,false,true,null); channel.basicPublish("","q.demo",null,"消息".getBytes()); channel.basicConsume("q.demo",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } });
- 首先獲取鏈接,以前下載的rabbitmq server,與之創建鏈接
- 打開通訊channel,能夠理解爲打開一個tcp通訊,只不過爲節省資源虛擬了一個通道
- 定義一個隊列
- 向隊列中發送消息
- 從隊列中消費消息
其實這裏還有個過程被忽略了,實際上是使用了默認處理,在第4步中,實際上是向消息路由發佈消息,且該消息路由的routingKey與隊列名稱相同,
The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.
而實際中的用法可能更爲複雜些,可是 原理上其實都同樣,只不過在消息的處理過程當中會添加一些策略來應對不一樣的應用場景,同時爲了保證消息的可靠性,會引入一些確認機制,固然這些都是後話,先在剛的基礎上看一下另外一個示例,一樣有生產者-消費做爲模型:
生成者:
public void produce(String message) throws IOException, TimeoutException, InterruptedException { Connection connection = AmqpConnectionFactory.getConnection(new AmqpConfig()); //基於信道的通訊 Channel channel = connection.createChannel(); /** * 交換機名稱、交換機類型、是否持久化、是否自動刪除、是否內部使用、參數 */ channel.exchangeDeclare(Common.EXCHANGE_X1,ExType.Direct.value(),false,false,false,null);//申明交換機 /** * 消息發送到指定交換機、routing key、是否重發、是否、基礎屬性、消息內容 * mandatory:(true)沒有隊列,消息返回;(false)沒有隊列,消息丟棄 * immediate:(true)沒有消費者,消息返回;(false) */ int count = 0; while (count++ <100){ TimeUnit.SECONDS.sleep(1); channel.basicPublish(Common.EXCHANGE_X1,Common.ROUTING_KEY1,false,false,null,(message+ new Date()).getBytes()); } channel.close(); AmqpConnectionFactory.close(connection); } public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Producer producer = new Producer(); producer.produce("消息 "); }
大體意思就是講生產的消息綁定到指定的交換機上,而且指定交換機類型以及routing key。
消費者:
public void consume() throws IOException { Connection connection = AmqpConnectionFactory.getConnection(new AmqpConfig()); final Channel channel = connection.createChannel();//創建通訊通道 /** * 隊列名稱、是否持久化、是否被該鏈接獨佔、自動刪除、參數 */ channel.queueDeclare(Common.QUEUE_Q1,false,false,false,null);//申明隊列 /** * 交換機名稱、交換機類型、是否持久化、是否自動刪除、是否內部使用、參數 */ channel.exchangeDeclare(Common.EXCHANGE_X1,ExType.Direct.value(),false,false,false,null);//申明交換機 /** * 隊列名稱、交換機名稱、binding Key */ channel.queueBind(Common.QUEUE_Q1,Common.EXCHANGE_X1,Common.BINDING_KEY1);//將消息交換機與隊列綁定 /** * 隊列名稱、自動ACK、消費者標記、非本地、是否被該鏈接獨佔、參數 */ channel.basicConsume(Common.QUEUE_Q1,false,"c1",false,false,null,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // super.handleDelivery(consumerTag, envelope, properties, body); System.out.println(new String(body)); /** * 消息索引、批量確認 */ channel.basicAck(envelope.getDeliveryTag(),false); } }); } public static void main(String[] args) throws IOException { Consumer consumer = new Consumer(); consumer.consume(); }
相對要複雜些,主要是從隊列中讀取消息並顯示,那麼隊列的消息則是經過交換機與設置的binding key來決定。
如今來梳理下整個流程:
- 在生產者中創建與mq服務的鏈接,建立通道
- 定義消息交換機,注意次數有不少參數,如今咱們僅關注其名稱與類型
- 循環100次向指定交換機中發佈消息,並設置routing key
- 在消費者中創建鏈接,建立通道
- 定義消息隊列
- 定義消息交換機
- 將消息隊列綁定到交換機上,並定義binding key
- 從隊列中讀取消息
其中有些方面須要注意的:
- 生產者與消費者啓動不分前後
- 兩個地方都定義了消息交換機,與上一個對應。由於若是有該交換已經存在,則不會從新建立,但若是屬性不一樣,則會報錯
- 其中涉及到了routing key、binding key,主要是完成消息從匹配的交換機拿取消息,固然這個不是必須的,由於受到exchange的type影像
- 生產者的的消息都是發送到交換機,而消費的消息都是從隊列中拿
到這裏有必要說下Exchange的type,主要有如下類型:
- Fanout:轉發消息到全部綁定隊列
- Direct:direct 類型的行爲是"先匹配, 再投送". 即在綁定時設定一個 routing key, 消息的routing key 匹配時, 纔會被交換器投送到綁定的隊列中去.
- Topic:按規則轉發消息(最靈活)
- Headers:設置 header attribute 參數類型的交換機
經過這幾種類型的exchange,能夠定製出很是複雜的業務模型。
上面能夠說從簡單的應用層面瞭解了Rabbit Mq,由於網上有太多的知識,對一些組件與工做模型等都講解,而且圖文並茂,全部不必作過多重複的工做,下面從可靠性的角度來學習,同時以前咱們在聲明隊列、交換機等會有一些參數,具體幹嗎用的,可能有些會說到,畢竟是學習階段,不可能那麼全面,也不會那麼準確,權做參考。
下面看一個簡單的im,僅僅實現兩我的間的固化通訊,jack and rose:
首先說下思路,兩個用戶間的通訊,會涉及到每一個用戶的接收和發送,咱們知道,通常聊天時,接收和發送是互不影響的,也不會有任何依賴關係,也就是發送和接收的消息是兩條線,那麼咱們就須要爲每一個用戶分別開通一個發送和接收的線程,這樣兩個行爲就不會有任何影響。而後看下怎麼發送信息,就是經過mq開通一個channel,將消息發送到對應的exchange,進而講消息推送到匹配的消息隊列中,而另外一方接收,則從指定的隊列中取得消息並展示出來。那麼接收發送就造成了完整的過程。固然這個和以前說的接收發送不一樣,剛纔指的是同一用戶,如今指用戶到用戶,這個發送與接收本就是一個過程的兩個階段。下面看下代碼實現,既然兩個用戶的行爲如此相像,咱們就提出一個抽象類來實現共同的部分:
public abstract class AbstractUser { private static String EXCHANGE = "x.user"; private String id = UUID.randomUUID().toString(); private String name; private Connection connection = AmqpConnectionFactory.getConnection(new AmqpConfig()); public AbstractUser(String name){ this.name = name; } public void start() throws IOException { System.out.println(name +" 上線了.."); Channel channel = connection.createChannel(); channel.basicQos(1);//流量控制 String queueName = getResQueue(); channel.exchangeDeclare(EXCHANGE,ExType.Direct.value()); channel.queueDeclare(queueName,false,false,false,null); channel.queueBind(queueName,EXCHANGE,getResBindingKey()); //專門接收的線程 new Thread(()->{ try { channel.basicConsume(queueName,false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { if(!id.equals(properties.getCorrelationId())){ System.out.println(properties.getAppId()+" : "+new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); }else{ channel.basicNack(envelope.getDeliveryTag(),false,true); } } }); } catch (IOException e) { e.printStackTrace(); } }).start(); //專門發送的線程 new Thread(()->{ Scanner scanner = new Scanner(System.in); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().correlationId(id).appId(name).build(); while (true){ try { channel.basicPublish(EXCHANGE,getPusRoutingKey(),properties,scanner.nextLine().getBytes()); } catch (IOException e) { e.printStackTrace(); } } }).start(); } /** * 發送消息時指定routing key,在接收方binding key須要與之對應 * @return */ public abstract String getPusRoutingKey(); /** * 接收消息的隊列 * @return */ public abstract String getResQueue(); /** * 接收消息的binding key,與發送方的routing key對應 * @return */ public abstract String getResBindingKey(); }
JACK:
public class Jack extends AbstractUser{ public Jack(String name) { super(name); } @Override public String getPusRoutingKey() { return "toRose"; } @Override public String getResQueue() { return "q.rose"; } @Override public String getResBindingKey() { return "toJack"; } public static void main(String[] args) throws IOException { Jack jack = new Jack("jack"); jack.start(); } }
ROSE:
public class Rose extends AbstractUser{ public Rose(String name) { super(name); } @Override public String getPusRoutingKey() { return "toJack"; } @Override public String getResQueue() { return "q.jack"; } @Override public String getResBindingKey() { return "toRose"; } public static void main(String[] args) throws IOException { Rose rose = new Rose("rose"); rose.start(); } }
其中要注意的是,在接收的時候,開始設計時是共用了一個隊列,因此會出現本身給本身發信息,因此在發送消息時,爲消息添加了屬性,標識該消息的來源,那麼在讀取消息時,根據該屬性判斷是否爲本身的消息,若是是,則確認並消費該消息,若是不是,須要作一次nack的處理,並將消息從新放回隊列中,直到被其餘用戶消費爲止。咱們能夠看到,如今是兩我的的通訊,有一些固化的元素,好比routing key,兩個用戶通訊是須要優先肯定的,那麼真實的IM系統,會涉及到不少繁瑣的內容,好比消息發送失敗,消息發送超時、重發、多人聊天等等,會存在不少須要解決的問題。
jack和rose的聊天也結束了,那麼咱們在來看看其餘的一些知識點,一樣以消息的發送與消息接收爲一條線來進行下去。
在發送消息前,毫無疑問是先創建鏈接,打開虛擬通道,以後纔是定義交換機,發送消息(不用申明隊列)。那麼在申明交換機的時候,其實有不少個參數:
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;
- exchange:交換機名稱
- type:交換機類型,上面說到了, direct, topic, fanout, headers
- durable:是否持久化,也就是斷開鏈接後是否還存在
- autoDelete:自動刪除,當與該exchange上的隊列所有刪除後, 自動刪除,和上一個參數比較一下,好比durable=true,那麼若是該參數配置true,其實也會刪除(沒有queue)
- internal:是否內部交換機,不太知道應用場景
- arguments:其餘參數,好比DLX
在發送消息時,一樣有一些可配置參數:
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
- exchange:消息發送到的交換機
- routingKey:交換機路由key
- mandatory:true,若是交換機沒有匹配到對應的隊列,會將調用basic.return將該消費返回生成者;false,上述情形直接丟棄消息
- immediate:true,若是交換機關聯隊列沒有消費者,則不會將消息加入隊列;false,上述情形將調用basic.return將消息返回生產者。3.0後去掉了
- props:爲消息添加一些參數,好比過時時間
- body:消息主體
那麼這些參數主要幹嗎的?當時是保證系統的可靠性了。
那麼在消息的發送端,如何保證可靠性:
- 事務
try { channel.txSelect(); channel.basicPublish("exchange", "routingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, msg); channel.txCommit(); } catch (Exception ex) { channel.txRollback(); }
- 確認機制(推薦)
該機制主要是經過註冊一些事件來處理的,好比上面提到過的basic.return
channel.confirmSelect(); channel.basicPublish("yu.exchange", "yu.1", MessageProperties.PERSISTENT_TEXT_PLAIN, msg); boolean success = channel.waitForConfirms(10);
channel.addConfirmListener(new ConfirmListener() { public void handleAck(long l, boolean b) throws IOException { } public void handleNack(long l, boolean b) throws IOException { } }); channel.addReturnListener(new ReturnListener() { public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException{ } }); channel.confirmSelect(); channel.basicPublish("exchange", "routingKey",true, MessageProperties.PERSISTENT_TEXT_PLAIN, msg);
下面主要對第二種狀況驗證一下,記得在測試前,講相關的exchange與queue進行刪除,不然會影響測試結果:
public class Producer { public void produce(String message) throws IOException, TimeoutException, InterruptedException { Connection connection = AmqpConnectionFactory.getConnection(new AmqpConfig()); //基於信道的通訊 Channel channel = connection.createChannel(); /** * 交換機名稱、交換機類型、是否持久化、是否自動刪除、是否內部使用、參數 */ channel.exchangeDeclare(Common.EXCHANGE_X1,ExType.Direct.value(),false,false,false,null);//申明交換機 channel.confirmSelect();//確認機制 channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("消息發送成功!"); } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("消息發送失敗!"); } }); /** * mandatory=ture */ channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("從新處理消息!"); } }); /** * 消息發送到指定交換機、routing key、是否重發、是否、基礎屬性、消息內容 * mandatory:(true)沒有隊列,消息返回;(false)沒有隊列,消息丟棄 * immediate:(true)沒有消費者,消息返回;(false) */ int count = 0; while (count++ <10){ TimeUnit.SECONDS.sleep(1); channel.basicPublish(Common.EXCHANGE_X1,Common.ROUTING_KEY1,false,false,null,(new Date()+message).getBytes()); } channel.close(); AmqpConnectionFactory.close(connection); } public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Producer producer = new Producer(); producer.produce("消息 "); } }
咱們發送10條消息到交換機,控制檯打印以下,若是關閉鏈接可能最後一條消息打印不出來:
而後啓動消費者,恰好也消費了10條消息(須要先聲明下隊列,否則生產的消息都會被丟棄,mandatory=false)。
如今咱們作一些修改,將上面說到的修改mandatory=true,也就是沒有與交換機匹配的隊列時,將會重發,也就是調用上面咱們定義的ReturnListener:
與預期的同樣,只不過會發現,在調用handleReturn後會再次調用handleAck,也就是發送成功!
上面說的這些也就是消息發佈者的ack機制。
接下來看下消費者的ack:
咱們定義消費者時,通常會先定義隊列、交換機、將隊列與交換機綁定、發送消息。
聲明隊列:
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
- queue:隊列名稱
- durable:是否持久化
- exclusive:排他隊列,與該鏈接綁定,決定多個消費者是否能夠訪問這一隊列
- autoDelete:自動刪除,沒有消費者時自動刪除
- arguments:隊列參數,好比隊列過時時間
消息接收:
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
- queue:綁定的隊列名
- autoAck:自動ack
- consumerTag:消費者標記
- noLocal:ture,不將同一鏈接中生產的消息傳遞給該消費者
- exclusive:排他
- arguments:擴展參數
- callback:消費回調
首先要知道,消息一段被消費,就會被移除,那麼咱們如何肯定消息是否被真實消費?由於從拿到消息到正則消掉該消息,都是有一個過程,可能任何環境都出現問題,可是被認爲消費而致使消息被移除,則可靠性就沒法獲得保證,做爲消費者和生產者同樣會有事務與ack兩種方式保證,只不過須要注意的是:
- autoAck=false手動應對的時候是支持事務的,也就是說即便你已經手動確認了消息已經收到了,但在確認消息會等事務的返回解決以後,在作決定是確認消息仍是從新放回隊列,若是你手動確認如今以後,又回滾了事務,那麼已事務回滾爲主,此條消息會從新放回隊列;
- autoAck=true若是自定確認爲true的狀況是不支持事務的,也就是說你即便在收到消息以後在回滾事務也是於事無補的,隊列已經把消息移除了;
那麼針對ack機制,主要有如下相關方法:
//消息索引,批量ack,只對小於該DeliveryTag的消息 // 批量確認 channel.basicAck(envelope.getDeliveryTag(),false); //其中 deliveryTag 和 requeue 的含義能夠參考 basicReject 方法。 multiple 參數 //設置爲 false 則表示拒絕編號爲 deliveryT坷的這一條消息,這時候 basicNack 和 basicReject 方法同樣; // multiple 參數設置爲 true 則表示拒絕 deliveryTag 編號以前所 有未被當前消費者確認的消息。 channel.basicNack(envelope.getDeliveryTag(),false,false); //一次只能拒絕一條 //其中 deliveryTag 能夠看做消息的編號 ,它是一個 64 位的長整型值,最大值是 9223372036854775807 // requeue 參數設置爲 true,則 RabbitMQ 會從新將這條消息存入隊列,以即可以發送給下一個訂閱的消費者; // requeue 參數設置爲 false,則 RabbitMQ 當即會把消息從隊列中移除,而不會把它發送給新的消費者 channel.basicReject(envelope.getDeliveryTag(),false);
在結束ack前,須要說明一點的是,消費者和生產者消息發送的成功與消費是否成功,並非消費者向生產者進行ack,而是針對mq服務器。對於生產者只是確保消息發送到服務器是否成功;對於消費者,只是確保消息是否從服務器被消費掉。
若是咱們對某條消息nack,有沒有requeue,那麼這條消息是否是真的就丟失了呢?這裏不得不引入另一個概念,死信,那麼與死信對應的有死信隊列XLD,同時死信的條件不僅剛說到的,在如下狀況都會觸發:
- 消息被拒絕(basic.reject/ basic.nack)而且requeue=false
- 消息TTL過時
- 隊列達到最大長度
關於第一點很少說,上面已經提到了,關於第二點TTL(time to live)關係到消息的過時時間,通常會從兩個角度分析,咱們知道,消息沒有消費前是在隊列中,那麼隊列的過時時間也會影響消息的過時時間,全部這個時間會從隊列過時時間()消息過時時間中取小。
隊列過時時間設置:
//申明隊列時設置 args.put("x-expires", 10000);//ms 隊列過時時間
消息過時時間設置:
//申明隊列時設置 args.put("x-message-ttl", 6000);//消息過時時間 //發佈消息時設置 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("6000").build();
首先如何定義死信隊列,這個是在隊列申明的時候,以參數的形式加入的(x-dead-letter-exchange):
channel.exchangeDeclare(Common.EXCHANGE_DLX_X1,ExType.Direct.value()); Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange",Common.EXCHANGE_DLX_X1);//死信交換機 channel.queueDeclare(Common.QUEUE_Q1,false,false,false,args);
下面來看下示例:
消息生產者:
public void produce(String message) throws IOException, TimeoutException, InterruptedException { Connection connection = AmqpConnectionFactory.getConnection(new AmqpConfig()); //基於信道的通訊 Channel channel = connection.createChannel(); /** * 交換機名稱、交換機類型、是否持久化、是否自動刪除、是否內部使用、參數 */ channel.exchangeDeclare(Common.EXCHANGE_X1,ExType.Direct.value(),false,false,false,null);//申明交換機 //爲了保證先啓動該類,交換機沒有綁定隊列致使消息丟失,優先處理,在消費者中也會有如下內容 channel.exchangeDeclare(Common.EXCHANGE_DLX_X1,ExType.Direct.value()); channel.queueDeclare(Common.QUEUE_DLX_Q1, false, false, false, null);//申明死信隊列 channel.queueBind(Common.QUEUE_DLX_Q1, Common.EXCHANGE_DLX_X1, Common.ROUTING_DLX_KEY1);//將消息交換機與隊列綁定 Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange",Common.EXCHANGE_DLX_X1);//死信交換機 args.put("x-dead-letter-routing-key", Common.ROUTING_DLX_KEY1); args.put("x-expires", 30000);//ms 隊列過時時間 args.put("x-message-ttl", 12000);//消息過時時間 channel.queueDeclare(Common.QUEUE_Q1,false,false,false ,args); channel.queueBind(Common.QUEUE_Q1,Common.EXCHANGE_X1,Common.BINDING_KEY1);//將消息交換機與隊列綁定 channel.confirmSelect();//確認機制 channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("消息發送成功!"); } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("消息發送失敗!"); } }); /** * mandatory=ture */ channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("從新處理消息!"); } }); /** * 消息發送到指定交換機、routing key、是否重發、是否、基礎屬性、消息內容 * mandatory:(true)沒有隊列,消息返回;(false)沒有隊列,消息丟棄 * immediate:(true)沒有消費者,消息返回;(false) */ int count = 0; // AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("12000").build(); while (count++ <10){ TimeUnit.MILLISECONDS.sleep(500); channel.basicPublish(Common.EXCHANGE_X1,Common.ROUTING_KEY1,false,false,null,(new Date()+message).getBytes()); } channel.close(); AmqpConnectionFactory.close(connection); } public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Producer producer = new Producer(); producer.produce("消息 "); }
消息消費者:
public void consume() throws IOException { Connection connection = AmqpConnectionFactory.getConnection(new AmqpConfig()); final Channel channel = connection.createChannel();//創建通訊通道 channel.exchangeDeclare(Common.EXCHANGE_DLX_X1,ExType.Direct.value()); Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange",Common.EXCHANGE_DLX_X1);//死信隊列 args.put("x-dead-letter-routing-key", Common.ROUTING_DLX_KEY1);//死信routing key 默認取 args.put("x-expires", 30000);//ms 隊列過時時間 args.put("x-message-ttl", 12000);//消息過時時間 /** * 隊列名稱、是否持久化、是否被該鏈接獨佔(只對申明鏈接可見,斷開鏈接刪除)、自動刪除、參數 */ channel.queueDeclare(Common.QUEUE_Q1,false,false,false,args);//申明隊列 /** * 交換機名稱、交換機類型、是否持久化、是否自動刪除、是否內部使用、參數 */ channel.exchangeDeclare(Common.EXCHANGE_X1,ExType.Direct.value(),false,false,false,null);//申明交換機 /** * 隊列名稱、交換機名稱、binding Key */ channel.queueBind(Common.QUEUE_Q1,Common.EXCHANGE_X1,Common.BINDING_KEY1);//將消息交換機與隊列綁定 /** * 隊列名稱、自動ACK、消費者標記、非本地、是否被該鏈接獨佔、參數 * 與basicGet對比,get 只取了隊列裏面的第一條消息 * 一種是主動去取,一種是監聽模式 */ channel.basicConsume(Common.QUEUE_Q1,false,"c1",false,false,null,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }); } public static void main(String[] args) throws IOException { Consumer consumer = new Consumer(); consumer.consume(); }
死信消費者:
public void consume() throws IOException { Connection connection = AmqpConnectionFactory.getConnection(new AmqpConfig()); final Channel channel = connection.createChannel();//創建通訊通道 channel.queueDeclare(Common.QUEUE_DLX_Q1, false, false, false, null);//申明隊列 channel.exchangeDeclare(Common.EXCHANGE_DLX_X1, ExType.Direct.value(), false, false, false, null);//申明交換機 channel.queueBind(Common.QUEUE_DLX_Q1, Common.EXCHANGE_DLX_X1, Common.ROUTING_DLX_KEY1);//將消息交換機與隊列綁定 channel.basicConsume(Common.QUEUE_DLX_Q1, true, "c2", false, false, null, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("死信:"+new String(body)); } }); } public static void main(String[] args) throws IOException { DlxConsumer consumer = new DlxConsumer(); consumer.consume(); }
測試過程當中可觀察rabbitmq服務檯數據變化,主要步驟大體以下:
- 執行消費者,測試會生成10條信息在正常的隊列中
- 在12秒內執行消息消費者,發現會打印出10條信息
- 執行步驟1
- 過12秒後執行消息消費者,發現不會打印任何信息
- 執行死信消費者,發現打印出10條信息
至此,關於rabbitmq的知識也差很少了,可是若是想搭建一個比較穩健的消息系統來處理系統中的各類異步任務,仍是須要將各類知識進行搭配。
擴展:
- 延遲隊列:經過死信實現,其實上面的例子中,去掉消費者,將死信消費者看作正常消費者,那麼就是延遲隊列了
- 重試機制:包含發送失敗重試與消費故障重試
- 隊列屬性:
Message TTL(x-message-ttl):設置隊列中的全部消息的生存週期(統一爲整個隊列的全部消息設置生命週期), 也能夠在發佈消息的時候單獨爲某個消息指定剩餘生存時間,單位毫秒, 相似於redis中的ttl,生存時間到了,消息會被從隊裏中刪除,注意是消息被刪除,而不是隊列被刪除, 特性Features=TTL, 單獨爲某條消息設置過時時間AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration(「6000」); Auto Expire(x-expires): 當隊列在指定的時間沒有被訪問(consume, basicGet, queueDeclare…)就會被刪除,Features=Exp Max Length(x-max-length): 限定隊列的消息的最大值長度,超過指定長度將會把最先的幾條刪除掉, 相似於mongodb中的固定集合,例如保存最新的100條消息, Feature=Lim Max Length Bytes(x-max-length-bytes): 限定隊列最大佔用的空間大小, 通常受限於內存、磁盤的大小, Features=Lim B Dead letter exchange(x-dead-letter-exchange): 當隊列消息長度大於最大長度、或者過時的等,將從隊列中刪除的消息推送到指定的交換機中去而不是丟棄掉,Features=DLX Dead letter routing key(x-dead-letter-routing-key):將刪除的消息推送到指定交換機的指定路由鍵的隊列中去, Feature=DLK Maximum priority(x-max-priority):優先級隊列,聲明隊列時先定義最大優先級值(定義最大值通常不要太大),在發佈消息的時候指定該消息的優先級, 優先級更高(數值更大的)的消息先被消費 Lazy mode(x-queue-mode=lazy): Lazy Queues: 先將消息保存到磁盤上,不放在內存中,當消費者開始消費的時候才加載到內存中Master locator(x-queue-master-locator)