MessageConverter 即消息轉換器html
咱們在進行發送消息的時候,正常狀況下消息體爲二進制的數據方式進行傳輸,若是但願內部幫咱們進行轉換,或者指定自定義的轉換器,就須要用到 MessageConverter 了。java
自定義經常使用轉換器: MessageConverter, 通常來講都須要實現這個接口,而後重寫如下兩個方法。git
toMessage: java 對象轉換爲 Message
fromMessage: Message 對象轉換爲 Java 對象
複製代碼
轉換器類別:github
json 轉換器: jackson2JsonMessageConverter 能夠進行 java 對象的轉換功能
DefaultJackson2JavaTypeMapper 映射器:能夠進行Java對象的映射關係
自定義二進制轉換器: 好比圖片類型、PDF、PPT、流媒體
複製代碼
代碼地址: https://github.com/hmilyos/rabbitmqdemo.git rabbitmq-api 項目下
複製代碼
先建立一個 Java 實體 Orderspring
public class Order {
private String id;
private String name;
private String content;
public Order() {
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public Order(String id, String name, String content) {
this.id = id;
this.name = name;
this.content = content;
}
@Override
public String toString() {
return "Order{" +
"id='" + id + '\'' + ", name='" + name + '\'' + ", content='" + content + '\'' +
'}';
}
}
複製代碼
接着在上一篇的 RabbitMQConfig 裏面 配置支持 json 格式的轉換器json
@Bean //connectionFactory 也是要和最上面方法名保持一致
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue001(), queue002(), queue003()); //監聽的隊列
container.setConcurrentConsumers(1); //當前的消費者數量
container.setMaxConcurrentConsumers(5); // 最大的消費者數量
container.setDefaultRequeueRejected(false); //是否重回隊列
container.setAcknowledgeMode(AcknowledgeMode.AUTO); //簽收模式
container.setExposeListenerChannel(true);
container.setConsumerTagStrategy(new ConsumerTagStrategy() { //消費端的標籤策略
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
//3 支持json格式的轉換器
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
return container;
}
複製代碼
在委派 adapter 裏面聲明入參爲 Map 的消費方法 api
public void consumeMessage(Map messageBody) {
log.info("map方法, 消息內容:" + messageBody);
}
複製代碼
功能就完成了,接着寫個單元測試,注意 ContentType 必定要是 json !!安全
@Test
public void testSendJsonMessage() throws Exception {
Order order = new Order();
order.setId("001");
order.setName("test1001消息訂單");
order.setContent("test1001訂單描述信息");
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(order);
log.info("order 4 json: " + json);
MessageProperties messageProperties = new MessageProperties();
//這裏注意必定要修改contentType爲 application/json
messageProperties.setContentType("application/json");
Message message = new Message(json.getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.order", message);
}
複製代碼
運行單元測試,消息就被消費了 bash
messageContainer 修改爲以下的app
@Bean //connectionFactory 也是要和最上面方法名保持一致
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue001(), queue002(), queue003()); //監聽的隊列
container.setConcurrentConsumers(1); //當前的消費者數量
container.setMaxConcurrentConsumers(5); // 最大的消費者數量
container.setDefaultRequeueRejected(false); //是否重回隊列
container.setAcknowledgeMode(AcknowledgeMode.AUTO); //簽收模式
container.setExposeListenerChannel(true);
container.setConsumerTagStrategy(new ConsumerTagStrategy() { //消費端的標籤策略
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
// 4 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java對象轉換
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
return container;
}
複製代碼
public void consumeMessage(Order order) {
log.info("order對象, 消息內容, id: " + order.getId() +
", name: " + order.getName() +
", content: "+ order.getContent());
}
複製代碼
@Test
public void testSendJavaMessage() throws Exception {
Order order = new Order();
order.setId("1002");
order.setName("test1002消息訂單");
order.setContent("test1002訂單描述信息");
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(order);
log.info("order java: " + json);
MessageProperties messageProperties = new MessageProperties();
//這裏注意必定要修改contentType爲 application/json
messageProperties.setContentType("application/json");
//注意這裏要寫你的實體類路徑
messageProperties.getHeaders().put("__TypeId__", "com.hmily.rabbitmqapi.spring.domain.Order");
Message message = new Message(json.getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.order", message);
}
複製代碼
運行單元測試
報錯提示:如出現 If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
的異常提示,這是由於 Jackson 在把字節流轉換爲 Java 對象時發生安全提醒,粗暴的解決方式以下:
新建一個 EnableAllJackson2JavaTypeMapper 類,其繼承 DefaultJackson2JavaTypeMapper 這個類,而後在這裏配置容許轉換哪些對象,我這是是直接容許全部。
而後在 剛纔的 SimpleMessageListenerContainer
裏面, new EnableAllJackson2JavaTypeMapper()
改成 new EnableAllJackson2JavaTypeMapper()
,便可。
public class Packaged {
private String id;
private String name;
private String description;
public Packaged() {
}
public Packaged(String id, String name, String description) {
this.id = id;
this.name = name;
this.description = description;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
@Override
public String toString() {
return "Packaged{" +
"id='" + id + '\'' + ", name='" + name + '\'' + ", description='" + description + '\'' +
'}';
}
}
複製代碼
messageContainer 修改爲以下的
@Bean //connectionFactory 也是要和最上面方法名保持一致
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue001(), queue002(), queue003()); //監聽的隊列
container.setConcurrentConsumers(1); //當前的消費者數量
container.setMaxConcurrentConsumers(5); // 最大的消費者數量
container.setDefaultRequeueRejected(false); //是否重回隊列
container.setAcknowledgeMode(AcknowledgeMode.AUTO); //簽收模式
container.setExposeListenerChannel(true);
container.setConsumerTagStrategy(new ConsumerTagStrategy() { //消費端的標籤策略
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
// 5 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java對象多映射轉換
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
idClassMapping.put("order", com.hmily.rabbitmqapi.spring.domain.Order.class);
idClassMapping.put("packaged", com.hmily.rabbitmqapi.spring.domain.Packaged.class);
javaTypeMapper.setIdClassMapping(idClassMapping);
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
return container;
}
複製代碼
public void consumeMessage(Packaged pack) {
log.info("package對象, 消息內容, id: " + pack.getId() +
", name: " + pack.getName() +
", content: "+ pack.getDescription());
}
複製代碼
@Test
public void testSendMappingMessage() throws Exception {
ObjectMapper mapper = new ObjectMapper();
Order order = new Order();
order.setId("1001");
order.setName("1001訂單消息");
order.setContent("1001訂單描述信息");
String json1 = mapper.writeValueAsString(order);
log.info("order java: " + json1);
MessageProperties messageProperties1 = new MessageProperties();
//這裏注意必定要修改contentType爲 application/json
messageProperties1.setContentType("application/json");
messageProperties1.getHeaders().put("__TypeId__", "order");
Message message1 = new Message(json1.getBytes(), messageProperties1);
rabbitTemplate.send("topic001", "spring.order", message1);
Packaged pack = new Packaged();
pack.setId("1002");
pack.setName("1002包裹消息");
pack.setDescription("1002包裹描述信息");
String json2 = mapper.writeValueAsString(pack);
log.info("pack java: " + json2);
MessageProperties messageProperties2 = new MessageProperties();
//這裏注意必定要修改contentType爲 application/json
messageProperties2.setContentType("application/json");
messageProperties2.getHeaders().put("__TypeId__", "packaged");
Message message2 = new Message(json2.getBytes(), messageProperties2);
rabbitTemplate.send("topic001", "spring.pack", message2);
}
複製代碼
注意這裏面就不是寫類的路徑了,而是寫剛纔起的別名了
由於junitTest的關係,它發送完就關閉了,還有一條消息沒被消費
先寫轉換處理
public class ImageMessageConverter implements MessageConverter {
private static final Logger log = LoggerFactory.getLogger(ImageMessageConverter.class);
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
throw new MessageConversionException(" convert error ! ");
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
log.info("-----------Image MessageConverter----------");
Object _extName = message.getMessageProperties().getHeaders().get("extName");
String extName = _extName == null ? "png" : _extName.toString();
byte[] body = message.getBody();
String fileName = UUID.randomUUID().toString();
String path = "G:/test/file/new/" + fileName + "." + extName;
File f = new File(path);
try {
Files.copy(new ByteArrayInputStream(body), f.toPath());
} catch (IOException e) {
e.printStackTrace();
}
return f;
}
}
複製代碼
消息接收
public void consumeMessage(File file) {
log.info("文件對象 方法, 消息內容:" + file.getName());
}
複製代碼
聲明一個全局的轉換器
@Bean //connectionFactory 也是要和最上面方法名保持一致
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue001(), queue002(), queue003()); //監聽的隊列
container.setConcurrentConsumers(1); //當前的消費者數量
container.setMaxConcurrentConsumers(5); // 最大的消費者數量
container.setDefaultRequeueRejected(false); //是否重回隊列
container.setAcknowledgeMode(AcknowledgeMode.AUTO); //簽收模式
container.setExposeListenerChannel(true);
container.setConsumerTagStrategy(new ConsumerTagStrategy() { //消費端的標籤策略
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
// 6 ext convert
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
//全局的轉換器:
ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
TextMessageConverter textConvert = new TextMessageConverter();
convert.addDelegate("text", textConvert);
convert.addDelegate("html/text", textConvert);
convert.addDelegate("xml/text", textConvert);
convert.addDelegate("text/plain", textConvert);
Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
convert.addDelegate("json", jsonConvert);
convert.addDelegate("application/json", jsonConvert);
ImageMessageConverter imageConverter = new ImageMessageConverter();
convert.addDelegate("image/png", imageConverter);
convert.addDelegate("image", imageConverter);
PDFMessageConverter pdfConverter = new PDFMessageConverter();
convert.addDelegate("application/pdf", pdfConverter);
adapter.setMessageConverter(convert);
container.setMessageListener(adapter);
return container;
}
複製代碼
編寫單元測試來測試圖片
@Test
public void testSendExtConverterMessage() throws Exception {
byte[] body = Files.readAllBytes(Paths.get("G:/test/file", "1001.png"));
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("image/png");
messageProperties.getHeaders().put("extName", "png");
Message message = new Message(body, messageProperties);
rabbitTemplate.send("", "image_queue", message);
}
複製代碼
原圖片的本地文件夾路徑
public class PDFMessageConverter implements MessageConverter {
private static final Logger log = LoggerFactory.getLogger(PDFMessageConverter.class);
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
throw new MessageConversionException(" convert error ! ");
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
log.info("-----------PDF MessageConverter----------");
byte[] body = message.getBody();
String fileName = UUID.randomUUID().toString();
String path = "G:/test/file/new/" + fileName + ".pdf";
File f = new File(path);
try {
Files.copy(new ByteArrayInputStream(body), f.toPath());
} catch (IOException e) {
e.printStackTrace();
}
return f;
}
}
複製代碼
@Test
public void testSendExtConverterMessage() throws Exception {
// byte[] body = Files.readAllBytes(Paths.get("G:/test/file", "1001.png"));
// MessageProperties messageProperties = new MessageProperties();
// messageProperties.setContentType("image/png");
// messageProperties.getHeaders().put("extName", "png");
// Message message = new Message(body, messageProperties);
// rabbitTemplate.send("", "image_queue", message);
byte[] body = Files.readAllBytes(Paths.get("G:/test/file", "AliTech101_RD.pdf"));
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/pdf");
Message message = new Message(body, messageProperties);
rabbitTemplate.send("", "pdf_queue", message);
}
複製代碼