近幾日學習了一下rabbitmq消息中間件,因爲好久好久以前已經學過,並且自己入門難度也不高,因此學習起來較爲簡單,現將全部內容及與spring、springboot整合方法記錄:html
首先消息中間件定義:java
1.什麼是MQ
消息隊列(Message Queue,簡稱MQ),從字面意思上看,本質是個隊列,FIFO先入先出,只不過隊列中存放的內容是message而已。
其主要用途:不一樣進程Process/線程Thread之間通訊。
爲何會產生消息隊列?有幾個緣由:
不一樣進程(process)之間傳遞消息時,兩個進程之間耦合程度太高,改動一個進程,引起必須修改另外一個進程,爲了隔離這兩個進程,在兩進程間抽離出一層(一個模塊),全部兩進程之間傳遞的消息,都必須經過消息隊列來傳遞,單獨修改某一個進程,不會影響另外一個;
不一樣進程(process)之間傳遞消息時,爲了實現標準化,將消息的格式規範化了,而且,某一個進程接受的消息太多,一會兒沒法處理完,而且也有前後順序,必須對收到的消息進行排隊,所以誕生了事實上的消息隊列;
git
我從別人博客中看到了幾篇文章,寫的很是棒 如今將其記錄github
https://mp.weixin.qq.com/s?__biz=MzAxOTc0NzExNg==&mid=2665513507&idx=1&sn=d6db79c1ae03ba9260fb0fb77727bb54&chksm=80d67a60b7a1f376e7ad1e2c3276e8b565f045b1c7e21ef90926f69d99f969557737eb5d8128&mpshare=1&scene=1&srcid=1019awkBx8kaLyFohcuW4Ee7web
這個是經過故事的方式寫出消息中間件究竟是什麼正則表達式
https://mp.weixin.qq.com/s?__biz=MjM5ODYxMDA5OQ==&mid=2651960012&idx=1&sn=c6af5c79ecead98daa4d742e5ad20ce5&chksm=bd2d07108a5a8e0624ae6ad95001c4efe09d7ba695f2ddb672064805d771f3f84bee8123b8a6&mpshare=1&scene=1&srcid=04054h4e90lz5Qc2YKnLNuvYspring
這個是消息中間件的使用場景docker
https://github.com/jasonGeng88/blog/blob/master/201705/MQ.mdjson
這個故事是消息中間件在實際開發中的應用場合緩存
還有一個協議須要注意:
AMQP
還有rabbitmq的五種隊列
如今我將其解釋一下:
第一種.直接給 就是最簡單的一對一(經過隊列發)
第二種:能夠當作是第一種的擴展,就是一對多,你們共享同一個消息
第三種:其實就是將消息發送給交換機,而後交換機在把消息發送給綁定在此交換機的隊列上
第四種 路由模式,你能夠當作是根據具體關鍵字來發 或者說是服務每一個隊列承擔着一個或多個服務,消息會根據服務來發
第五種 topic模式,跟上面的差很少 不過他是至關於多匹配,至關於正則表達式
第六種 好像是遠程調用
而後是和安裝和使用,安裝的話我直接用的docker
其他安裝手段的話你必須得先安裝elrang語言,由於rabbitmq就是用這個語言編寫的
接下來是各類模式的使用
稍後我會傳到github去 其實難點很少 主要是幾個方法的使用及各類信道綁定啊之類的 大體流程是
獲取鏈接------建立通道-------建立通道----發佈信息
獲取鏈接------建立通道------根據知道的通道建立消費者------獲取到達消息
獲取鏈接------建立通道------建立交換機----------發佈信息
獲取鏈接------建立通道------申明隊列-------綁定交換機------定義消費者(根據隊列)------獲取到達消息
其他幾種方法無非就是多加了點關鍵字 沒有什麼差異
重點一:與spring整合
我見到的基本有兩種方式
第一種就是寫配置bean類
@Configuration public class HelloWorldConfiguration { protected final String helloWorldQueueName = "hello.world.queue"; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.13.132"); connectionFactory.setUsername("wujifu"); connectionFactory.setPassword("123456"); connectionFactory.setVirtualHost("/vhose_1"); return connectionFactory; } @Bean public AmqpAdmin amqpAdmin() { return new RabbitAdmin(connectionFactory()); } @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); //The routing key is set to the name of the queue by the broker for the default exchange. template.setRoutingKey(this.helloWorldQueueName); //Where we will synchronously receive messages from template.setDefaultReceiveQueue(this.helloWorldQueueName); return template; } @Bean // Every queue is bound to the default direct exchange public Queue helloWorldQueue() { return new Queue(this.helloWorldQueueName); } /* @Bean public Binding binding() { return declare(new Binding(helloWorldQueue(), defaultDirectExchange())); }*/ /* @Bean public TopicExchange helloExchange() { return declare(new TopicExchange("hello.world.exchange")); }*/ /* public Queue declareUniqueQueue(String namePrefix) { Queue queue = new Queue(namePrefix + "-" + UUID.randomUUID()); rabbitAdminTemplate().declareQueue(queue); return queue; } // if the default exchange isn't configured to your liking.... @Bean Binding declareP2PBinding(Queue queue, DirectExchange exchange) { return declare(new Binding(queue, exchange, queue.getName())); } @Bean Binding declarePubSubBinding(String queuePrefix, FanoutExchange exchange) { return declare(new Binding(declareUniqueQueue(queuePrefix), exchange)); } @Bean Binding declarePubSubBinding(UniqueQueue uniqueQueue, TopicExchange exchange) { return declare(new Binding(uniqueQueue, exchange)); } @Bean Binding declarePubSubBinding(String queuePrefix, TopicExchange exchange, String routingKey) { return declare(new Binding(declareUniqueQueue(queuePrefix), exchange, routingKey)); }*/ }
還有一種是配置文件的方法
<!-- 公共部分 --> <!-- 建立鏈接類 鏈接安裝好的 rabbitmq --> <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> <constructor-arg value="localhost" /> <!-- username,訪問RabbitMQ服務器的帳戶,默認是guest --> <property name="username" value="${rmq.manager.user}" /> <!-- username,訪問RabbitMQ服務器的密碼,默認是guest --> <property name="password" value="${rmq.manager.password}" /> <!-- host,RabbitMQ服務器地址,默認值"localhost" --> <property name="host" value="${rmq.ip}" /> <!-- port,RabbitMQ服務端口,默認值爲5672 --> <property name="port" value="${rmq.port}" /> <!-- channel-cache-size,channel的緩存數量,默認值爲25 --> <property name="channel-cache-size" value="50" /> <!-- cache-mode,緩存鏈接模式,默認值爲CHANNEL(單個connection鏈接,鏈接以後關閉,自動銷燬) --> <property name="cache-mode" value="CHANNEL" /> </bean> <!--或者這樣配置,connection-factory元素實際就是註冊一個org.springframework.amqp.rabbit.connection.CachingConnectionFactory實例 <rabbit:connection-factory id="connectionFactory" host="${rmq.ip}" port="${rmq.port}" username="${rmq.manager.user}" password="${rmq.manager.password}" />--> <rabbit:admin connection-factory="connectionFactory"/> <!--定義消息隊列,durable:是否持久化,若是想在RabbitMQ退出或崩潰的時候,不會失去全部的queue和消息,須要同時標誌隊列(queue)和交換機(exchange)是持久化的,即rabbit:queue標籤和rabbit:direct-exchange中的durable=true,而消息(message)默認是持久化的能夠看類org.springframework.amqp.core.MessageProperties中的屬性public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;exclusive: 僅建立者可使用的私有隊列,斷開後自動刪除;auto_delete: 當全部消費客戶端鏈接斷開後,是否自動刪除隊列 --> <rabbit:queue name="spittle.alert.queue.1" id="queue_1" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue name="spittle.alert.queue.2" id="queue_2" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue name="spittle.alert.queue.3" id="queue_3" durable="true" auto-delete="false" exclusive="false" /> <!--綁定隊列,rabbitmq的exchangeType經常使用的三種模式:direct,fanout,topic三種,咱們用direct模式,即rabbit:direct-exchange標籤,Direct交換器很簡單,若是是Direct類型,就會將消息中的RoutingKey與該Exchange關聯的全部Binding中的BindingKey進行比較,若是相等,則發送到該Binding對應的Queue中。有一個須要注意的地方:若是找不到指定的exchange,就會報錯。但routing key找不到的話,不會報錯,這條消息會直接丟失,因此此處要當心,auto-delete:自動刪除,若是爲Yes,則該交換機全部隊列queue刪除後,自動刪除交換機,默認爲false --> <rabbit:direct-exchange id="spittle.fanout" name="spittle.fanout" durable="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="spittle.alert.queue.1" key="{alert.queue.1}"></rabbit:binding> <rabbit:binding queue="spittle.alert.queue.2" key="{alert.queue.2}"></rabbit:binding> <rabbit:binding queue="spittle.alert.queue.3" key="{alert.queue.3}"></rabbit:binding> </rabbit:bindings> </rabbit:fanout-exchange> <!-- 生產者部分 --> <!-- 發送消息的producer類,也就是生產者 --> <bean id="msgProducer" class="com.asdf.sdf.ClassA"> <!-- value中的值就是producer中的的routingKey,也就是隊列名稱,它與上面的rabbit:bindings標籤中的key必須相同 --> <property name="queueName" value="{alert.queue.1}"/> </bean> <!-- spring amqp默認的是jackson 的一個插件,目的將生產者生產的數據轉換爲json存入消息隊列,因爲fastjson的速度快於jackson,這裏替換爲fastjson的一個實現 --> <bean id="jsonMessageConverter" class="com.jy.utils.FastJsonMessageConverter"></bean> <!-- 或者配置jackson --> <!-- <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /> --> <rabbit:template exchange="test-exchange" id="rabbitTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" /> <!-- 消費者部分 --> <!-- 自定義接口類 --> <bean id="testHandler" class="com.rabbit.TestHandler"></bean> <!-- 用於消息的監聽的代理類MessageListenerAdapter --> <bean id="testQueueListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter" > <!-- 類名 --> <constructor-arg ref="testHandler" /> <!-- 方法名 --> <property name="defaultListenerMethod" value="handlerTest"></property> <property name="messageConverter" ref="jsonMessageConverter"></property> </bean> <!-- 配置監聽acknowledeg="manual"設置手動應答,它可以保證即便在一個worker處理消息的時候用CTRL+C來殺掉這個worker,或者一個consumer掛了(channel關閉了、connection關閉了或者TCP鏈接斷了),也不會丟失消息。由於RabbitMQ知道沒發送ack確認消息致使這個消息沒有被徹底處理,將會對這條消息作re-queue處理。若是此時有另外一個consumer鏈接,消息會被從新發送至另外一個consumer會一直重發,直到消息處理成功,監聽容器acknowledge="auto" concurrency="30"設置發送次數,最多發送30次 --> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" concurrency="20"> <rabbit:listener queues="spittle.alert.queue.1" ref="testQueueListenerAdapter" /> <rabbit:listener queues="spittle.alert.queue.2" ref="testQueueListenerAdapter" /> <rabbit:listener queues="spittle.alert.queue.2" ref="testQueueListenerAdapter" /> </rabbit:listener-container>
其他幾種exchange
fanOut:
<!-- Fanout 扇出,顧名思義,就是像風扇吹麪粉同樣,吹獲得處都是。若是使用fanout類型的exchange,那麼routing key就不重要了。由於凡是綁定到這個exchange的queue,都會受到消息。 -->
<rabbit:fanout-exchange name="delayed_message_exchange" durable="true" auto-delete="false" id="delayed_message_exchange">
<rabbit:bindings>
<rabbit:binding queue="test_delay_queue"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!-- 發送端不是按固定的routing key發送消息,而是按字符串「匹配」發送,接收端一樣如此 --> <rabbit:topic-exchange name="message-exchange" durable="true" auto-delete="false" id="message-exchange"> <rabbit:bindings> <rabbit:binding queue="Q1" pattern="error.*.log" /> <rabbit:binding queue="Q2" pattern="error.level1.log" /> <rabbit:binding queue="Q3" pattern="error.level2.log" /> </rabbit:bindings> </rabbit:topic-exchange>
還有相關的消費者和提供者
@Resource private RabbitTemplate rabbitTemplate; private String queueName; public void sendMessage(CommonMessage msg){ try { logger.error("發送信息開始"); System.out.println(rabbitTemplate.getConnectionFactory().getHost()); //發送信息 queueName交換機,就是上面的routingKey msg.getSource() 爲 test_key rabbitTemplate.convertAndSend(queueName,msg.getSource(), msg); //若是是普通字符串消息須要先序列化,再發送消息 //rabbitTemplate.convertAndSend(queueName,msg.getSource(), SerializationUtils.serialize(msg)); logger.error("發送信息結束"); } catch (Exception e) { e.printStackTrace(); } } public void setQueueName(String queueName) { this.queueName = queueName; }
public class TestHandler { @Override public void handlerTest(CommonMessage commonMessage) { System.out.println("DetailQueueConsumer: " + new String(message.getBody())); } }
如今說說我我的的理解
其實思路與其餘幾個工具或者說方法 的用法都是同樣的 在配置文件(配置類中配置好類工廠 模板 而後再函數中直接拿bean或者模板直接用就行了)
接下來是與springboot的整合
與springboot的整合我用到了父子模塊的用法 如今記錄一下,否則會忘!就是建立一個空的springboot項目,而後右擊添加新項目 新項目的存儲位置必定要處於父項目的目錄下
而後具體整合也和spring差很少 甚至能夠說是比spring還要簡單一點
1.在配置文件中寫上屬性
2.寫一個配置類
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Author : JCccc * @CreateTime : 2019/9/3 * @Description : **/ @Configuration public class DirectRabbitConfig { //隊列 起名:TestDirectQueue @Bean public Queue TestDirectQueue() { return new Queue("TestDirectQueue",true); } //Direct交換機 起名:TestDirectExchange @Bean DirectExchange TestDirectExchange() { return new DirectExchange("TestDirectExchange"); } //綁定 將隊列和交換機綁定, 並設置用於匹配鍵:TestDirectRouting @Bean Binding bindingDirect() { return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting"); } }
簡單接口進行消息的推送
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.Map; import java.util.UUID; /** * @Author : JCccc * @CreateTime : 2019/9/3 * @Description : **/ @RestController public class SendMessageController { @Autowired RabbitTemplate rabbitTemplate; //使用RabbitTemplate,這提供了接收/發送等等方法 @GetMapping("/sendDirectMessage") public String sendDirectMessage() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "test message, hello!"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String,Object> map=new HashMap<>(); map.put("messageId",messageId); map.put("messageData",messageData); map.put("createTime",createTime); //將消息攜帶綁定鍵值:TestDirectRouting 發送到交換機TestDirectExchange rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map); return "ok"; } }
而後是消費者方的寫法 也是同樣的 此處必須注意兩邊都同時得有配置文件 由於比較分紅了兩個模塊了
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Author : JCccc * @CreateTime : 2019/9/3 * @Description : **/ @Configuration public class DirectRabbitConfig { //隊列 起名:TestDirectQueue @Bean public Queue TestDirectQueue() { return new Queue("TestDirectQueue",true); } //Direct交換機 起名:TestDirectExchange @Bean DirectExchange TestDirectExchange() { return new DirectExchange("TestDirectExchange"); } //綁定 將隊列和交換機綁定, 並設置用於匹配鍵:TestDirectRouting @Bean Binding bindingDirect() { return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting"); } }
而後建立消費類
@Component @RabbitListener(queues = "TestDirectQueue")//監聽的隊列名稱 TestDirectQueue public class DirectReceiver { @RabbitHandler public void process(Map testMessage) { System.out.println("DirectReceiver消費者收到消息 : " + testMessage.toString()); } }
這裏面消費者是用、
@EnableRabbit和@Configuration一塊兒使用,能夠加在類或者方法上,這個註解開啓了容器對註冊的bean的@RabbitListener檢查。
@RabbitListener 和 @RabbitHandler結合使用,不一樣類型的消息使用不一樣的方法來處理。
其他模式的使用也差很少 就是稍微多點配置
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Author : JCccc * @CreateTime : 2019/9/3 * @Description : **/ @Configuration public class TopicRabbitConfig { //綁定鍵 public final static String man = "topic.man"; public final static String woman = "topic.woman"; @Bean public Queue firstQueue() { return new Queue(TopicRabbitConfig.man); } @Bean public Queue secondQueue() { return new Queue(TopicRabbitConfig.woman); } @Bean TopicExchange exchange() { return new TopicExchange("topicExchange"); } //將firstQueue和topicExchange綁定,並且綁定的鍵值爲topic.man //這樣只要是消息攜帶的路由鍵是topic.man,纔會分發到該隊列 @Bean Binding bindingExchangeMessage() { return BindingBuilder.bind(firstQueue()).to(exchange()).with(man); } //將secondQueue和topicExchange綁定,並且綁定的鍵值爲用上通配路由鍵規則topic.# // 這樣只要是消息攜帶的路由鍵是以topic.開頭,都會分發到該隊列 @Bean Binding bindingExchangeMessage2() { return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#"); } }
workqueue方法就是多建立幾個隊列 多綁定幾回就好了
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Author : JCccc * @CreateTime : 2019/9/3 * @Description : **/ @Configuration public class FanoutRabbitConfig { /** * 建立三個隊列 :fanout.A fanout.B fanout.C * 將三個隊列都綁定在交換機 fanoutExchange 上 * 由於是扇型交換機, 路由鍵無需配置,配置也不起做用 */ @Bean public Queue queueA() { return new Queue("fanout.A"); } @Bean public Queue queueB() { return new Queue("fanout.B"); } @Bean public Queue queueC() { return new Queue("fanout.C"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingExchangeA() { return BindingBuilder.bind(queueA()).to(fanoutExchange()); } @Bean Binding bindingExchangeB() { return BindingBuilder.bind(queueB()).to(fanoutExchange()); } @Bean Binding bindingExchangeC() { return BindingBuilder.bind(queueC()).to(fanoutExchange()); } }
到此 具體的入門級用法都已經講完了 具體的模型我會傳到github上,接下來還有一些重點關於消息確認的知識(能夠用來確保消息不丟失)
RabbitMQ的消息確認有兩種。
一種是消息發送確認。這種是用來確認生產者將消息發送給交換器,交換器傳遞給隊列的過程當中,消息是否成功投遞。發送確認分爲兩步,一是確認是否到達交換器,二是確認是否到達隊列。
第二種是消費接收確認。這種是確認消費者是否成功消費了隊列中的消息。
(1)ConfirmCallback
經過實現ConfirmCallBack接口,消息發送到交換器Exchange後觸發回調。
使用該功能須要開啓確認,spring-boot中配置以下:
spring.rabbitmq.publisher-confirms = true
(2)ReturnCallback
經過實現ReturnCallback接口,若是消息從交換器發送到對應隊列失敗時觸發(好比根據發送消息時指定的routingKey找不到隊列時會觸發)
使用該功能須要開啓確認,spring-boot中配置以下:
spring.rabbitmq.publisher-returns = true
根據前面的知識(深刻了解RabbitMQ工做原理及簡單使用、Rabbit的幾種工做模式介紹與實踐)咱們知道,若是要保證消息的可靠性,須要對消息進行持久化處理,然而消息持久化除了須要代碼的設置以外,還有一個重要步驟是相當重要的,那就是保證你的消息順利進入Broker(代理服務器),如圖所示:
正常狀況下,若是消息通過交換器進入隊列就能夠完成消息的持久化,但若是消息在沒有到達broker以前出現意外,那就形成消息丟失,有沒有辦法能夠解決這個問題?
RabbitMQ有兩種方式來解決這個問題:
事務的實現主要是對信道(Channel)的設置,主要的方法有三個:
channel.txSelect()聲明啓動事務模式;
channel.txComment()提交事務;
channel.txRollback()回滾事務;
從上面的能夠看出事務都是以tx開頭的,tx應該是transaction extend(事務擴展模塊)的縮寫,若是有準確的解釋歡迎在博客下留言。
咱們來看具體的代碼實現:
// 建立鏈接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(config.UserName); factory.setPassword(config.Password); factory.setVirtualHost(config.VHost); factory.setHost(config.Host); factory.setPort(config.Port); Connection conn = factory.newConnection(); // 建立信道 Channel channel = conn.createChannel(); // 聲明隊列 channel.queueDeclare(_queueName, true, false, false, null); String message = String.format("時間 => %s", new Date().getTime()); try { channel.txSelect(); // 聲明事務 // 發送消息 channel.basicPublish("", _queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); channel.txCommit(); // 提交事務 } catch (Exception e) { channel.txRollback(); } finally { channel.close(); conn.close(); }
反正就是提交失敗就回滾 可是效率很低
從上面能夠看出,非事務模式的性能是事務模式的性能高149倍,個人電腦測試是這樣的結果,不一樣的電腦配置略有差別,但結論是同樣的,事務模式的性能要差不少,那有沒有既能保證消息的可靠性又能兼顧性能的解決方案呢?那就是接下來要講的Confirm發送方確認模式。
Confirm發送方確認模式使用和事務相似,也是經過設置Channel進行發送方確認的。
Confirm的三種實現方式:
方式一:channel.waitForConfirms()普通發送方確認模式;
方式二:channel.waitForConfirmsOrDie()批量確認模式;
方式三:channel.addConfirmListener()異步監聽發送方確認模式;
// 建立鏈接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(config.UserName); factory.setPassword(config.Password); factory.setVirtualHost(config.VHost); factory.setHost(config.Host); factory.setPort(config.Port); Connection conn = factory.newConnection(); // 建立信道 Channel channel = conn.createChannel(); // 聲明隊列 channel.queueDeclare(config.QueueName, false, false, false, null); // 開啓發送方確認模式 channel.confirmSelect(); String message = String.format("時間 => %s", new Date().getTime()); channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8")); if (channel.waitForConfirms()) { System.out.println("消息發送成功" ); }
看代碼能夠知道,咱們只須要在推送消息以前,channel.confirmSelect()聲明開啓發送方確認模式,再使用channel.waitForConfirms()等待消息被服務器確認便可。
// 建立鏈接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(config.UserName); factory.setPassword(config.Password); factory.setVirtualHost(config.VHost); factory.setHost(config.Host); factory.setPort(config.Port); Connection conn = factory.newConnection(); // 建立信道 Channel channel = conn.createChannel(); // 聲明隊列 channel.queueDeclare(config.QueueName, false, false, false, null); // 開啓發送方確認模式 channel.confirmSelect(); for (int i = 0; i < 10; i++) { String message = String.format("時間 => %s", new Date().getTime()); channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8")); } channel.waitForConfirmsOrDie(); //直到全部信息都發布,只要有一個未確認就會IOException System.out.println("所有執行完成");
以上代碼能夠看出來channel.waitForConfirmsOrDie(),使用同步方式等全部的消息發送以後纔會執行後面代碼,只要有一個消息未被確認就會拋出IOException異常。
// 建立鏈接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(config.UserName); factory.setPassword(config.Password); factory.setVirtualHost(config.VHost); factory.setHost(config.Host); factory.setPort(config.Port); Connection conn = factory.newConnection(); // 建立信道 Channel channel = conn.createChannel(); // 聲明隊列 channel.queueDeclare(config.QueueName, false, false, false, null); // 開啓發送方確認模式 channel.confirmSelect(); for (int i = 0; i < 10; i++) { String message = String.format("時間 => %s", new Date().getTime()); channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8")); } //異步監聽確認和未確認的消息 channel.addConfirmListener(new ConfirmListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("未確認消息,標識:" + deliveryTag); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println(String.format("已確認消息,標識:%d,多個消息:%b", deliveryTag, multiple)); } });
異步模式的優勢,就是執行效率高,不須要等待消息執行完,只須要監聽消息便可,以上異步返回的信息以下:
能夠看出,代碼是異步執行的,消息確認有多是批量確認的,是否批量確認在於返回的multiple的參數,此參數爲bool值,若是true表示批量執行了deliveryTag這個值之前的全部消息,若是爲false的話表示單條確認。
最好補充一點:因爲我本身整理的可能不夠全面 因此我放幾個連接
https://blog.csdn.net/qq_35387940/article/details/100514134 https://www.cnblogs.com/nizuimeiabc1/p/9608763.html 整合篇
https://blog.csdn.net/u013256816/article/details/55515234 消息確認機制