RabbitAdmin類能夠很好的才注意RabbitMQ,在Spring中直接進行諸如便可。html
注意:java
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.cp</groupId> <artifactId>rabbitmq-spring</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>rabbitmq-spring</name> <description>rabbitmq-spring</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.14.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
@Configuration @ComponentScan({"com.cp.spring.*"}) public class RabbitMQConfig { //至關於<Bean id="connectionFactory"></Bean> @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("127.0.0.1:5672"); connectionFactory.setUsername("user_cp"); connectionFactory.setPassword("123456"); connectionFactory.setVirtualHost("/vhost_cp"); return connectionFactory; } //形參名稱要與bean的方法名保持一致 @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } }
@RunWith(SpringRunner.class) @SpringBootTest public class ApplicationTests { @Test public void contextLoads() { } @Autowired private RabbitAdmin rabbitAdmin; @Test public void testAdmin() throws Exception { //直連監聽 rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false)); rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false)); rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false)); rabbitAdmin.declareQueue(new Queue("test.direct.queue", false)); rabbitAdmin.declareQueue(new Queue("test.topic.queue", false)); rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false)); //第一個參數:具體的隊列 第二個參數:綁定的類型 第三個參數:交換機 第四個參數:路由key 第五個參數:arguments 參數 rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE, "test.direct", "direct", new HashMap<>())); //BindingBuilder 鏈式編程 rabbitAdmin.declareBinding( BindingBuilder .bind(new Queue("test.topic.queue", false)) //直接建立隊列 .to(new TopicExchange("test.topic", false, false)) //直接建立交換機 創建關聯關係 .with("user.#")); //指定路由Key rabbitAdmin.declareBinding( BindingBuilder .bind(new Queue("test.fanout.queue", false)) .to(new FanoutExchange("test.fanout", false, false))); //清空隊列數據 rabbitAdmin.purgeQueue("test.topic.queue", false); } }
經過以上代碼,能夠自行測試一下結果。mysql
實現了InitializingBean
接口,代表在Bean配置加載完後再加載RabbitAdmin配置。找到afterPropertiesSet()方法中最要的initialize()初始化方法。git
this.applicationContext.getBeansOfType(Collection.class, false, false).values()
能夠看到Exchange、Queue、Binding都是從Spring容器中獲取三種類型,加載到上方定義的contextExchanges、contextQueues、contextBindings三種容器中。
後續的源碼中,也能夠看出經過篩選Spring容器中RabbitMQ的信息以後,再去創建RabbitMQ服務器的鏈接。主要經過Spring以@Bean的方式,將配置加載到Spring容器以後,再從容器中獲取相關信息,再去創建鏈接。github
-使用SpringAMQP去聲明,就須要使用SpringAMQP的以下模式,即聲明@Bean方式面試
@Configuration @ComponentScan({"com.cp.spring.*"}) public class RabbitMQConfig { //至關於<Bean id="connectionFactory"></Bean> @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("127.0.0.1:5672"); connectionFactory.setUsername("user_cp"); connectionFactory.setPassword("123456"); connectionFactory.setVirtualHost("/vhost_cp"); return connectionFactory; } //形參名稱要與bean的方法名保持一致 @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } /** * 針對消費者配置 * 1. 設置交換機類型 * 2. 將隊列綁定到交換機 FanoutExchange: 將消息分發到全部的綁定隊列,無routingkey的概念 HeadersExchange :經過添加屬性key-value匹配 DirectExchange:按照routingkey分發到指定隊列 TopicExchange:多關鍵字匹配 */ @Bean public TopicExchange exchange001() { return new TopicExchange("topic001", true, false); } @Bean public Queue queue001() { return new Queue("queue001", true); //隊列持久 } @Bean public Binding binding001() { return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*"); } @Bean public TopicExchange exchange002() { return new TopicExchange("topic002", true, false); } @Bean public Queue queue002() { return new Queue("queue002", true); //隊列持久 } @Bean public Binding binding002() { return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*"); } @Bean public Queue queue003() { return new Queue("queue003", true); //隊列持久 } @Bean public Binding binding003() { //同一個Exchange綁定了2個隊列 return BindingBuilder.bind(queue003()).to(exchange001()).with("mq.*"); } }
再次運行ApplicationTests類中testAdmin()方法,能夠在控制檯中,查看到一個Exchange綁定兩個Queue。spring
RabbitTemplate,即消息模板sql
該類提供了豐富的發送消息方法,包括可靠性投遞消息方法、回調監聽消息接口ConfirmCallback、返回值確認接口ReturnCallback等等。一樣咱們須要進行注入到Spring容器中,而後直接使用apache
在與SPring整合時須要實例化,可是在與SpringBoot整合時,在配置文件裏添加配置便可編程
在RabbitMQConfig類中寫RabbitTemplate配置
@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); return rabbitTemplate; }
在ApplicationTests測試類中添加測試方法,進行測試。
@Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendMessage() throws Exception { //1 建立消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.getHeaders().put("desc", "信息描述.."); messageProperties.getHeaders().put("type", "自定義消息類型.."); //消息體,與參數 Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties); //轉換併發送 //MessagePostProcessor 在消息發送完畢後再作一次轉換進行再加工,匿名接口,須要重寫方法 rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { System.err.println("------添加額外的設置---------"); message.getMessageProperties().getHeaders().put("desc", "額外修改的信息描述"); message.getMessageProperties().getHeaders().put("attr", "額外新加的屬性"); return message; } }); }
運行前,能夠看到queue001
中是沒有消息的。
運行testSendMessage()方法。並獲取消息。
@Test public void testSendMessage2() throws Exception { //1 建立消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("text/plain"); Message message = new Message("mq 消息1234".getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.abc", message); rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!"); rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!"); }
咱們往topic001中發送了兩條消息,topic002中發送了一條消息。運行testSendMessage2() 接下來再查看下管控臺
。
能夠看到topic001中已經有了三條消息,剛纔發送的消息也還在。GetMessage並非消費消息,而只是獲取消息。
簡單消息監聽容器
注意:
思考
SimpleMessageListenerContainer爲何能夠動態感知配置變動?
配置中添加以下代碼:
@Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); //添加多個隊列進行監聽 container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf()); //當前消費者數量 container.setConcurrentConsumers(1); //最大消費者數量 container.setMaxConcurrentConsumers(5); //設置重回隊列,通常設置false container.setDefaultRequeueRejected(false); //設置自動簽收機制 container.setAcknowledgeMode(AcknowledgeMode.AUTO); //設置listener外露 container.setExposeListenerChannel(true); //消費端標籤生成策略 container.setConsumerTagStrategy(new ConsumerTagStrategy() { @Override public String createConsumerTag(String queue) { //每一個消費端都有本身獨立的標籤 return queue + "_" + UUID.randomUUID().toString(); } }); //消息監聽 container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { String msg = new String(message.getBody()); System.err.println("----------消費者: " + msg); } }); return container; }
運行以前寫的testSendMessage2()方法,查看管控臺中的相關信息以及控制檯打印信息
MessageListenerAdapter 即消息監聽適配器
咱們把以前的消息監聽代碼註釋,能夠不用直接加消息監聽,而是採用MessageListenerAdapter的方式,經過適配器方式1,咱們來學習下如何使用默認的handleMessage,自定義方法名,自定義轉換器。
使用默認handleMessage
//消息監聽 /*container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { String msg = new String(message.getBody()); System.err.println("----------消費者: " + msg); } });*/ MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); container.setMessageListener(adapter);
MessageListenerAdapter 適配器類,熟悉適配器模式的朋友確定瞭解適配器模式的話,能夠經過適配器,適配本身的實現,這裏咱們適配自定義的MessageDelegate
類。咱們就能夠不採用監聽的方式,採用適配的方式。
自定義MessageDelegate
public class MessageDelegate { public void handleMessage(byte[] messageBody) { System.err.println("默認方法, 消息內容:" + new String(messageBody)); } }
MessageDelegate類中,方法名與參數handleMessage(byte[] messageBody)
是固定的。爲何呢?
MessageListenerAdapter源碼分析
咱們來看下MessageListenerAdapter底層代碼
MessageListenerAdapter類中
public static final String ORIGINAL_DEFAULT_LISTENER_METHOD = "handleMessage";
默認方法名就是叫handleMessage。固然也能夠本身去指定設置。經過messageListenerAdapter的代碼咱們能夠看出以下核心屬性
測試一下默認使用的handleMessage方法。啓動ApplicationTests類,運行testSendMessage()測試方法。
自定義方法名
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); container.setMessageListener(adapter);
修改MessageDelegate()類
public class MessageDelegate { public void consumeMessage(byte[] messageBody) { System.err.println("字節數組方法, 消息內容:" + new String(messageBody)); } }
自定義TextMessageConverter轉換器
public class TextMessageConverter implements MessageConverter { @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { return new Message(object.toString().getBytes(), messageProperties); } @Override public Object fromMessage(Message message) throws MessageConversionException { String contentType = message.getMessageProperties().getContentType(); if(null != contentType && contentType.contains("text")) { return new String(message.getBody()); } return message.getBody(); } }
修改RabbitMQConfig類
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); adapter.setMessageConverter(new TextMessageConverter()); container.setMessageListener(adapter);
修改MessageDelegate類
public class MessageDelegate { public void consumeMessage(String messageBody) { System.err.println("字符串方法, 消息內容:" + messageBody); } }
運行testSendMessage4Text()測試方法
@Test public void testSendMessage2() throws Exception { //1 建立消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("text/plain"); Message message = new Message("mq 消息1234".getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.abc", message); rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!"); rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!"); }
注意:在發消息的時候,必須符合本身的轉換器。
打印結果
自定義隊列名稱和方法名稱。
/** * 2 適配器方式: 咱們的隊列名稱 和 方法名稱 也能夠進行一一的匹配 * / MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setMessageConverter(new TextMessageConverter()); Map<String, String> queueOrTagToMethodName = new HashMap<>(); queueOrTagToMethodName.put("queue001", "method1"); queueOrTagToMethodName.put("queue002", "method2"); adapter.setQueueOrTagToMethodName(queueOrTagToMethodName); container.setMessageListener(adapter);
public class MessageDelegate { public void method1(String messageBody) { System.err.println("method1 收到消息內容:" + new String(messageBody)); } public void method2(String messageBody) { System.err.println("method2 收到消息內容:" + new String(messageBody)); } }
運行 測試方法
@Test public void testSendMessage4Text() throws Exception { //1 建立消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("text/plain"); Message message = new Message("mq 消息1234".getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.abc", message); rabbitTemplate.send("topic002", "rabbit.abc", message); }
運行結果:
咱們在進行發送消息的時候,正常狀況下消息體爲二進制的數據方式進行傳輸,若是但願內部幫咱們進行轉換,或者指定自定義的轉換器,就須要用到MessageConverter
其實咱們在介紹MessageListenerAdapter的時候,中間就介紹到了TextMessageConverter轉換器,將二進制數據轉換成字符串數據。
修改RabbitMQConfig類
// 1.1 支持json格式的轉換器 MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); //重點,加入json格式的轉換器 json對應Map對象 Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); adapter.setMessageConverter(jackson2JsonMessageConverter); container.setMessageListener(adapter);
修改MessageDelegate
public class MessageDelegate { //json對應Map對象 public void consumeMessage(Map messageBody) { System.err.println("map方法, 消息內容:" + messageBody); } }
定義一個Order對象
public class Order { private String id; private String name; private String content; ...省略get/set等方法 }
定義測試方法
@Test public void testSendJsonMessage() throws Exception { Order order = new Order(); order.setId("001"); order.setName("消息訂單"); order.setContent("描述信息"); ObjectMapper mapper = new ObjectMapper(); String json = mapper.writeValueAsString(order); System.err.println("order 4 json: " + json); MessageProperties messageProperties = new MessageProperties(); //這裏注意必定要修改contentType爲 application/json messageProperties.setContentType("application/json"); Message message = new Message(json.getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.order", message); }
打印結果:
修改RabbitMQConfig類
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); //須要將javaTypeMapper放入到Jackson2JsonMessageConverter對象中 DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper(); jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper); adapter.setMessageConverter(jackson2JsonMessageConverter); container.setMessageListener(adapter);
修改MessageDelegate
public class MessageDelegate { public void consumeMessage(Order order) { System.err.println("order對象, 消息內容, id: " + order.getId() + ", name: " + order.getName() + ", content: "+ order.getContent()); } }
定義測試方法
@Test public void testSendJavaMessage() throws Exception { Order order = new Order(); order.setId("001"); order.setName("訂單消息"); order.setContent("訂單描述信息"); ObjectMapper mapper = new ObjectMapper(); String json = mapper.writeValueAsString(order); System.err.println("order 4 json: " + json); MessageProperties messageProperties = new MessageProperties(); //這裏注意必定要修改contentType爲 application/json messageProperties.setContentType("application/json"); //添加typeid 與類的全路徑 messageProperties.getHeaders().put("__TypeId__", "com.cp.spring.entity.Order"); Message message = new Message(json.getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.order", message); }
打印結果:
修改RabbitMQConfig類
//1.3 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java對象多映射轉換 MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper(); //key表示標籤 對應一個類的具體全路徑。類和標籤綁定以後,標籤是order,意思就是轉換成order類 Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>(); idClassMapping.put("order", com.cp.spring.entity.Order.class); idClassMapping.put("packaged", com.cp.spring.entity.Packaged.class); javaTypeMapper.setIdClassMapping(idClassMapping); //一層套一層 jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper); adapter.setMessageConverter(jackson2JsonMessageConverter); container.setMessageListener(adapter);
修改MessageDelegate
public class MessageDelegate { //json對應Map對象 public void consumeMessage(Order order) { System.err.println("order對象, 消息內容, id: " + order.getId() + ", name: " + order.getName() + ", content: "+ order.getContent()); } public void consumeMessage(Packaged pack) { System.err.println("package對象, 消息內容, id: " + pack.getId() + ", name: " + pack.getName() + ", content: "+ pack.getDescription()); } }
定義一個Packaged對象
public class Packaged { private String id; private String name; private String description; ...省略get/set等方法 }
定義測試方法
@Test public void testSendMappingMessage() throws Exception { ObjectMapper mapper = new ObjectMapper(); Order order = new Order(); order.setId("001"); order.setName("訂單消息"); order.setContent("訂單描述信息"); String json1 = mapper.writeValueAsString(order); System.err.println("order 4 json: " + json1); MessageProperties messageProperties1 = new MessageProperties(); //這裏注意必定要修改contentType爲 application/json messageProperties1.setContentType("application/json"); //設置的是標籤,而不是全路徑 messageProperties1.getHeaders().put("__TypeId__", "order"); Message message1 = new Message(json1.getBytes(), messageProperties1); rabbitTemplate.send("topic001", "spring.order", message1); Packaged pack = new Packaged(); pack.setId("002"); pack.setName("包裹消息"); pack.setDescription("包裹描述信息"); String json2 = mapper.writeValueAsString(pack); System.err.println("pack 4 json: " + json2); MessageProperties messageProperties2 = new MessageProperties(); //這裏注意必定要修改contentType爲 application/json messageProperties2.setContentType("application/json"); //設置的是標籤,而不是全路徑 messageProperties2.getHeaders().put("__TypeId__", "packaged"); Message message2 = new Message(json2.getBytes(), messageProperties2); rabbitTemplate.send("topic001", "spring.pack", message2); }
打印結果:
在經過單元測試運行testSendMappingMessage()方法時會存在一個問題:委派對象MessageDelegate可能會收不到對象。
由於單元測試spring容器在運行完畢以後就中止,不會等到消費者消費完消息以後再中止,因此須要經過正常啓動springboot項目,能夠看到正常消費消息。
修改RabbitMQConfig類
@Bean public Queue queue_image() { return new Queue("image_queue", true); //隊列持久 } @Bean public Queue queue_pdf() { return new Queue("pdf_queue", true); //隊列持久 } //1.4 ext convert MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); //全局的轉換器:全部小的Converter均可以放到這個大的Converter中 ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter(); TextMessageConverter textConvert = new TextMessageConverter(); //text走文本轉換器 convert.addDelegate("text", textConvert); convert.addDelegate("html/text", textConvert); convert.addDelegate("xml/text", textConvert); convert.addDelegate("text/plain", textConvert); //json走json轉換器 Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter(); convert.addDelegate("json", jsonConvert); convert.addDelegate("application/json", jsonConvert); //圖片走圖片轉換器 ImageMessageConverter imageConverter = new ImageMessageConverter(); convert.addDelegate("image/png", imageConverter); convert.addDelegate("image", imageConverter); //pdf走pdf轉換器 PDFMessageConverter pdfConverter = new PDFMessageConverter(); convert.addDelegate("application/pdf", pdfConverter); adapter.setMessageConverter(convert); container.setMessageListener(adapter);
修改MessageDelegate
public class MessageDelegate { public void handleMessage(byte[] messageBody) { System.err.println("默認方法, 消息內容:" + new String(messageBody)); } public void consumeMessage(byte[] messageBody) { System.err.println("字節數組方法, 消息內容:" + new String(messageBody)); } public void consumeMessage(String messageBody) { System.err.println("字符串方法, 消息內容:" + messageBody); } public void method1(String messageBody) { System.err.println("method1 收到消息內容:" + new String(messageBody)); } public void method2(String messageBody) { System.err.println("method2 收到消息內容:" + new String(messageBody)); } //json對應Map對象 public void consumeMessage(Map messageBody) { System.err.println("map方法, 消息內容:" + messageBody); } public void consumeMessage(Order order) { System.err.println("order對象, 消息內容, id: " + order.getId() + ", name: " + order.getName() + ", content: "+ order.getContent()); } public void consumeMessage(Packaged pack) { System.err.println("package對象, 消息內容, id: " + pack.getId() + ", name: " + pack.getName() + ", content: "+ pack.getDescription()); } public void consumeMessage(File file) { System.err.println("文件對象 方法, 消息內容:" + file.getName()); } }
添加PDFMessageConverter
public class PDFMessageConverter implements MessageConverter { @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { throw new MessageConversionException(" convert error ! "); } @Override public Object fromMessage(Message message) throws MessageConversionException { System.err.println("-----------PDF MessageConverter----------"); byte[] body = message.getBody(); String fileName = UUID.randomUUID().toString(); String path = "d:/010_test/" + fileName + ".pdf"; File f = new File(path); try { Files.copy(new ByteArrayInputStream(body), f.toPath()); } catch (IOException e) { e.printStackTrace(); } return f; } }
添加ImageMessageConverter
public class ImageMessageConverter implements MessageConverter { @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { throw new MessageConversionException(" convert error ! "); } @Override public Object fromMessage(Message message) throws MessageConversionException { System.err.println("-----------Image MessageConverter----------"); Object _extName = message.getMessageProperties().getHeaders().get("extName"); String extName = _extName == null ? "png" : _extName.toString(); byte[] body = message.getBody(); String fileName = UUID.randomUUID().toString(); //將接受到的圖片放到該位置 String path = "d:/010_test/" + fileName + "." + extName; File f = new File(path); try { Files.copy(new ByteArrayInputStream(body), f.toPath()); } catch (IOException e) { e.printStackTrace(); } return f; } }
定義測試方法
@Test public void testSendExtConverterMessage() throws Exception { // byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "picture.png")); // MessageProperties messageProperties = new MessageProperties(); // messageProperties.setContentType("image/png"); // messageProperties.getHeaders().put("extName", "png"); // Message message = new Message(body, messageProperties); // rabbitTemplate.send("", "image_queue", message); byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "mysql.pdf")); MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("application/pdf"); Message message = new Message(body, messageProperties); rabbitTemplate.send("", "pdf_queue", message); }
能夠本身測試下圖片和pdf的保存。
源碼地址:https://gitee.com/573059382/rabbitmq-demos
歡迎關注我的微信公衆號:Coder編程
獲取最新原創技術文章和免費學習資料,更有大量精品思惟導圖、面試資料、PMP備考資料等你來領,方便你隨時隨地學習技術知識!
新建了一個qq羣:315211365,歡迎你們進羣交流一塊兒學習。謝謝了!也能夠介紹給身邊有須要的朋友。
文章收錄至
Github: https://github.com/CoderMerlin/coder-programming
Gitee: https://gitee.com/573059382/coder-programming
歡迎關注並star~
參考文章:
《RabbitMQ消息中間件精講》
推薦文章:
消息中間件——RabbitMQ(六)理解Exchange交換機核心概念!