前面兩篇博文,分別介紹了RabbitMq的核心知識點,以及整合SpringBoot的demo應用;接下來也該進入正題,看一下SpringBoot的環境下,如何玩轉rabbitmqgit
本篇內容主要爲消息發送,包括如下幾點github
RabbitTemplate
發送消息的基本使用姿式AbstractMessageConverter
<!-- more -->spring
咱們藉助SpringBoot 2.2.1.RELEASE
+ rabbitmq 3.7.5
來完整項目搭建與測試json
項目pom.xml以下數組
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
配置文件application.yml
內容以下springboot
spring: rabbitmq: virtual-host: / username: admin password: admin port: 5672 host: 127.0.0.1
經過前面rabbitmq的知識點學習,咱們能夠知道發送端的主要邏輯 「將消息發送給exchange,而後根據不一樣的策略分發給對應的queue」app
本篇博文主要討論的是消息發送,爲了後續的實例演示,咱們定義一個topic模式的exchange,並綁定一個的queue;(由於對發送端而言,不一樣的exchange類型,對發送端的使用姿式影響並不大,有影響的是消費者)ide
public class MqConstants { public static final String exchange = "topic.e"; public static final String routing = "r"; public final static String queue = "topic.a"; } @Configuration public class MqConfig { @Bean public TopicExchange topicExchange() { return new TopicExchange(MqConstants.exchange); } @Bean public Queue queue() { // 建立一個持久化的隊列 return new Queue(MqConstants.queue, true); } @Bean public Binding binding(TopicExchange topicExchange, Queue queue) { return BindingBuilder.bind(queue).to(topicExchange).with(MqConstants.routing); } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); } }
消息發送,主要藉助的是RabbitTemplate#convertAndSend
方法來實現,一般狀況下,咱們直接使用便可spring-boot
@Service public class BasicPublisher { @Autowired private RabbitTemplate rabbitTemplate; /** * 通常的用法,推送消息 * * @param ans * @return */ private String publish2mq1(String ans) { String msg = "Durable msg = " + ans; System.out.println("publish: " + msg); rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg); return msg; } }
上面的核心點就一行rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg);
請注意
經過上面的方式,發送的消息默認是持久化的,當持久化的消息,分發到持久化的隊列時,會有消息的落盤操做;
在某些場景下,咱們對消息的完整性要求並無那麼嚴格,反而更在乎mq的性能,丟失一些數據也能夠接受;這個時候咱們可能須要定製一下發送的消息屬性(好比將消息設置爲非持久化的)
下面提供兩種姿式,推薦第二種
/** * 推送一個非持久化的消息,這個消息推送到持久化的隊列時,mq重啓,這個消息會丟失;上面的持久化消息不會丟失 * * @param ans * @return */ private String publish2mq2(String ans) { MessageProperties properties = new MessageProperties(); properties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); Message message = rabbitTemplate.getMessageConverter().toMessage("NonDurable = " + ans, properties); rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, message); System.out.println("publish: " + message); return message.toString(); } private String publish2mq3(String ans) { String msg = "Define msg = " + ans; rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setHeader("ta", "測試"); message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); return message; } }); return msg; }
注意
MessagePostProcessor
來定製消息屬性MessagePostProcessor
對象,請定義一個通用的對象,能複用就複用經過查看rabbitTemplate#convertAndSend
的接口定義,咱們知道發送的消息能夠是Object類型,那麼是否是意味着任何對象,均可以推送給mq呢?
下面是一個測試case
private String publish2mq4(String ans) { NonSerDO nonSerDO = new NonSerDO(18, ans); System.out.println("publish: " + nonSerDO); rabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, nonSerDO); return nonSerDO.toString(); } @Data public static class NonSerDO { private Integer age; private String name; public NonSerDO(int age, String name) { this.age = age; this.name = name; } }
當咱們調用上面的publish2mq4
方法時,並不會是想象中的直接成功,相反拋出一個參數類型異常
爲何會出現這個問題呢?從堆棧分析,咱們知道RabbitTemplate默認是利用SimpleMessageConverter
來實現封裝Message邏輯的,核心代碼爲
// 下面代碼來自 org.springframework.amqp.support.converter.SimpleMessageConverter#createMessage @Override protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { byte[] bytes = null; if (object instanceof byte[]) { bytes = (byte[]) object; messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES); } else if (object instanceof String) { try { bytes = ((String) object).getBytes(this.defaultCharset); } catch (UnsupportedEncodingException e) { throw new MessageConversionException( "failed to convert to Message content", e); } messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN); messageProperties.setContentEncoding(this.defaultCharset); } else if (object instanceof Serializable) { try { bytes = SerializationUtils.serialize(object); } catch (IllegalArgumentException e) { throw new MessageConversionException( "failed to convert to serialized Message content", e); } messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT); } if (bytes != null) { messageProperties.setContentLength(bytes.length); return new Message(bytes, messageProperties); } throw new IllegalArgumentException(getClass().getSimpleName() + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName()); }
上面邏輯很明確的指出了,只接受byte數組,string字符串,可序列化對象(這裏使用的是jdk的序列化方式來實現對象和byte數組之間的互轉)
天然而然的,咱們會想有沒有其餘的MessageConverter
來友好的支持任何類型的對象
接下來咱們但願經過自定義一個json序列化方式的MessageConverter來解決上面的問題
一個比較簡單的實現(利用FastJson來實現序列化/反序列化)
public static class SelfConverter extends AbstractMessageConverter { @Override protected Message createMessage(Object object, MessageProperties messageProperties) { messageProperties.setContentType("application/json"); return new Message(JSON.toJSONBytes(object), messageProperties); } @Override public Object fromMessage(Message message) throws MessageConversionException { return JSON.parse(message.getBody()); } }
從新定義一個rabbitTemplate
,並設置它的消息轉換器爲自定義的SelfConverter
@Bean public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(new SelfConverter()); return rabbitTemplate; }
而後再次測試一下
@Service public class JsonPublisher { @Autowired private RabbitTemplate jsonRabbitTemplate; private String publish1(String ans) { Map<String, Object> msg = new HashMap<>(8); msg.put("msg", ans); msg.put("type", "json"); msg.put("version", 123); System.out.println("publish: " + msg); jsonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg); return msg.toString(); } private String publish2(String ans) { BasicPublisher.NonSerDO nonSerDO = new BasicPublisher.NonSerDO(18, "SELF_JSON" + ans); System.out.println("publish: " + nonSerDO); jsonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, nonSerDO); return nonSerDO.toString(); } }
mq內接收到的推送消息以下
上面雖然實現了Json格式的消息轉換,可是比較簡陋;並且這麼基礎通用的功能,按照Spring全家桶的一向做風,確定是有現成可用的,沒錯,這就是Jackson2JsonMessageConverter
因此咱們的使用姿式也能夠以下
//定義RabbitTemplate @Bean public RabbitTemplate jacksonRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); return rabbitTemplate; } // 測試代碼 @Autowired private RabbitTemplate jacksonRabbitTemplate; private String publish3(String ans) { Map<String, Object> msg = new HashMap<>(8); msg.put("msg", ans); msg.put("type", "jackson"); msg.put("version", 456); System.out.println("publish: " + msg); jacksonRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg); return msg.toString(); }
下面是經過Jackson序列化消息後的內容,與咱們自定義的有一些不一樣,多了headers
和content_encoding
本篇博文主要的知識點以下
RabbitTemplate#convertAndSend
來實現消息分發MessagePostProcessor
來自定義消息的屬性(請注意默認投遞的消息時持久化的)SimpleMessageConverter
,只支持分發byte數組,字符串和可序列化的對象;不知足上面三個條件的方法調用會拋異常MessageConverter
接口,來定義本身的消息封裝類,解決上面的問題在RabbitMq的知識點博文中,明確提到了,爲了確保消息被brocker正確接收,提供了消息確認機制和事務機制兩種case,那麼若是須要使用這兩種方式,消息生產者須要怎麼作呢?
限於篇幅,下一篇博文將帶來在消息確認機制/事務機制下的發送消息使用姿式
系列博文
項目源碼
盡信書則不如,以上內容,純屬一家之言,因我的能力有限,不免有疏漏和錯誤之處,如發現 bug 或者有更好的建議,歡迎批評指正,不吝感激
下面一灰灰的我的博客,記錄全部學習和工做中的博文,歡迎你們前去逛逛