① 中高級java程序員的必備的升職加薪利器;
② 在分佈式架構體系技術棧中的不可或缺的技術;
③rabbitmq做爲開源免費的消息中間件,其優點顯著;
④國內大多數的銀行系統,採用了-該消息中間件,做爲技術方案中的一個組合;
⑤rabbitmq能夠有效的解決咱們在項目中遇到的工程性能技術問題.html
① 異步通訊java
有些業務不想也不須要當即處理消息。消息隊列提供了異步處理機制,容許用戶把一個消息放入隊列,但並不當即處理它。想向隊列中放入多少消息就放多少,而後在須要的時候再去處理它們。程序員
② 解耦spring
下降工程間的強依賴程度,針對異構系統進行適配。在項目啓動之初來預測未來項目會碰到什麼需求,是極其困難的。經過消息系統在處理過程當中間插入了一個隱含的、基於數據的接口層,兩邊的處理過程都要實現這一接口,當應用發生變化時,能夠獨立的擴展或修改兩邊的處理過程,只要確保它們遵照一樣的接口約束。docker
③ 冗餘編程
有些狀況下,處理數據的過程會失敗。除非數據被持久化,不然將形成丟失。消息隊列把數據進行持久化直到它們已經被徹底處理,經過這一方式規避了數據丟失風險。許多消息隊列所採用的」插入-獲取-刪除」範式中,在把一個消息從隊列中刪除以前,須要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。json
④ 擴展性windows
由於消息隊列解耦了你的處理過程,因此增大消息入隊和處理的頻率是很容易的,只要另外增長處理過程便可。不須要改變代碼、不須要調節參數。便於分佈式擴容。centos
⑤ 過載保護安全
在訪問量劇增的狀況下,應用仍然須要繼續發揮做用,可是這樣的突發流量沒法提取預知;若是覺得了能處理這類瞬間峯值訪問爲標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列可以使關鍵組件頂住突發的訪問壓力,而不會由於突發的超負荷的請求而徹底崩潰。
⑥ 可恢復性
系統的一部分組件失效時,不會影響到整個系統。消息隊列下降了進程間的耦合度,因此即便一個處理消息的進程掛掉,加入隊列中的消息仍然能夠在系統恢復後被處理。
⑦ 順序保證
在大多使用場景下,數據處理的順序都很重要。大部分消息隊列原本就是排序的,而且能保證數據會按照特定的順序來處理。
⑧ 緩衝
在任何重要的系統中,都會有須要不一樣的處理時間的元素。消息隊列經過一個緩衝層來幫助任務最高效率的執行,該緩衝有助於控制和優化數據流通過系統的速度。以調節系統響應時間。
⑨ 數據流處理
分佈式系統產生的海量數據流,如:業務日誌、監控數據、用戶行爲等,針對這些數據流進行實時或批量採集彙總,而後進行大數據分析是當前互聯網的必備技術,經過消息隊列完成此類數據收集是最好的選擇。
ActiveMQ | RocketMQ | RabbitMQ | Kafka |
---|---|---|---|
Apache下的一個子項目。使用Java徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現,少許代碼就能夠高效地實現高級應用場景。可插拔的傳輸協議支持,好比:in-VM, TCP, SSL, NIO, UDP, multicast, JGroups and JXTA transports。RabbitMQ、ZeroMQ、ActiveMQ均支持經常使用的多種語言客戶端 C++、Java、.Net,、Python、 Php、 Ruby等。 | 阿里系下開源的一款分佈式、隊列模型的消息中間件,原名Metaq,3.0版本名稱改成RocketMQ,是阿里參照kafka設計思想使用java實現的一套mq。同時將阿里系內部多款mq產品(Notify、metaq)進行整合,只維護核心功能,去除了全部其餘運行時依賴,保證核心功能最簡化,在此基礎上配合阿里上述其餘開源產品實現不一樣場景下mq的架構,目前主要多用於訂單交易系統。可以保證嚴格的消息順序 提供針對消息的過濾功能 提供豐富的消息拉取模式 高效的訂閱者水平擴展能力 實時的消息訂閱機制 億級消息堆積能力 | 使用Erlang編寫的一個開源的消息隊列,自己支持不少的協議:AMQP,XMPP, SMTP,STOMP,也正是如此,使的它變的很是重量級,更適合於企業級的開發。同時實現了Broker架構,核心思想是生產者不會將消息直接發送給隊列,消息在發送給客戶端時先在中心隊列排隊。對路由(Routing),負載均衡(Load balance)、數據持久化都有很好的支持。多用於進行企業級的ESB整合。 | Apache下的一個子項目,使用scala實現的一個高性能分佈式Publish/Subscribe消息隊列系統,具備如下特性:快速持久化:經過磁盤順序讀寫與零拷貝機制,能夠在O(1)的系統開銷下進行消息持久化;高吞吐:在一臺普通的服務器上既能夠達到10W/s的吞吐速率;高堆積:支持topic下消費者較長時間離線,消息堆積量大;徹底的分佈式系統:Broker、Producer、Consumer都原生自動支持分佈式,依賴zookeeper自動實現複雜均衡;支持Hadoop數據並行加載:對於像Hadoop的同樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。 |
docker search rabbitmq:management
查詢rabbitmqdocker pull rabbitmq:management
從鏡像倉庫拉取鏡像docker run -d --name rabbitmq --publish 5671:5671 --publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 rabbitmq:management
建立容器❓ 發送端沒有設置接收的隊列名稱, rabbitMQ如何處理?
😀 若沒有設置「queue」的名字時,會根據「Routing Key」去找是否存在接收隊列的名稱和「Routing key」一致的隊列。存在,則能夠傳遞消息成功,不然,沒法傳遞數據。使用的隊列是(AMQP default),如果找不到接收隊列,會刪除傳遞的數據。
RabbitMQ的優勢:
由於開發語言爲Erlang語言,而Erlang語言最初做爲交換機使用。所以,對於數據的通訊能夠達到Socket的水平,所以,性能較好。
虛擬主機,用於進行邏輯分離,最上層的消息路由;
一個Virtual host裏面能夠有多個Exchange和Queue;
可是,在同一個Virtual host中不能存在同名的Exchange和Queue。
name: 名稱
Type:交換機類型:direct,topic,fanout,headers;
DuDurability:是否須要持久化;
Auto delete:當最後一個綁定到exchange上的隊列刪除後,自動刪除該exchange;
InterInternal:當前exchange是否用於rabbitMQ內部使用,默認爲false;
概念:全部發送到Direct Exchange的消息被轉發到Routing key中指定的Queue。
操做:可使用RabbitMQ自帶的Exchange:default Exchange,因此不須要將Exchange進行任何綁定操做,消息傳遞時,Routing Key徹底匹配時,才能被隊列接收,不然該消息被丟棄。
概念:
全部發送到Topic Exchange的消息被轉發到全部關心Routing key中指定的Topic的Queue上。
Exchange將Routing key和某個Topic進行模糊匹配,此時隊列須要綁定一個Topic。
可使用通配符進行模糊匹配;"#":匹配一個或多個詞;"*":只能匹配一個詞;
eg: "log.#「可以匹配到"log.ingo.aa」;
eg: 「log.*」 可以匹配到"log.erro"
注意事項
使用log.的時候, 查看是否會出現兩個routing key, 一個爲log.#、log.;?
queue綁定了routing key 不會自動刪除routing key;?
概念:
不處理路由鍵,只須要簡單的將隊列綁定到交換機上;
發送到交換機的消息都會被轉發到與該交換機綁定的全部隊列上;
是轉發消息最快的;
不須要使用routing key, 直接轉發到隊列,減小了不少匹配規則,因此,速度很快;
能夠綁頂Exchange和Exchange、Queue之間的聯繫關係;也就是說,能夠Exchange之間進行綁定,發送消息的路徑就是:Queue-> Exchange->Exchange->Queue。
Binding中能夠包含Routing Key或參數。
參數:
Durability:是否持久化;Durable:持久化;Transient:不持久化;
Auto Delete:當最後一個監聽被移除的時候,該Queue也會被刪除。
本質上就是一段數據,由properties和payload(body)組成。
經常使用屬性:delivery mode:1不持久化 2持久化, headers(自定義屬性)。
其餘屬性:
content_type:消息格式,eg:json、xml等等。
content——encoding:消息編碼;
priority:優先級。優先級高的優先被消費,可是,在集羣模式下,沒法保證消息消費的順序性。
correlation_id:消息惟一id,經常使用業務id+時間戳組成;能夠用於ACK以及冪等性的控制。
replay_to:消息失敗時,返回到哪一個隊列。
expiration:消息的過時時間。
message_id/timestamp/type/user_id/app_id/cluster_id;
建立Maven項目,添加必要依賴
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.3</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.3</version> </dependency>
編寫主要代碼
//消息生產者 public class MqProducer { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { //1. 建立並配置connectionFactory的鏈接參數 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.31.118"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 2.獲取鏈接 Connection connection = connectionFactory.newConnection(); //3. 經過connection獲取channel Channel channel = connection.createChannel(); //4. 經過channel聲明queue channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } } //消息消費者 public class MqConsumer { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { //1. 建立並配置connectionFactory的鏈接參數 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.31.118"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 2.獲取鏈接 Connection connection = connectionFactory.newConnection(); //3. 經過connection獲取channel Channel channel = connection.createChannel(); //4. 經過channel聲明queue channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }
一般RabbitMQ Server的宕機和C端的程序死掉,會致使消息不可以被正確的消費;那麼如何確保work queue中的message被正確的消費和不會丟失?
若是消息隊列中的須要處理的消息所消耗的時間和系統資源是不一樣的,消息分配的不合理可能會致使系統資源的浪費和不合理的使用.好比全部的奇數消息(Weight)都被分發給了C1處理,全部的偶數消息(Light)都被分配給了C2處理,結果會發現C2早早的處理完消息,閒置在那;而C1卻仍然在忙碌的工做中.
如何讓C端可以合理的處理消息隊列呢?
但該方案中的若是每一個Message都是Weight的,那麼會致使Queue堆積過多消息,能夠經過多加C端來消化掉.
int prefetchCount = 1; channel.basicQos(prefetchCount);
代碼編寫
//生產者 public class MqProducer { private final static String QUEUE_NAME = "taskQueue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //1. 建立並配置connectionFactory的鏈接參數 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.31.118"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 2.獲取鏈接 Connection connection = connectionFactory.newConnection(); //3. 經過connection獲取channel Channel channel = connection.createChannel(); //4. 經過channel聲明queue channel.queueDeclare(QUEUE_NAME, false, false, false, null); //5. 持續的生產消息 while (true){ String message = "Hello World! "+ LocalDateTime.now().toString(); TimeUnit.SECONDS.sleep(1); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Send '" + message + "'"); } } } //消費者 public class MqConsumer { private final static String QUEUE_NAME = "taskQueue"; public static void main(String[] argv) throws Exception { //1. 建立並配置connectionFactory的鏈接參數 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.31.118"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 2.獲取鏈接 Connection connection = connectionFactory.newConnection(); //建立線程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 4, 4, TimeUnit.SECONDS, new LinkedBlockingDeque<>()); threadPoolExecutor.execute(() ->{ try { TimeUnit.SECONDS.sleep(4); consumeMsg(connection); } catch (IOException | InterruptedException e) { e.printStackTrace(); } }); threadPoolExecutor.execute(() ->{ try { TimeUnit.SECONDS.sleep(4); consumeMsg(connection); } catch (IOException | InterruptedException e) { e.printStackTrace(); } }); threadPoolExecutor.shutdown(); } public static void consumeMsg(Connection connection) throws IOException { //3. 經過connection獲取channel Channel channel = connection.createChannel(); //4. 經過channel聲明queue channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message; try { message = new String(delivery.getBody(), "UTF-8"); System.out.println(Thread.currentThread().getName()+" Received '" + message + "'"); } finally { channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); System.out.println(Thread.currentThread().getName()+" Received Done"); } }; channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } }
Exchange的類型包含direct,topic,fanout,header
//生產者 public class MqProducer { private final static String EXCHANGE_NAME = "fanoutExchange"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //1. 建立並配置connectionFactory的鏈接參數 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.31.118"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 2.獲取鏈接 Connection connection = connectionFactory.newConnection(); //3. 經過connection獲取channel Channel channel = connection.createChannel(); //4. 經過channel聲明exchange & queue channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); channel.queueDeclare("queue01", false, false, false, null); channel.queueDeclare("queue02", false, false, false, null); channel.queueDeclare("queue03", false, false, false, null); //5. bind exchange和queue channel.queueBind("queue01",EXCHANGE_NAME,""); channel.queueBind("queue02",EXCHANGE_NAME,""); channel.queueBind("queue03",EXCHANGE_NAME,""); //6. 持續的生產消息 while (true){ String message = "Hello World! "+ LocalDateTime.now().toString(); TimeUnit.SECONDS.sleep(1); channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes()); System.out.println(" [x] Send '" + message + "'"); } } } //消費者 public class MqConsumer { private final static String EXCHANGE_NAME = "fanoutExchange"; public static void main(String[] argv) throws Exception { //1. 建立並配置connectionFactory的鏈接參數 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.31.118"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 2.獲取鏈接 Connection connection = connectionFactory.newConnection(); //3. 經過connection獲取channel Channel channel = connection.createChannel(); //4. 經過channel聲明queue channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message; try { message = new String(delivery.getBody(), "UTF-8"); System.out.println(Thread.currentThread().getName()+" Received '" + message + "'"); } finally { channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); System.out.println(Thread.currentThread().getName()+" Received Done"); } }; channel.basicConsume("queue01", false, deliverCallback, consumerTag -> { }); channel.basicConsume("queue02", false, deliverCallback, consumerTag -> { }); channel.basicConsume("queue03", false, deliverCallback, consumerTag -> { }); } }
rabbitmq默認狀況下,不配置exchange的話,會使用默認的amqp的default exchange (類型爲direct);
//生產者 public class MqProducer { private final static String EXCHANGE_NAME = "directExchange"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //1. 建立並配置connectionFactory的鏈接參數 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.31.118"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 2.獲取鏈接 Connection connection = connectionFactory.newConnection(); //3. 經過connection獲取channel Channel channel = connection.createChannel(); //4. 經過channel聲明exchange & queue channel.exchangeDeclare(EXCHANGE_NAME, "direct"); channel.queueDeclare("directQueue01", false, false, false, null); channel.queueDeclare("directQueue02", false, false, false, null); //5. bind exchange和queue channel.queueBind("directQueue01",EXCHANGE_NAME,"error"); channel.queueBind("directQueue02",EXCHANGE_NAME,"error"); channel.queueBind("directQueue02",EXCHANGE_NAME,"info"); channel.queueBind("directQueue02",EXCHANGE_NAME,"warning"); //6. 持續的生產消息 // 6.1 生產error消息 new Thread(()->{ while (true){ String message = "Error! "+ LocalDateTime.now().toString(); try { TimeUnit.SECONDS.sleep(1); channel.basicPublish(EXCHANGE_NAME,"error",null,message.getBytes()); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); // 6.2 生產info消息 new Thread(()->{ while (true){ String message = "Info! "+ LocalDateTime.now().toString(); try { TimeUnit.SECONDS.sleep(1); channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes()); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); // 6.3 生產warning消息 new Thread(()->{ while (true){ String message = "Warning! "+ LocalDateTime.now().toString(); try { TimeUnit.SECONDS.sleep(1); channel.basicPublish(EXCHANGE_NAME,"warning",null,message.getBytes()); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } } //消費者 public class MqConsumer { private final static String EXCHANGE_NAME = "directExchange"; public static void main(String[] argv) throws Exception { //1. 建立並配置connectionFactory的鏈接參數 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.31.118"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 2.獲取鏈接 Connection connection = connectionFactory.newConnection(); //3. 經過connection獲取channel Channel channel = connection.createChannel(); //4. 經過channel聲明queue channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message; try { message = new String(delivery.getBody(), "UTF-8"); System.out.println(Thread.currentThread().getName()+" Received '" + message + "'"); } finally { channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); System.out.println(Thread.currentThread().getName()+" Received Done"); } }; channel.basicConsume("directQueue01", false, deliverCallback, consumerTag -> { }); channel.basicConsume("directQueue02", false, deliverCallback, consumerTag -> { }); } }
//生產者 public class MqProducer { private final static String EXCHANGE_NAME = "topicExchange"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //1. 建立並配置connectionFactory的鏈接參數 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.31.118"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 2.獲取鏈接 Connection connection = connectionFactory.newConnection(); //3. 經過connection獲取channel Channel channel = connection.createChannel(); //4. 經過channel聲明exchange & queue channel.exchangeDeclare(EXCHANGE_NAME, "topic"); channel.queueDeclare("topicQueue01", false, false, false, null); channel.queueDeclare("topicQueue02", false, false, false, null); //5. bind exchange和queue channel.queueBind("topicQueue01",EXCHANGE_NAME,"*.topic.animal.#"); channel.queueBind("topicQueue02",EXCHANGE_NAME,"*.topic.plant.#"); //6. 持續的生產消息 String[] routingKeys=new String[]{"a.topic.animal.dog","p.topic.plant.tree","p.topic.plant.grass","m.topic.animal.cat.yellowCat","p.topic.plant.fruit.apple"}; while(true){ int i = new Random().nextInt(5); System.out.println(i+routingKeys[i]); channel.basicPublish(EXCHANGE_NAME,routingKeys[i],null,routingKeys[i].getBytes()); } } } //消費者 public class MqConsumer { private final static String EXCHANGE_NAME = "topicExchange"; public static void main(String[] argv) throws Exception { //1. 建立並配置connectionFactory的鏈接參數 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.31.118"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 2.獲取鏈接 Connection connection = connectionFactory.newConnection(); //3. 經過connection獲取channel Channel channel = connection.createChannel(); //4. 經過channel聲明queue DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message; try { message = new String(delivery.getBody(), "UTF-8"); System.out.println(Thread.currentThread().getName()+" Received '" + message + "'"); } finally { channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); System.out.println(Thread.currentThread().getName()+" Received Done"); } }; channel.basicConsume("topicQueue01", false, deliverCallback, consumerTag -> { }); channel.basicConsume("topicQueue02", false, deliverCallback, consumerTag -> { }); } }
TODO
package com.gh.basic.middleware.rabbitmq; /** * PublisherConfirms * * @author: Golphin * @date: 2019/10/12 * @description: */ import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmCallback; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.time.Duration; import java.util.UUID; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.function.BooleanSupplier; public class PublisherConfirms { static final int MESSAGE_COUNT = 50_000; static Connection createConnection() throws Exception { ConnectionFactory cf = new ConnectionFactory(); cf.setHost("192.168.31.118"); cf.setUsername("guest"); cf.setPassword("guest"); return cf.newConnection(); } public static void main(String[] args) throws Exception { publishMessagesIndividually(); publishMessagesInBatch(); handlePublishConfirmsAsynchronously(); } static void publishMessagesIndividually() throws Exception { try (Connection connection = createConnection()) { Channel ch = connection.createChannel(); String queue = UUID.randomUUID().toString(); ch.queueDeclare(queue, false, false, true, null); ch.confirmSelect(); long start = System.nanoTime(); for (int i = 0; i < MESSAGE_COUNT; i++) { String body = String.valueOf(i); ch.basicPublish("", queue, null, body.getBytes()); ch.waitForConfirmsOrDie(5_000); } long end = System.nanoTime(); System.out.format("Published %,d messages individually in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis()); } } static void publishMessagesInBatch() throws Exception { try (Connection connection = createConnection()) { Channel ch = connection.createChannel(); String queue = UUID.randomUUID().toString(); ch.queueDeclare(queue, false, false, true, null); ch.confirmSelect(); int batchSize = 100; int outstandingMessageCount = 0; long start = System.nanoTime(); for (int i = 0; i < MESSAGE_COUNT; i++) { String body = String.valueOf(i); ch.basicPublish("", queue, null, body.getBytes()); outstandingMessageCount++; if (outstandingMessageCount == batchSize) { ch.waitForConfirmsOrDie(5_000); outstandingMessageCount = 0; } } if (outstandingMessageCount > 0) { ch.waitForConfirmsOrDie(5_000); } long end = System.nanoTime(); System.out.format("Published %,d messages in batch in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis()); } } static void handlePublishConfirmsAsynchronously() throws Exception { try (Connection connection = createConnection()) { Channel ch = connection.createChannel(); String queue = UUID.randomUUID().toString(); ch.queueDeclare(queue, false, false, true, null); ch.confirmSelect(); ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>(); ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> { if (multiple) { ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap( sequenceNumber, true ); confirmed.clear(); } else { outstandingConfirms.remove(sequenceNumber); } }; ch.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> { String body = outstandingConfirms.get(sequenceNumber); System.err.format( "Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n", body, sequenceNumber, multiple ); cleanOutstandingConfirms.handle(sequenceNumber, multiple); }); long start = System.nanoTime(); for (int i = 0; i < MESSAGE_COUNT; i++) { String body = String.valueOf(i); outstandingConfirms.put(ch.getNextPublishSeqNo(), body); ch.basicPublish("", queue, null, body.getBytes()); } if (!waitUntil(Duration.ofSeconds(60), () -> outstandingConfirms.isEmpty())) { throw new IllegalStateException("All messages could not be confirmed in 60 seconds"); } long end = System.nanoTime(); System.out.format("Published %,d messages and handled confirms asynchronously in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis()); } } static boolean waitUntil(Duration timeout, BooleanSupplier condition) throws InterruptedException { int waited = 0; while (!condition.getAsBoolean() && waited < timeout.toMillis()) { Thread.sleep(100L); waited = +100; } return condition.getAsBoolean(); } }
TODO