Spring整合RabbitMQ-05-MessageConvert

消息轉換器java

RabbitConfig

package com.wyg.rabbitmq.springamqp;

import com.wyg.rabbitmq.springamqp.convert.MyPngMesssageConvert;
import com.wyg.rabbitmq.springamqp.convert.MyPDFMessageConvert;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.amqp.support.converter.ContentTypeDelegatingMessageConverter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.wyg.rabbitmq.springamqp.convert.MyTextMessageConvert;

/**
 * RabbitConfig
 * 
 * @author wyg0405@gmail.com
 * @date 2019-11-25 15:11
 * @since JDK1.8
 * @version V1.0
 */

@Configuration
public class RabbitConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setAddresses("localhost:5672");
        cachingConnectionFactory.setUsername("guest");
        cachingConnectionFactory.setPassword("guest");
        cachingConnectionFactory.setVirtualHost("/");
        return cachingConnectionFactory;
    }

    /**
     * RabbitAdmin注入容器
     * 
     * @param connectionFactory
     * @return
     * @throws @author
     *             wyg0405@gmail.com
     * @date 2019/11/25 16:35
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);

        /*
         *  autoStartup 必需要設爲 true ,不然Spring容器不會加載RabbitAdmin類
         */
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    /**
     * RabbitTemplate注入
     * 
     * @param connectionFactory
     * @return
     * @throws @author
     *             wyg0405@gmail.com
     * @date 2019/11/25 16:37
     */
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }

    /**
     * SimpleMessageListenerContainer注入
     * 
     * @param connectionFactory
     * @return
     * @throws @author
     *             wyg0405@gmail.com
     * @date 2019/11/25 17:16
     */
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        // 監聽多個queue
        container.addQueueNames("test01");
        //container.addQueueNames("test01", "test02", "test03");
        // 設置當前消費者數量
        container.setConcurrentConsumers(1);
        // 設置最大的消費者數量
        container.setMaxConcurrentConsumers(5);
        // 設置不要重回隊列
        container.setDefaultRequeueRejected(false);
        // 設置自動簽收
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        // 設置消費端tag策略
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + System.currentTimeMillis();
            }
        });


        // 方式二,使用適配器
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MyMessageListenerDelegate());
        // 自定處理消息方法,不設置默認爲handleMessage
        adapter.setDefaultListenerMethod("consumeMsg");
        // 自定義消息轉換器
        // adapter.setMessageConverter(new MyTextMessageConvert());

        // 1.自定義消息轉換器 Json,實際上json對應的是Map類型參數
        /* Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter();
        adapter.setMessageConverter(jsonMessageConverter);*/

        // 2.Java對象
        /*Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter();
        DefaultJackson2JavaTypeMapper defaultJackson2JavaTypeMapper = new DefaultJackson2JavaTypeMapper();
        defaultJackson2JavaTypeMapper.setTrustedPackages("*");
        jsonMessageConverter.setJavaTypeMapper(defaultJackson2JavaTypeMapper);
        adapter.setMessageConverter(jsonMessageConverter);*/

        // 3.Java對象多映射關係
        /*Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter();
        DefaultJackson2JavaTypeMapper defaultJackson2JavaTypeMapper = new DefaultJackson2JavaTypeMapper();
        Map<String, Class<?>> map = new HashMap<>();
        map.put("order", Order.class);
        map.put("user", User.class);
        defaultJackson2JavaTypeMapper.setTrustedPackages("*");
        defaultJackson2JavaTypeMapper.setIdClassMapping(map);
        jsonMessageConverter.setJavaTypeMapper(defaultJackson2JavaTypeMapper);
        adapter.setMessageConverter(jsonMessageConverter);*/

        // 4.全局轉換器
        ContentTypeDelegatingMessageConverter converter = new ContentTypeDelegatingMessageConverter();
        MyTextMessageConvert textMessageConvert = new MyTextMessageConvert();
        converter.addDelegate("text", textMessageConvert);
        converter.addDelegate("appliction/text", textMessageConvert);
        converter.addDelegate("text/plain", textMessageConvert);

        Jackson2JsonMessageConverter jsonMessageConverter = new Jackson2JsonMessageConverter("*");
        converter.addDelegate("json", jsonMessageConverter);
        converter.addDelegate("application/json", jsonMessageConverter);

        // png
        converter.addDelegate("png", new MyPngMesssageConvert());
        converter.addDelegate("image/png", new MyPngMesssageConvert());

        // pdf
        converter.addDelegate("pdf", new MyPDFMessageConvert());
        adapter.setMessageConverter(converter);

        container.setMessageListener(adapter);
        return container;
    }

}

MyMessageListenerDelegate

package com.wyg.rabbitmq.springamqp;

import java.io.File;
import java.util.Map;

import com.wyg.rabbitmq.springamqp.convert.Order;
import com.wyg.rabbitmq.springamqp.convert.User;

/**
 *
 * @author wyg0405@gmail.com
 * @date 2019-11-29 14:23
 * @since JDK1.8
 * @version V1.0
 */

public class MyMessageListenerDelegate {

    // 默認方法
    public void handleMessage(byte[] body) {
        System.out.println("默認處理方法,message:" + new String(body));
    }

    // 自定義處理方法
    public void consumeMsg(byte[] msgBody) {
        System.out.println("自定義處理方法,message:" + new String(msgBody));
    }

    // 自定義處理String類型消息方法
    public void consumeMsg(String msgBody) {
        System.out.println("自定義處理String消息方法,message:" + new String(msgBody));
    }

    public void method01(String msgBody) {
        System.out.println("method01處理String消息方法,message:" + new String(msgBody));
    }

    public void method02(String msgBody) {
        System.out.println("method02處理String消息方法,message:" + new String(msgBody));
    }

    public void method03(String msgBody) {
        System.out.println("method03處理String消息方法,message:" + new String(msgBody));
    }

    // json 對應map類型參數
    public void consumeMsg(Map msgBody) {
        System.out.println("自定義json處理方法,message:" + msgBody.toString());
    }

    // Java對象 對應Order類型參數
    public void consumeMsg(Order msgBody) {
        System.out.println("自定義Order對象處理方法,message:" + msgBody.toString());
    }

    // Java對象 對應User類型參數
    public void consumeMsg(User user) {
        System.out.println("自定義User對象處理方法,message:" + user.toString());
    }

    // File類型的參數
    public void consumeMsg(File file) {
        System.out.println("自定義File對象處理方法,message:" + file.getPath());
    }
}

Order

package com.wyg.rabbitmq.springamqp.convert;

import java.io.Serializable;

/**
 *
 * @author wyg0405@gmail.com
 * @date 2019-12-01 15:47
 * @since JDK1.8
 * @version V1.0
 */

public class Order implements Serializable {

    private static final long serialVersionUID = -4975357142857575433L;
    private String id;
    private String content;
    private double price;

    public Order() {}

    public Order(String id, String content, double price) {
        this.id = id;
        this.content = content;
        this.price = price;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public double getPrice() {
        return price;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("{");
        sb.append("\"id\":\"").append(id).append('\"');
        sb.append(",\"content\":\"").append(content).append('\"');
        sb.append(",\"price\":").append(price);
        sb.append('}');
        return sb.toString();
    }
}

User

package com.wyg.rabbitmq.springamqp.convert;

import java.io.Serializable;

/**
 *
 * @author wyg0405@gmail.com
 * @date 2019-12-01 17:20
 * @since JDK1.8
 * @version V1.0
 */

public class User implements Serializable {

    private static final long serialVersionUID = 2959945432292661959L;
    private String id;
    private String name;
    private int age;

    public User() {}

    public User(String id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("{");
        sb.append("\"id\":\"").append(id).append('\"');
        sb.append(",\"name\":\"").append(name).append('\"');
        sb.append(",\"age\":").append(age);
        sb.append('}');
        return sb.toString();
    }
}

MyTextMessageConvert

自定義文本消息轉換器spring

package com.wyg.rabbitmq.springamqp.convert;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;

/**
 * 自定義消息轉換器
 * 
 * @author wyg0405@gmail.com
 * @date 2019-12-01 15:11
 * @since JDK1.8
 * @version V1.0
 */

public class MyTextMessageConvert implements MessageConverter {

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        return new Message(object.toString().getBytes(), messageProperties);
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        String contentType = message.getMessageProperties().getContentType();
        if (null != contentType && contentType.contains("text")) {
            // 轉換爲String
            return new String(message.getBody());
        }
        return message.getBody();
    }

}

MyPngMesssageConvert

自定義png轉換器json

package com.wyg.rabbitmq.springamqp.convert;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

/**
 *
 * @author wyg0405@gmail.com
 * @date 2019-12-01 19:32
 * @since JDK1.8
 * @version V1.0
 */

public class MyPngMesssageConvert implements MessageConverter {

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        return null;
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        System.out.println("---------PNGMessageConvert---------");
        String contentType = message.getMessageProperties().getContentType();
        String extName = (String)message.getMessageProperties().getHeaders().get("extName");
        String filename = System.currentTimeMillis() + extName;
          // 將文件流讀到本地
          String filePath="E:\\"+filename;
          File file= new File(filePath);
          try {
            Files.copy(new ByteArrayInputStream(message.getBody()),file.toPath());
          } catch (IOException e) {
            e.printStackTrace();
          }
          return file;

    }
}

MyPDFMessageConvert

自定義pdf轉換器app

package com.wyg.rabbitmq.springamqp.convert;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;

/**
 *
 * @author wyg0405@gmail.com
 * @date 2019-12-01 19:34
 * @since JDK1.8
 * @version V1.0
 */

public class MyPDFMessageConvert implements MessageConverter {

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        return null;
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        System.out.println("---------PDFMessageConvert---------");
        String contentType = message.getMessageProperties().getContentType();
        String extName = (String)message.getMessageProperties().getHeaders().get("extName");
        String filename = System.currentTimeMillis() + extName;
        // 將文件流讀到本地
        String filePath = "E:\\" + filename;
        File file = new File(filePath);
        try {
            Files.copy(new ByteArrayInputStream(message.getBody()), file.toPath());
        } catch (IOException e) {
            e.printStackTrace();
        }
        return file;

    }

}

單元測試ide

package com.wyg.rabbitmq.springamqp;

import java.io.*;
import java.lang.reflect.Field;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.wyg.rabbitmq.springamqp.convert.Order;
import com.wyg.rabbitmq.springamqp.convert.User;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitConfigTest {
    @Autowired
    RabbitAdmin rabbitAdmin;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private SimpleMessageListenerContainer simpleMessageListenerContainer;
   
    @Test
    public void testSimpleMessageListenerContainerSendMsg() {
        // 分別向 隊列 "test01", "test02", "test03" 發消息,"test01", "test02",
        // "test03"與springdemo.direct已經綁定,routingKey都爲orderRoutingKey
        for (int i = 0; i < 3; i++) {
            rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", ("第" + i + "條消息").getBytes());

        }
    }

    @Test
    public void sendTextMsg() {
        // 分別向 隊列 "test01", "test02", "test03" 發消息,"test01", "test02",
        // "test03"與springdemo.direct已經綁定,routingKey都爲orderRoutingKey
        for (int i = 0; i < 3; i++) {
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType("application/text");
            String body = "第" + i + "條消息";
            Message msg = new Message(body.getBytes(), messageProperties);
            rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", msg);

        }
    }

    @Test
    public void sendJsonMsg() throws JsonProcessingException {
        // 分別向 隊列 "test01", "test02", "test03" 發消息,"test01", "test02",
        // "test03"與springdemo.direct已經綁定,routingKey都爲orderRoutingKey
        Order order = new Order("1", "吃的喝的", 19);
        ObjectMapper mapper = new JsonMapper();
        String json = mapper.writeValueAsString(order);
        System.out.println("order to json:" + json);

        MessageProperties messageProperties = new MessageProperties();
        // 這裏 ContentType 必定要寫成 "application/json"
        messageProperties.setContentType("application/json");
        Message msg = new Message(json.getBytes(), messageProperties);
        rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", msg);

    }

    @Test
    public void sendJavaTypeMsg() throws JsonProcessingException {
        // 分別向 隊列 "test01", "test02", "test03" 發消息,"test01", "test02",
        // "test03"與springdemo.direct已經綁定,routingKey都爲orderRoutingKey
        Order order = new Order("1", "吃的喝的", 19);
        ObjectMapper mapper = new JsonMapper();
        String order2Json = mapper.writeValueAsString(order);
        System.out.println("order to json:" + order2Json);

        MessageProperties messageProperties = new MessageProperties();
        // 這裏 ContentType 必定要寫成 "application/json"
        messageProperties.setContentType("application/json");
        // __TypeId__ 爲固定形式
        messageProperties.getHeaders().put("__TypeId__", "com.wyg.rabbitmq.springamqp.convert.Order");
        Message msg = new Message(order2Json.getBytes(), messageProperties);
        rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", msg);

    }

    @Test
    public void sendJavaTypeMappingMsg() throws JsonProcessingException {
        // 分別向 隊列 "test01", "test02", "test03" 發消息,"test01", "test02",
        // "test03"與springdemo.direct已經綁定,routingKey都爲orderRoutingKey
        // 發送Order類型
        Order order = new Order("1", "吃的喝的", 19);
        ObjectMapper mapper = new JsonMapper();
        String order2Json = mapper.writeValueAsString(order);
        System.out.println("order to json:" + order2Json);
        MessageProperties messageProperties = new MessageProperties();
        // 這裏 ContentType 必定要寫成 "application/json"
        messageProperties.setContentType("application/json");
        // __TypeId__ 爲固定形式, order標籤
        messageProperties.getHeaders().put("__TypeId__", "order");
        Message msg = new Message(order2Json.getBytes(), messageProperties);
        rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", msg);

        // 發送User類型
        User user = new User("111", "jack", 20);
        String user2Json = mapper.writeValueAsString(user);
        System.out.println("User to Json:" + user2Json);

        MessageProperties messageProperties2 = new MessageProperties();
        // __TypeId__ 爲固定形式, user標籤
        messageProperties2.getHeaders().put("__TypeId__", "user");
        Message usermsg = new Message(user2Json.getBytes(), messageProperties2);
        rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", usermsg);
    }

    @Test
    public void sendPNGMsg() throws IOException {
        // 分別向 隊列 "test01", "test02", "test03" 發消息,"test01", "test02",
        // "test03"與springdemo.direct已經綁定,routingKey都爲orderRoutingKey
        String filePath="C:\\Users\\wyg04\\Desktop\\image\\green.png";
        byte[] allBytes = Files.readAllBytes(Paths.get(filePath));


        MessageProperties messageProperties = new MessageProperties();
        // 這裏 ContentType 必定要寫成 "image/png"或"png"
        messageProperties.setContentType("png");
        // 拓展名
        messageProperties.getHeaders().put("extName", ".png");
        Message msg = new Message(allBytes, messageProperties);
        rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", msg);

    }

    @Test
    public void sendPDFMsg() throws IOException {
        // 分別向 隊列 "test01", "test02", "test03" 發消息,"test01", "test02",
        // "test03"與springdemo.direct已經綁定,routingKey都爲orderRoutingKey
        String filePath="C:\\Users\\wyg04\\Desktop\\深度工做.pdf";
        byte[] allBytes = Files.readAllBytes(Paths.get(filePath));


        MessageProperties messageProperties = new MessageProperties();
        // 這裏 ContentType 必定要寫成 "pdf"
        messageProperties.setContentType("pdf");
        // 拓展名
        messageProperties.getHeaders().put("extName", ".pdf");
        Message msg = new Message(allBytes, messageProperties);
        rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", msg);

    }

}
相關文章
相關標籤/搜索