消息轉換器
package com.wyg.rabbitmq.springamqp; import com.wyg.rabbitmq.springamqp.convert.MyPngMesssageConvert; import com.wyg.rabbitmq.springamqp.convert.MyPDFMessageConvert; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.amqp.support.ConsumerTagStrategy; import org.springframework.amqp.support.converter.ContentTypeDelegatingMessageConverter; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import com.wyg.rabbitmq.springamqp.convert.MyTextMessageConvert; /** * RabbitConfig * * @author wyg0405@gmail.com * @date 2019-11-25 15:11 * @since JDK1.8 * @version V1.0 */ @Configuration public class RabbitConfig { @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(); cachingConnectionFactory.setAddresses("localhost:5672"); cachingConnectionFactory.setUsername("guest"); cachingConnectionFactory.setPassword("guest"); cachingConnectionFactory.setVirtualHost("/"); return cachingConnectionFactory; } /** * RabbitAdmin注入容器 * * @param connectionFactory * @return * @throws @author * wyg0405@gmail.com * @date 2019/11/25 16:35 */ @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); /* * autoStartup 必需要設爲 true ,不然Spring容器不會加載RabbitAdmin類 */ rabbitAdmin.setAutoStartup(true); return rabbitAdmin; } /** * RabbitTemplate注入 * * @param 
connectionFactory * @return * @throws @author * wyg0405@gmail.com * @date 2019/11/25 16:37 */ public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); return rabbitTemplate; } /** * SimpleMessageListenerContainer注入 * * @param connectionFactory * @return * @throws @author * wyg0405@gmail.com * @date 2019/11/25 17:16 */ @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); // 監聽多個queue container.addQueueNames("test01"); //container.addQueueNames("test01", "test02", "test03"); // 設置當前消費者數量 container.setConcurrentConsumers(1); // 設置最大的消費者數量 container.setMaxConcurrentConsumers(5); // 設置不要重回隊列 container.setDefaultRequeueRejected(false); // 設置自動簽收 container.setAcknowledgeMode(AcknowledgeMode.AUTO); // 設置消費端tag策略 container.setConsumerTagStrategy(new ConsumerTagStrategy() { @Override public String createConsumerTag(String queue) { return queue + "_" + System.currentTimeMillis(); } }); // 方式二,使用適配器 MessageListenerAdapter adapter = new MessageListenerAdapter(new MyMessageListenerDelegate()); // 自定處理消息方法,不設置默認爲handleMessage adapter.setDefaultListenerMethod("consumeMsg"); // 自定義消息轉換器 // adapter.setMessageConverter(new MyTextMessageConvert()); // 1.自定義消息轉換器 Json,實際上json對應的是Map類型參數 /* Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter(); adapter.setMessageConverter(jsonMessageConverter);*/ // 2.Java對象 /*Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter(); DefaultJackson2JavaTypeMapper defaultJackson2JavaTypeMapper = new DefaultJackson2JavaTypeMapper(); defaultJackson2JavaTypeMapper.setTrustedPackages("*"); jsonMessageConverter.setJavaTypeMapper(defaultJackson2JavaTypeMapper); adapter.setMessageConverter(jsonMessageConverter);*/ // 3.Java對象多映射關係 
/*Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter(); DefaultJackson2JavaTypeMapper defaultJackson2JavaTypeMapper = new DefaultJackson2JavaTypeMapper(); Map<String, Class<?>> map = new HashMap<>(); map.put("order", Order.class); map.put("user", User.class); defaultJackson2JavaTypeMapper.setTrustedPackages("*"); defaultJackson2JavaTypeMapper.setIdClassMapping(map); jsonMessageConverter.setJavaTypeMapper(defaultJackson2JavaTypeMapper); adapter.setMessageConverter(jsonMessageConverter);*/ // 4.全局轉換器 ContentTypeDelegatingMessageConverter converter = new ContentTypeDelegatingMessageConverter(); MyTextMessageConvert textMessageConvert = new MyTextMessageConvert(); converter.addDelegate("text", textMessageConvert); converter.addDelegate("appliction/text", textMessageConvert); converter.addDelegate("text/plain", textMessageConvert); Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter("*"); converter.addDelegate("json", jsonMessageConverter); converter.addDelegate("application/json", jsonMessageConverter); // png converter.addDelegate("png", new MyPngMesssageConvert()); converter.addDelegate("image/png", new MyPngMesssageConvert()); // pdf converter.addDelegate("pdf", new MyPDFMessageConvert()); adapter.setMessageConverter(converter); container.setMessageListener(adapter); return container; } }
package com.wyg.rabbitmq.springamqp; import java.io.File; import java.util.Map; import com.wyg.rabbitmq.springamqp.convert.Order; import com.wyg.rabbitmq.springamqp.convert.User; /** * * @author wyg0405@gmail.com * @date 2019-11-29 14:23 * @since JDK1.8 * @version V1.0 */ public class MyMessageListenerDelegate { // 默認方法 public void handleMessage(byte[] body) { System.out.println("默認處理方法,message:" + new String(body)); } // 自定義處理方法 public void consumeMsg(byte[] msgBody) { System.out.println("自定義處理方法,message:" + new String(msgBody)); } // 自定義處理String類型消息方法 public void consumeMsg(String msgBody) { System.out.println("自定義處理String消息方法,message:" + new String(msgBody)); } public void method01(String msgBody) { System.out.println("method01處理String消息方法,message:" + new String(msgBody)); } public void method02(String msgBody) { System.out.println("method02處理String消息方法,message:" + new String(msgBody)); } public void method03(String msgBody) { System.out.println("method03處理String消息方法,message:" + new String(msgBody)); } // json 對應map類型參數 public void consumeMsg(Map msgBody) { System.out.println("自定義json處理方法,message:" + msgBody.toString()); } // Java對象 對應Order類型參數 public void consumeMsg(Order msgBody) { System.out.println("自定義Order對象處理方法,message:" + msgBody.toString()); } // Java對象 對應User類型參數 public void consumeMsg(User user) { System.out.println("自定義User對象處理方法,message:" + user.toString()); } // File類型的參數 public void consumeMsg(File file) { System.out.println("自定義File對象處理方法,message:" + file.getPath()); } }
package com.wyg.rabbitmq.springamqp.convert; import java.io.Serializable; /** * * @author wyg0405@gmail.com * @date 2019-12-01 15:47 * @since JDK1.8 * @version V1.0 */ public class Order implements Serializable { private static final long serialVersionUID = -4975357142857575433L; private String id; private String content; private double price; public Order() {} public Order(String id, String content, double price) { this.id = id; this.content = content; this.price = price; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } @Override public String toString() { final StringBuilder sb = new StringBuilder("{"); sb.append("\"id\":\"").append(id).append('\"'); sb.append(",\"content\":\"").append(content).append('\"'); sb.append(",\"price\":").append(price); sb.append('}'); return sb.toString(); } }
package com.wyg.rabbitmq.springamqp.convert; import java.io.Serializable; /** * * @author wyg0405@gmail.com * @date 2019-12-01 17:20 * @since JDK1.8 * @version V1.0 */ public class User implements Serializable { private static final long serialVersionUID = 2959945432292661959L; private String id; private String name; private int age; public User() {} public User(String id, String name, int age) { this.id = id; this.name = name; this.age = age; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } @Override public String toString() { final StringBuilder sb = new StringBuilder("{"); sb.append("\"id\":\"").append(id).append('\"'); sb.append(",\"name\":\"").append(name).append('\"'); sb.append(",\"age\":").append(age); sb.append('}'); return sb.toString(); } }
自定義文本消息轉換器
package com.wyg.rabbitmq.springamqp.convert; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.amqp.support.converter.MessageConverter; /** * 自定義消息轉換器 * * @author wyg0405@gmail.com * @date 2019-12-01 15:11 * @since JDK1.8 * @version V1.0 */ public class MyTextMessageConvert implements MessageConverter { @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { return new Message(object.toString().getBytes(), messageProperties); } @Override public Object fromMessage(Message message) throws MessageConversionException { String contentType = message.getMessageProperties().getContentType(); if (null != contentType && contentType.contains("text")) { // 轉換爲String return new String(message.getBody()); } return message.getBody(); } }
自定義png轉換器
package com.wyg.rabbitmq.springamqp.convert; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.amqp.support.converter.MessageConverter; import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.nio.file.Files; /** * * @author wyg0405@gmail.com * @date 2019-12-01 19:32 * @since JDK1.8 * @version V1.0 */ public class MyPngMesssageConvert implements MessageConverter { @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { return null; } @Override public Object fromMessage(Message message) throws MessageConversionException { System.out.println("---------PNGMessageConvert---------"); String contentType = message.getMessageProperties().getContentType(); String extName = (String)message.getMessageProperties().getHeaders().get("extName"); String filename = System.currentTimeMillis() + extName; // 將文件流讀到本地 String filePath="E:\\"+filename; File file= new File(filePath); try { Files.copy(new ByteArrayInputStream(message.getBody()),file.toPath()); } catch (IOException e) { e.printStackTrace(); } return file; } }
自定義pdf轉換器
package com.wyg.rabbitmq.springamqp.convert; import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.nio.file.Files; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.amqp.support.converter.MessageConverter; /** * * @author wyg0405@gmail.com * @date 2019-12-01 19:34 * @since JDK1.8 * @version V1.0 */ public class MyPDFMessageConvert implements MessageConverter { @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { return null; } @Override public Object fromMessage(Message message) throws MessageConversionException { System.out.println("---------PDFMessageConvert---------"); String contentType = message.getMessageProperties().getContentType(); String extName = (String)message.getMessageProperties().getHeaders().get("extName"); String filename = System.currentTimeMillis() + extName; // 將文件流讀到本地 String filePath = "E:\\" + filename; File file = new File(filePath); try { Files.copy(new ByteArrayInputStream(message.getBody()), file.toPath()); } catch (IOException e) { e.printStackTrace(); } return file; } }
單元測試
package com.wyg.rabbitmq.springamqp; import java.io.*; import java.lang.reflect.Field; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.json.JsonMapper; import com.wyg.rabbitmq.springamqp.convert.Order; import com.wyg.rabbitmq.springamqp.convert.User; @RunWith(SpringRunner.class) @SpringBootTest public class RabbitConfigTest { @Autowired RabbitAdmin rabbitAdmin; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private SimpleMessageListenerContainer simpleMessageListenerContainer; @Test public void testSimpleMessageListenerContainerSendMsg() { // 分別向 隊列 "test01", "test02", "test03" 發消息,"test01", "test02", // "test03"與springdemo.direct已經綁定,routingKey都爲orderRoutingKey for (int i = 0; i < 3; i++) { rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", ("第" + i + "條消息").getBytes()); } } @Test public void sendTextMsg() { // 分別向 隊列 "test01", "test02", "test03" 發消息,"test01", "test02", // "test03"與springdemo.direct已經綁定,routingKey都爲orderRoutingKey for (int i = 0; i < 3; i++) { MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("application/text"); String body = "第" + i + "條消息"; Message msg = new Message(body.getBytes(), messageProperties); rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", msg); } } @Test public void sendJsonMsg() 
throws JsonProcessingException { // 分別向 隊列 "test01", "test02", "test03" 發消息,"test01", "test02", // "test03"與springdemo.direct已經綁定,routingKey都爲orderRoutingKey Order order = new Order("1", "吃的喝的", 19); ObjectMapper mapper = new JsonMapper(); String json = mapper.writeValueAsString(order); System.out.println("order to json:" + json); MessageProperties messageProperties = new MessageProperties(); // 這裏 ContentType 必定要寫成 "application/json" messageProperties.setContentType("application/json"); Message msg = new Message(json.getBytes(), messageProperties); rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", msg); } @Test public void sendJavaTypeMsg() throws JsonProcessingException { // 分別向 隊列 "test01", "test02", "test03" 發消息,"test01", "test02", // "test03"與springdemo.direct已經綁定,routingKey都爲orderRoutingKey Order order = new Order("1", "吃的喝的", 19); ObjectMapper mapper = new JsonMapper(); String order2Json = mapper.writeValueAsString(order); System.out.println("order to json:" + order2Json); MessageProperties messageProperties = new MessageProperties(); // 這裏 ContentType 必定要寫成 "application/json" messageProperties.setContentType("application/json"); // __TypeId__ 爲固定形式 messageProperties.getHeaders().put("__TypeId__", "com.wyg.rabbitmq.springamqp.convert.Order"); Message msg = new Message(order2Json.getBytes(), messageProperties); rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", msg); } @Test public void sendJavaTypeMappingMsg() throws JsonProcessingException { // 分別向 隊列 "test01", "test02", "test03" 發消息,"test01", "test02", // "test03"與springdemo.direct已經綁定,routingKey都爲orderRoutingKey // 發送Order類型 Order order = new Order("1", "吃的喝的", 19); ObjectMapper mapper = new JsonMapper(); String order2Json = mapper.writeValueAsString(order); System.out.println("order to json:" + order2Json); MessageProperties messageProperties = new MessageProperties(); // 這裏 ContentType 必定要寫成 "application/json" messageProperties.setContentType("application/json"); 
// __TypeId__ 爲固定形式, order標籤 messageProperties.getHeaders().put("__TypeId__", "order"); Message msg = new Message(order2Json.getBytes(), messageProperties); rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", msg); // 發送User類型 User user = new User("111", "jack", 20); String user2Json = mapper.writeValueAsString(user); System.out.println("User to Json:" + user2Json); MessageProperties messageProperties2 = new MessageProperties(); // __TypeId__ 爲固定形式, user標籤 messageProperties2.getHeaders().put("__TypeId__", "user"); Message usermsg = new Message(user2Json.getBytes(), messageProperties2); rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", usermsg); } @Test public void sendPNGMsg() throws IOException { // 分別向 隊列 "test01", "test02", "test03" 發消息,"test01", "test02", // "test03"與springdemo.direct已經綁定,routingKey都爲orderRoutingKey String filePath="C:\\Users\\wyg04\\Desktop\\image\\green.png"; byte[] allBytes = Files.readAllBytes(Paths.get(filePath)); MessageProperties messageProperties = new MessageProperties(); // 這裏 ContentType 必定要寫成 "image/png"或"png" messageProperties.setContentType("png"); // 拓展名 messageProperties.getHeaders().put("extName", ".png"); Message msg = new Message(allBytes, messageProperties); rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", msg); } @Test public void sendPDFMsg() throws IOException { // 分別向 隊列 "test01", "test02", "test03" 發消息,"test01", "test02", // "test03"與springdemo.direct已經綁定,routingKey都爲orderRoutingKey String filePath="C:\\Users\\wyg04\\Desktop\\深度工做.pdf"; byte[] allBytes = Files.readAllBytes(Paths.get(filePath)); MessageProperties messageProperties = new MessageProperties(); // 這裏 ContentType 必定要寫成 "pdf" messageProperties.setContentType("pdf"); // 拓展名 messageProperties.getHeaders().put("extName", ".pdf"); Message msg = new Message(allBytes, messageProperties); rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", msg); } }