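Spring AMQP Message Converters - MessageConverter

MessageConverter is Spring AMQP's message converter interface.

When we send a message, the body normally travels as binary data. If we want the framework to convert it for us, or want to plug in a custom converter, we need MessageConverter.

Custom converters: in general you implement the MessageConverter interface and override the following two methods.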

toMessage:  converts a Java object to a Message

fromMessage:  converts a Message to a Java object
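
The two-method contract can be pictured without any Spring dependency. The following is only a minimal sketch: SketchMessage, SketchConverter and Utf8TextSketchConverter are hypothetical stand-ins invented for illustration, not Spring AMQP types, and the real MessageConverter methods also deal with MessageProperties.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an AMQP message: a byte[] body plus string headers.
final class SketchMessage {
    final byte[] body;
    final Map<String, String> headers;

    SketchMessage(byte[] body, Map<String, String> headers) {
        this.body = body;
        this.headers = headers;
    }
}

// Hypothetical two-method contract mirroring the toMessage / fromMessage pair.
interface SketchConverter {
    SketchMessage toMessage(Object payload);

    Object fromMessage(SketchMessage message);
}

// Converts between String payloads and UTF-8 encoded message bodies.
final class Utf8TextSketchConverter implements SketchConverter {
    @Override
    public SketchMessage toMessage(Object payload) {
        Map<String, String> headers = new HashMap<>();
        headers.put("contentType", "text/plain");
        byte[] body = String.valueOf(payload).getBytes(StandardCharsets.UTF_8);
        return new SketchMessage(body, headers);
    }

    @Override
    public Object fromMessage(SketchMessage message) {
        return new String(message.body, StandardCharsets.UTF_8);
    }
}
```

A converter is useful when both directions agree: whatever toMessage writes, fromMessage should be able to read back.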

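Converter types:

JSON converter: Jackson2JsonMessageConverter, which converts between JSON message bodies and Java objects

DefaultJackson2JavaTypeMapper: maps a message to a specific Java class

Custom binary converters: for example images, PDF, PPT, streaming media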

Code example:

Code location:  https://github.com/hmilyos/rabbitmqdemo.git  under the rabbitmq-api project

1. JSON converter

First create a Java entity, Order:

public class Order {

    private String id;

    private String name;

    private String content;

    public Order() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public Order(String id, String name, String content) {
        this.id = id;
        this.name = name;
        this.content = content;
    }

    @Override
    public String toString() {
        return "Order{" +
                "id='" + id + '\'' + ", name='" + name + '\'' + ", content='" + content + '\'' +
                '}';
    }
}

Next, in the RabbitMQConfig from the previous article, configure a converter that supports JSON:

@Bean   // the parameter name connectionFactory must match the bean method name defined at the top of this class
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue001(), queue002(), queue003());    // queues to listen on
        container.setConcurrentConsumers(1);    // number of concurrent consumers
        container.setMaxConcurrentConsumers(5); // maximum number of consumers
        container.setDefaultRequeueRejected(false); // do not requeue rejected messages
        container.setAcknowledgeMode(AcknowledgeMode.AUTO); // acknowledgement mode
        container.setExposeListenerChannel(true);
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {    // consumer tag strategy
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });

        // 3  converter with JSON support
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage");
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        adapter.setMessageConverter(jackson2JsonMessageConverter);
        container.setMessageListener(adapter);
        return container;
    }

In the delegate class used by the adapter (MessageDelegate), declare a consumer method that takes a Map:

public void consumeMessage(Map messageBody) {
        log.info("map method, message body: " + messageBody);
    }
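
The delegate in this article ends up with several consumeMessage overloads (Map, Order, Packaged, File), and which one runs depends on the type the converter produced. The sketch below illustrates that idea with plain reflection. It is an assumption-laden illustration, not the actual MessageListenerAdapter implementation; SketchDelegate and OverloadDispatchSketch are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.Map;

// Hypothetical delegate with two overloads, like the consumeMessage methods in this article.
final class SketchDelegate {
    public String consumeMessage(Map<?, ?> body) {
        return "map:" + body.size();
    }

    public String consumeMessage(String body) {
        return "text:" + body;
    }
}

final class OverloadDispatchSketch {
    // Invokes the public one-argument method called methodName whose parameter accepts the payload.
    static Object dispatch(Object delegate, String methodName, Object payload) {
        for (Method m : delegate.getClass().getMethods()) {
            if (m.getName().equals(methodName)
                    && m.getParameterCount() == 1
                    && m.getParameterTypes()[0].isInstance(payload)) {
                try {
                    return m.invoke(delegate, payload);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Listener method failed", e);
                }
            }
        }
        throw new IllegalArgumentException(
                "No " + methodName + " overload accepts " + payload.getClass().getName());
    }
}
```

With this picture in mind, a JSON body converted to a Map reaches the Map overload, while a body converted to another type needs its own overload.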

That completes the feature. Next, write a unit test; note that the ContentType must be JSON!

@Test
    public void testSendJsonMessage() throws Exception {

        Order order = new Order();
        order.setId("001");
        order.setName("test1001 order message");
        order.setContent("test1001 order description");
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(order);
        log.info("order 4 json: " + json);

        MessageProperties messageProperties = new MessageProperties();
        // the contentType must be set to application/json
        messageProperties.setContentType("application/json");
        // encode explicitly as UTF-8 (java.nio.charset.StandardCharsets) rather than the platform default
        Message message = new Message(json.getBytes(StandardCharsets.UTF_8), messageProperties);

        rabbitTemplate.send("topic001", "spring.order", message);
    }

Run the unit test and the message is consumed.

2. Converting to Java objects with DefaultJackson2JavaTypeMapper

Change messageContainer to the following:

@Bean   // the parameter name connectionFactory must match the bean method name defined at the top of this class
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue001(), queue002(), queue003());    // queues to listen on
        container.setConcurrentConsumers(1);    // number of concurrent consumers
        container.setMaxConcurrentConsumers(5); // maximum number of consumers
        container.setDefaultRequeueRejected(false); // do not requeue rejected messages
        container.setAcknowledgeMode(AcknowledgeMode.AUTO); // acknowledgement mode
        container.setExposeListenerChannel(true);
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {    // consumer tag strategy
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });

        // 4  DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter: convert to a Java object
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage");
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
        jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
        adapter.setMessageConverter(jackson2JsonMessageConverter);
        container.setMessageListener(adapter);
        return container;
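    }

The delegate method now takes an Order:

public void consumeMessage(Order order) {
        log.info("Order object, message body, id: " + order.getId() +
                ", name: " + order.getName() +
                ", content: "+ order.getContent());
    }

The unit test sets the __TypeId__ header to the fully qualified name of the entity class:
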
@Test
    public void testSendJavaMessage() throws Exception {
        Order order = new Order();
        order.setId("1002");
        order.setName("test1002 order message");
        order.setContent("test1002 order description");
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(order);
        log.info("order java: " + json);

        MessageProperties messageProperties = new MessageProperties();
        // the contentType must be set to application/json
        messageProperties.setContentType("application/json");
        // use the fully qualified name of your own entity class here
        messageProperties.getHeaders().put("__TypeId__", "com.hmily.rabbitmqapi.spring.domain.Order");
        // encode explicitly as UTF-8 (java.nio.charset.StandardCharsets) rather than the platform default
        Message message = new Message(json.getBytes(StandardCharsets.UTF_8), messageProperties);

        rabbitTemplate.send("topic001", "spring.order", message);
    }

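Run the unit test.

Error note: if you see an exception containing "If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).", the type mapper has refused to turn the byte stream into a Java object because the target class is not in its trusted packages. A blunt way around it is as follows:

Create a class EnableAllJackson2JavaTypeMapper that extends DefaultJackson2JavaTypeMapper, and configure in it which classes may be converted (for instance by calling setTrustedPackages in its constructor). Here I simply allow everything, which is only reasonable when every producer is trusted.

Then, in the SimpleMessageListenerContainer configuration above, change new DefaultJackson2JavaTypeMapper() to new EnableAllJackson2JavaTypeMapper().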
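
The reasoning behind the error can be shown with a small allow-list check. This is a sketch of the general idea only, written in plain Java; TrustedPackagesSketch is a hypothetical class and does not reproduce the library's actual code.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class TrustedPackagesSketch {
    private final Set<String> trusted;

    TrustedPackagesSketch(String... trustedPackages) {
        this.trusted = new HashSet<>(Arrays.asList(trustedPackages));
    }

    // Returns true when the class named by the type id may be deserialized.
    boolean isTrusted(String className) {
        if (trusted.contains("*")) {
            return true; // trust everything: convenient, but only safe with trusted producers
        }
        int lastDot = className.lastIndexOf('.');
        String packageName = lastDot < 0 ? "" : className.substring(0, lastDot);
        return trusted.contains(packageName);
    }
}
```

Listing only the package that holds your own entities (here com.hmily.rabbitmqapi.spring.domain) is a narrower alternative to the wildcard, at the cost of updating the list when entities move.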

Now extend the code to convert two different Java types.

public class Packaged {
    private String id;

    private String name;

    private String description;

    public Packaged() {
    }

    public Packaged(String id, String name, String description) {
        this.id = id;
        this.name = name;
        this.description = description;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    @Override
    public String toString() {
        return "Packaged{" +
                "id='" + id + '\'' + ", name='" + name + '\'' + ", description='" + description + '\'' +
                '}';
    }
}

Change messageContainer to the following:

@Bean   // the parameter name connectionFactory must match the bean method name defined at the top of this class
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue001(), queue002(), queue003());    // queues to listen on
        container.setConcurrentConsumers(1);    // number of concurrent consumers
        container.setMaxConcurrentConsumers(5); // maximum number of consumers
        container.setDefaultRequeueRejected(false); // do not requeue rejected messages
        container.setAcknowledgeMode(AcknowledgeMode.AUTO); // acknowledgement mode
        container.setExposeListenerChannel(true);
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {    // consumer tag strategy
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });

        // 5 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter: map several aliases to Java classes
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage");
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();

        Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
        idClassMapping.put("order", com.hmily.rabbitmqapi.spring.domain.Order.class);
        idClassMapping.put("packaged", com.hmily.rabbitmqapi.spring.domain.Packaged.class);

        javaTypeMapper.setIdClassMapping(idClassMapping);

        jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
        adapter.setMessageConverter(jackson2JsonMessageConverter);
        container.setMessageListener(adapter);
        return container;
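    }

The delegate gets an additional method that takes a Packaged:

public void consumeMessage(Packaged pack) {
        log.info("Packaged object, message body, id: " + pack.getId() +
                ", name: " + pack.getName() +
                ", description: "+ pack.getDescription());
    }

The unit test sends one message of each type:
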
@Test
    public void testSendMappingMessage() throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        Order order = new Order();
        order.setId("1001");
        order.setName("1001 order message");
        order.setContent("1001 order description");
        String json1 = mapper.writeValueAsString(order);
        log.info("order java: " + json1);

        MessageProperties messageProperties1 = new MessageProperties();
        // the contentType must be set to application/json
        messageProperties1.setContentType("application/json");
        messageProperties1.getHeaders().put("__TypeId__", "order");
        // encode explicitly as UTF-8 (java.nio.charset.StandardCharsets) rather than the platform default
        Message message1 = new Message(json1.getBytes(StandardCharsets.UTF_8), messageProperties1);
        rabbitTemplate.send("topic001", "spring.order", message1);

        Packaged pack = new Packaged();
        pack.setId("1002");
        pack.setName("1002 package message");
        pack.setDescription("1002 package description");
        String json2 = mapper.writeValueAsString(pack);
        log.info("pack java: " + json2);

        MessageProperties messageProperties2 = new MessageProperties();
        // the contentType must be set to application/json
        messageProperties2.setContentType("application/json");
        messageProperties2.getHeaders().put("__TypeId__", "packaged");
        Message message2 = new Message(json2.getBytes(StandardCharsets.UTF_8), messageProperties2);
        rabbitTemplate.send("topic001", "spring.pack", message2);
    }

Note that the __TypeId__ header no longer carries the class path; it carries the alias registered in the mapping above.
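
How an alias in the header can lead to a class is sketched below. This is a simplified illustration under stated assumptions: TypeIdResolverSketch is a hypothetical class, and falling back to Class.forName for unregistered ids is an assumption made here to mirror the full-class-name case from the previous test.

```java
import java.util.HashMap;
import java.util.Map;

final class TypeIdResolverSketch {
    private final Map<String, Class<?>> idClassMapping = new HashMap<>();

    // Registers an alias, like idClassMapping.put("order", Order.class) in the configuration.
    void register(String alias, Class<?> type) {
        idClassMapping.put(alias, type);
    }

    // Resolves the header value: a registered alias wins, otherwise it is treated as a class name.
    Class<?> resolve(String typeId) {
        Class<?> mapped = idClassMapping.get(typeId);
        if (mapped != null) {
            return mapped;
        }
        try {
            return Class.forName(typeId);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unknown type id: " + typeId, e);
        }
    }
}
```

Aliases keep producers and consumers decoupled from each other's package names: only the short id has to be agreed on.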

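Because the JUnit test shuts down as soon as it has finished sending, one message is left unconsumed.

You can confirm this in the management console.
Now start the RabbitmqApiApplication project directly, and the remaining message is consumed.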
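
One common way to keep a test alive until asynchronous consumers have finished is a CountDownLatch. The sketch below simulates the consumers with a thread pool; AwaitConsumersSketch is a hypothetical name, and in a real test the listener method would be the one calling countDown.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class AwaitConsumersSketch {
    // Simulates `expected` asynchronous deliveries and waits until all were handled or the timeout elapses.
    static boolean sendAndAwait(int expected, long timeoutMillis) {
        CountDownLatch consumed = new CountDownLatch(expected);
        ExecutorService consumerThreads = Executors.newFixedThreadPool(2);
        try {
            for (int i = 0; i < expected; i++) {
                // Stand-in for a listener callback running on a container thread.
                consumerThreads.execute(consumed::countDown);
            }
            // Block the test thread so the JVM does not exit before the messages are handled.
            return consumed.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            consumerThreads.shutdownNow();
        }
    }
}
```

The timeout matters: without it, a message that never arrives would hang the test instead of failing it.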

3. Binary converters

First write the conversion logic:

public class ImageMessageConverter implements MessageConverter {

    private static final Logger log = LoggerFactory.getLogger(ImageMessageConverter.class);

	@Override
	public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
		throw new MessageConversionException(" convert error ! ");
	}

	@Override
	public Object fromMessage(Message message) throws MessageConversionException {
		log.info("-----------Image MessageConverter----------");
		
		Object _extName = message.getMessageProperties().getHeaders().get("extName");
		String extName = _extName == null ? "png" : _extName.toString();
		
		byte[] body = message.getBody();
		String fileName = UUID.randomUUID().toString();
		String path = "G:/test/file/new/" + fileName + "." + extName;
		File f = new File(path);
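		try {
			Files.copy(new ByteArrayInputStream(body), f.toPath());
		} catch (IOException e) {
			// surface the failure instead of returning a file that was never written
			throw new MessageConversionException("Failed to write image to " + path, e);
		}
		return f;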
	}

}
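
The converter above builds the file name from a header value supplied by the sender. A slightly more defensive variant of the same two steps (pick an extension, write the bytes) might look like the sketch below. BodyFileWriterSketch and its method names are hypothetical, and the target directory is passed in rather than hard-coded.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

final class BodyFileWriterSketch {
    // Keeps only letters and digits so a header value cannot smuggle in path separators.
    static String safeExtension(Object headerValue, String fallback) {
        if (headerValue == null) {
            return fallback;
        }
        String cleaned = headerValue.toString().replaceAll("[^A-Za-z0-9]", "");
        return cleaned.isEmpty() ? fallback : cleaned;
    }

    // Writes the body to a new, randomly named file inside targetDir and returns its path.
    static Path write(byte[] body, Path targetDir, String extension) {
        try {
            Files.createDirectories(targetDir);
            Path target = targetDir.resolve(UUID.randomUUID() + "." + extension);
            return Files.write(target, body);
        } catch (IOException e) {
            throw new UncheckedIOException("Could not store message body", e);
        }
    }
}
```

Creating the directory up front avoids the failure that occurs when the target folder does not exist yet.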

Receiving the message:

public void consumeMessage(File file) {
        log.info("File method, message body: " + file.getName());
    }

Declare a global converter:

@Bean   // the parameter name connectionFactory must match the bean method name defined at the top of this class
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue001(), queue002(), queue003());    // queues to listen on
        container.setConcurrentConsumers(1);    // number of concurrent consumers
        container.setMaxConcurrentConsumers(5); // maximum number of consumers
        container.setDefaultRequeueRejected(false); // do not requeue rejected messages
        container.setAcknowledgeMode(AcknowledgeMode.AUTO); // acknowledgement mode
        container.setExposeListenerChannel(true);
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {    // consumer tag strategy
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });

        // 6 ext convert
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage");

        // global converter: delegates to a specific converter based on the message's content type
        ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();

        TextMessageConverter textConvert = new TextMessageConverter();
        convert.addDelegate("text", textConvert);
        convert.addDelegate("text/html", textConvert);
        convert.addDelegate("text/xml", textConvert);
        convert.addDelegate("text/plain", textConvert);

        Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
        convert.addDelegate("json", jsonConvert);
        convert.addDelegate("application/json", jsonConvert);

        ImageMessageConverter imageConverter = new ImageMessageConverter();
        convert.addDelegate("image/png", imageConverter);
        convert.addDelegate("image", imageConverter);

        PDFMessageConverter pdfConverter = new PDFMessageConverter();
        convert.addDelegate("application/pdf", pdfConverter);


        adapter.setMessageConverter(convert);
        container.setMessageListener(adapter);

        return container;
    }
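
The idea behind the delegating converter is a lookup keyed by content type. The sketch below shows that lookup in plain Java. ContentTypeDispatchSketch is a hypothetical class, and the exact-match lookup with a fallback converter is a simplifying assumption rather than a description of the library's internals.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class ContentTypeDispatchSketch {
    private final Map<String, Function<byte[], Object>> delegates = new HashMap<>();
    private final Function<byte[], Object> fallback;

    ContentTypeDispatchSketch(Function<byte[], Object> fallback) {
        this.fallback = fallback;
    }

    // Registers the converter to use for one content type, e.g. "application/pdf".
    void addDelegate(String contentType, Function<byte[], Object> converter) {
        delegates.put(contentType, converter);
    }

    // Looks up the converter registered for the content type; unknown or missing types use the fallback.
    Object fromMessage(String contentType, byte[] body) {
        Function<byte[], Object> converter =
                contentType == null ? fallback : delegates.getOrDefault(contentType, fallback);
        return converter.apply(body);
    }
}
```

Because the lookup key is the content type set by the producer, the strings registered on the consumer side have to match what the tests put into MessageProperties.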

Write a unit test that sends an image:

@Test
    public void testSendExtConverterMessage() throws Exception {
        byte[] body = Files.readAllBytes(Paths.get("G:/test/file", "1001.png"));
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("image/png");
        messageProperties.getHeaders().put("extName", "png");
        Message message = new Message(body, messageProperties);
        rabbitTemplate.send("", "image_queue", message);
    }

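The original image sits in the local folder that the test reads from (G:/test/file).

After running the test, the image is written to the target directory.

Now let's try a PDF: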
public class PDFMessageConverter implements MessageConverter {

    private static final Logger log = LoggerFactory.getLogger(PDFMessageConverter.class);

	@Override
	public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
		throw new MessageConversionException(" convert error ! ");
	}

	@Override
	public Object fromMessage(Message message) throws MessageConversionException {
		log.info("-----------PDF MessageConverter----------");
		
		byte[] body = message.getBody();
		String fileName = UUID.randomUUID().toString();
		String path = "G:/test/file/new/" + fileName + ".pdf";
		File f = new File(path);
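		try {
			Files.copy(new ByteArrayInputStream(body), f.toPath());
		} catch (IOException e) {
			// surface the failure instead of returning a file that was never written
			throw new MessageConversionException("Failed to write PDF to " + path, e);
		}
		return f;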
	}

}

The unit test, with the image case commented out:

@Test
    public void testSendExtConverterMessage() throws Exception {
//        byte[] body = Files.readAllBytes(Paths.get("G:/test/file", "1001.png"));
//        MessageProperties messageProperties = new MessageProperties();
//        messageProperties.setContentType("image/png");
//        messageProperties.getHeaders().put("extName", "png");
//        Message message = new Message(body, messageProperties);
//        rabbitTemplate.send("", "image_queue", message);

        byte[] body = Files.readAllBytes(Paths.get("G:/test/file", "AliTech101_RD.pdf"));
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/pdf");
        Message message = new Message(body, messageProperties);
        rabbitTemplate.send("", "pdf_queue", message);
    }

Finally, check that the PDF was processed successfully: a new .pdf file should appear in G:/test/file/new/.