1. 🔍環境配置
第一節咱們先來搞一下環境的配置,上一篇中咱們已經引入了自動配置的包,咱們既然使用了自動配置的方式,那RabbitMQ的鏈接信息咱們直接放在配置文件中就好了,就像咱們須要用到JDBC鏈接的時候去配置一下DataSource同樣。java
如圖所示,咱們只須要指明一下鏈接的IP+端口號和用戶名密碼就好了,這裏我用的是默認的用戶名與密碼,不寫的話默認也都是guest,端口號也是默認5672。json
主要咱們須要看一下手動確認消息的配置,須要配置成manual纔是手動確認,往後還會有其餘的配置項,眼下咱們配置這一個就能夠了。數組
接下來咱們要配置一個Queue,上一篇中咱們往一個名叫erduo的隊列中發送消息,當時是咱們手動定義的此隊列,這裏咱們也須要手動配置,聲明一個Bean就能夠了。數據結構
@Configuration public class RabbitmqConfig { @Bean public Queue erduo() { // 其三個參數:durable exclusive autoDelete // 通常只設置一下持久化便可 return new Queue("erduo",true); } }
就這麼簡單聲明一下就能夠了,固然了RabbitMQ畢竟是一個獨立的組件,若是你在RabbitMQ中經過其餘方式已經建立過一個名叫erduo的隊列了,你這裏也能夠不聲明,這裏起到的一個效果就是若是你沒有這個隊列,會按照你聲明的方式幫你建立這個隊列。app
配置完環境以後,咱們就能夠以SpringBoot的方式來編寫生產者和消費者了。工具
2. 📕生產者與RabbitTemplate
和上一篇的節奏同樣,咱們先來編寫生產者,不過此次我要引入一個新的工具:RabbitTemplate。測試
聽它的這個名字就知道,又是一個拿來即用的工具類,Spring家族這點就很舒服,什麼東西都給你封裝一遍,讓你用起來更方便更順手。ui
RabbitTemplate實現了標準AmqpTemplate接口,功能大體能夠分爲發送消息和接受消息。編碼
咱們這裏是在生產者中來用,主要就是使用它的發送消息功能:send和convertAndSend方法。spa
// 發送消息到默認的Exchange,使用默認的routing key void send(Message message) throws AmqpException; // 使用指定的routing key發送消息到默認的exchange void send(String routingKey, Message message) throws AmqpException; // 使用指定的routing key發送消息到指定的exchange void send(String exchange, String routingKey, Message message) throws AmqpException;
send方法是發送byte數組的數據的模式,這裏表明消息內容的對象是Message對象,它的構造方法就是傳入byte數組數據,因此咱們須要把咱們的數據轉成byte數組而後構形成一個Message對象再進行發送。
// Object類型,能夠傳入POJO void convertAndSend(Object message) throws AmqpException; void convertAndSend(String routingKey, Object message) throws AmqpException; void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
convertAndSend方法是能夠傳入POJO對象做爲參數,底層是有一個MessageConverter幫咱們自動將數據轉換成byte類型或String或序列化類型。
因此這裏支持的傳入對象也只有三種:byte類型,String類型和實現了Serializable接口的POJO。
介紹完了,咱們能夠看一下代碼:
@Slf4j @Component("rabbitProduce") public class RabbitProduce { @Autowired private RabbitTemplate rabbitTemplate; public void send() { String message = "Hello 我是做者和耳朵,歡迎關注我。" + LocalDateTime.now().toString(); System.out.println("Message content : " + message); // 指定消息類型 MessageProperties props = MessagePropertiesBuilder.newInstance() .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN).build(); rabbitTemplate.send(Producer.QUEUE_NAME,new Message(message.getBytes(StandardCharsets.UTF_8),props)); System.out.println("消息發送完畢。"); } public void convertAndSend() { User user = new User(); System.out.println("Message content : " + user); rabbitTemplate.convertAndSend(Producer.QUEUE_NAME,user); System.out.println("消息發送完畢。"); } }
這裏我特地寫明瞭兩個例子,一個用來測試send,另外一個用來測試convertAndSend。
send方法裏咱們看下來和以前的代碼是幾乎同樣的,定義一個消息,而後直接send,可是這個構造消息的構造方法可能比咱們想的要多一個參數,咱們原來講的只要把數據轉成二進制數組放進去便可,如今看來還要多放一個參數了。
MessageProperties,是的咱們須要多放一個MessageProperties對象,從他的名字咱們也能夠看出它的功能就是附帶一些參數,可是某些參數是少不了的,不帶不行。
好比個人代碼這裏就是設置了一下消息的類型,消息的類型有不少種能夠是二進制類型,文本類型,或者序列化類型,JSON類型,我這裏設置的就是文本類型,指定類型是必須的,也能夠爲咱們拿到消息以後要將消息轉換成什麼樣的對象提供一個參考。
convertAndSend方法就要簡單太多,這裏我放了一個User對象拿來測試用,直接指定隊列而後放入這個對象便可。
Tips:User必須實現Serializable接口,否則的話調用此方法的時候會拋出IllegalArgumentException異常。
代碼完成以後咱們就能夠調用了,這裏我寫一個測試類進行調用:
@SpringBootTest public class RabbitProduceTest { @Autowired private RabbitProduce rabbitProduce; @Test public void sendSimpleMessage() { rabbitProduce.send(); rabbitProduce.convertAndSend(); } }
效果以下圖~
同時在控制檯使用命令rabbitmqctl.bat list_queues查看隊列-erduo如今的狀況:
如此一來,咱們的生產者測試就算完成了,如今消息隊列裏兩條消息了,並且消息類型確定不同,一個是咱們設置的文本類型,一個是自動設置的序列化類型。
3. 📗消費者與RabbitListener
既然隊列裏面已經有消息了,接下來咱們就要看咱們該如何經過新的方式拿到消息並消費與確認了。
消費者這裏咱們要用到@RabbitListener來幫咱們拿到指定隊列消息,它的用法很簡單也很複雜,咱們能夠先來講簡單的方式,直接放到方法上,指定監聽的隊列就好了。
@Slf4j @Component("rabbitConsumer") public class RabbitConsumer { @RabbitListener(queues = Producer.QUEUE_NAME) public void onMessage(Message message, Channel channel) throws Exception { System.out.println("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); System.out.println("消息已確認"); } }
這段代碼就表明onMessage方法會處理erduo(Producer.QUEUE_NAME是常量字符串"erduo")隊列中的消息。
咱們能夠看到這個方法裏面有兩個參數,Message和Channel,若是用不到Channel能夠不寫此參數,可是Message消息必定是要的,它表明了消息自己。
咱們能夠想一想,咱們的程序從RabbitMQ之中拉回一條條消息以後,要以怎麼樣的方式展現給咱們呢?
沒錯,就是封裝爲一個個Message對象,這裏面放入了一條消息的全部信息,數據結構是什麼樣一會我一run你就能看到了。
同時這裏咱們使用Channel作一個消息確認的操做,這裏的DeliveryTag表明的是這個消息在隊列中的序號,這個信息存放在MessageProperties中。
4. 📖SpringBoot 啓動!
編寫完生產者和消費者,同時已經運行過生產者往消息隊列裏面放了兩條信息,接下來咱們能夠直接啓動消息,查看消費狀況:
在我紅色框線標記的地方能夠看到,由於咱們有了消費者因此項目啓動後先和RabbitMQ創建了一個鏈接進行監聽隊列。
隨後就開始消費咱們隊列中的兩條消息:
第一條信息是contentType=text/plain類型,因此直接就在控制檯上打印出了具體內容。
第二條信息是contentType=application/x-java-serialized-object,在打印的時候只打印了一個內存地址+字節大小。
無論怎麼說,數據咱們是拿到了,也就是表明咱們的消費是沒有問題的,同時也都進行了消息確認操做,從數據上看,整個消息能夠分爲兩部分:body和MessageProperties。
咱們能夠單獨使用一個註解拿到這個body的內容 - @Payload
@RabbitListener(queues = Producer.QUEUE_NAME) public void onMessage(@Payload String body, Channel channel) throws Exception { System.out.println("Message content : " + body); }
也能夠單獨使用一個註解拿到MessageProperties的headers屬性,headers屬性在截圖裏也能夠看到,只不過是個空的 - @Headers。
@RabbitListener(queues = Producer.QUEUE_NAME) public void onMessage(@Payload String body, @Headers Map<String,Object> headers) throws Exception { System.out.println("Message content : " + body); System.out.println("Message headers : " + headers); }
這兩個註解都算是擴展知識,我仍是更喜歡直接拿到所有,全都要!!!
上面咱們已經完成了消息的發送與消費,整個過程咱們能夠再次回想一下,一切都和我畫的這張圖上同樣的軌跡:
只不過咱們一直沒有指定Exchage一直使用的默認路由,但願你們好好記住這張圖。
5. 📘@RabbitListener與@RabbitHandler
下面再來補一些知識點,有關@RabbitListener與@RabbitHandler。
@RabbitListener上面咱們已經簡單的進行了使用,稍微擴展一下它實際上是能夠監聽多個隊列的,就像這樣:
@RabbitListener(queues = { "queue1", "queue2" }) public void onMessage(Message message, Channel channel) throws Exception { System.out.println("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); System.out.println("消息已確認"); }
還有一些其餘的特性如綁定之類的,這裏再也不贅述由於太硬編碼了通常用不上。
下面來講說這節要主要講的一個特性:@RabbitListener和@RabbitHandler的搭配使用。
前面咱們沒有提到,@RabbitListener註解實際上是能夠註解在類上的,這個註解在類上標誌着這個類監聽某個隊列或某些隊列。
這兩個註解的搭配使用就要讓@RabbitListener註解在類上,而後用@RabbitHandler註解在方法上,根據方法參數的不一樣自動識別並去消費,寫個例子給你們看一看更直觀一些。
@Slf4j @Component("rabbitConsumer") @RabbitListener(queues = Producer.QUEUE_NAME) public class RabbitConsumer { @RabbitHandler public void onMessage(@Payload String message){ System.out.println("Message content : " + message); } @RabbitHandler public void onMessage(@Payload User user) { System.out.println("Message content : " + user); } }
你們能夠看看這個例子,咱們先用@RabbitListener監聽erduo隊列中的消息,而後使用@RabbitHandler註解了兩個方法。
- 第一個方法的body類型是String類型,這就表明着這個方法只能處理文本類型的消息。
- 第二個方法的body類型是User類型,這就表明着這個方法只能處理序列化類型且爲User類型的消息。
這兩個方法正好對應着咱們第二節中測試類會發送的兩種消息,因此咱們往RabbitMQ中發送兩條測試消息,用來測試這段代碼,看看效果:
都在控制檯上如常打印了,若是@RabbitHandler註解的方法中沒有一個的類型能夠和你消息的類型對的上,好比消息都是byte數組類型,這裏沒有對應的方法去接收,系統就會在控制檯不斷的報錯,若是你出現這個狀況就證實你類型寫的不正確。
假設你的erduo隊列中會出現三種類型的消息:byte,文本和序列化,那你就必需要有對應的處理這三種消息的方法,否則消息發過來的時候就會由於沒法正確轉換而報錯。
並且使用了@RabbitHandler註解以後就不能再和以前同樣使用Message作接收類型。
@RabbitHandler public void onMessage(Message message, Channel channel) throws Exception { System.out.println("Message content : " + message); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); System.out.println("消息已確認"); }
這樣寫的話會報類型轉換異常的,因此兩者選其一。
同時上文個人@RabbitHandler沒有進行消息確認,你們能夠本身試一下進行消息確認。
6. 📙消息的序列化轉換
經過上文咱們已經知道,能被自動轉換的對象只有byte[]、String、java序列化對象(實現了Serializable接口的對象),可是並非全部的Java對象都會去實現Serializable接口,並且序列化的過程當中使用的是JDK自帶的序列化方法,效率低下。
因此咱們更廣泛的作法是:使用Jackson先將數據轉換成JSON格式發送給RabbitMQ,再接收消息的時候再用Jackson將數據反序列化出來。
這樣作能夠完美解決上面的痛點:消息對象既沒必要再去實現Serializable接口,也有比較高的效率(Jackson序列化效率業界應該是最好的了)。
默認的消息轉換方案是消息轉換頂層接口-MessageConverter的一個子類:SimpleMessageConverter,咱們若是要換到另外一個消息轉換器只須要替換掉這個轉換器就好了。
上圖是MessageConverter結構樹的結構樹,能夠看到除了SimpleMessageConverter以外還有一個Jackson2JsonMessageConverter,咱們只須要將它定義爲Bean,就能夠直接使用這個轉換器了。
@Bean public MessageConverter jackson2JsonMessageConverter() { return new Jackson2JsonMessageConverter(jacksonObjectMapper); }
這樣就能夠了,這裏的jacksonObjectMapper能夠不傳入,可是默認的ObjectMapper方案對JDK8的時間日期序列化會不太友好,具體能夠參考個人上一篇文章:從LocalDateTime序列化探討全局一致性序列化,總的來講就是定義了本身的ObjectMapper。
同時爲了接下來測試方便,我又定義了一個專門測試JSON序列化的隊列:
@Bean public Queue erduoJson() { // 其三個參數:durable exclusive autoDelete // 通常只設置一下持久化便可 return new Queue("erduo_json",true); }
如此以後就能夠進行測試了,先是生產者代碼:
public void sendObject() { Client client = new Client(); System.out.println("Message content : " + client); rabbitTemplate.convertAndSend(RabbitJsonConsumer.JSON_QUEUE,client); System.out.println("消息發送完畢。"); }
我又從新定義了一個Client對象,它和以前測試使用的User對象成員變量都是同樣的,不同的是它沒有實現Serializable接口。
同時爲了保留以前的測試代碼,我又新建了一個RabbitJsonConsumer,用於測試JSON序列化的相關消費代碼,裏面定義了一個靜態變量:JSON_QUEUE = "erduo_json";
因此這段代碼是將Client對象做爲消息發送到"erduo_json"隊列中去,隨後咱們在測試類中run一下進行一次發送。
緊着是消費者代碼:
@Slf4j @Component("rabbitJsonConsumer") @RabbitListener(queues = RabbitJsonConsumer.JSON_QUEUE) public class RabbitJsonConsumer { public static final String JSON_QUEUE = "erduo_json"; @RabbitHandler public void onMessage(Client client, @Headers Map<String,Object> headers, Channel channel) throws Exception { System.out.println("Message content : " + client); System.out.println("Message headers : " + headers); channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG),false); System.out.println("消息已確認"); } }
有了上文的經驗以後,這段代碼理解起來也是很簡單了吧,同時給出了上一節沒寫的如何在@RabbitHandler模式下進行消息簽收。
咱們直接來看看效果:
在打印的Headers裏面,日後翻能夠看到contentType=application/json,這個contentType是代表了消息的類型,這裏正是說明咱們新的消息轉換器生效了,將全部消息都轉換成了JSON類型。
後記
這兩篇講完了RabbitMQ的基本收發消息,包括手動配置和自動配置的兩種方式,這些你們仔細研讀以後應該會對RabbitMQ收發消息沒什麼疑問了~
不過咱們一直以來發消息時都是使用默認的交換機,下篇將會講述一下RabbitMQ的幾種交換機類型,以及其使用方式。