Spring整合RabbitMQ-04-MessageListenerAdapter

RabbitConfig

package com.wyg.rabbitmq.springamqp;

import com.wyg.rabbitmq.springamqp.convert.MyPngMesssageConvert;
import com.wyg.rabbitmq.springamqp.convert.MyPDFMessageConvert;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.amqp.support.converter.ContentTypeDelegatingMessageConverter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.wyg.rabbitmq.springamqp.convert.MyTextMessageConvert;

/**
 * Spring configuration that wires the RabbitMQ infrastructure beans used by this demo: the
 * connection factory, a {@link RabbitAdmin}, a {@link RabbitTemplate} and a
 * {@link SimpleMessageListenerContainer} that dispatches incoming messages through a
 * {@link MessageListenerAdapter}.
 *
 * @author wyg0405@gmail.com
 * @date 2019-11-25 15:11
 * @since JDK1.8
 * @version V1.0
 */
@Configuration
public class RabbitConfig {

    /**
     * Creates the connection factory shared by every other bean in this configuration.
     *
     * <p>The broker address, credentials and virtual host are hard-coded
     * ({@code localhost:5672}, {@code guest}/{@code guest}, {@code /}), which is only suitable
     * for a local demo broker.
     *
     * @return a {@link CachingConnectionFactory} pointing at the local broker
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setAddresses("localhost:5672");
        cachingConnectionFactory.setUsername("guest");
        cachingConnectionFactory.setPassword("guest");
        cachingConnectionFactory.setVirtualHost("/");
        return cachingConnectionFactory;
    }

    /**
     * Registers a {@link RabbitAdmin} bound to the given connection factory.
     *
     * @param connectionFactory the broker connection factory
     * @return the admin bean with auto-startup enabled
     * @author wyg0405@gmail.com
     * @date 2019/11/25 16:35
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);

        // The original author requires autoStartup to be true for the admin to take effect
        // when the context starts.
        // NOTE(review): true is believed to already be RabbitAdmin's default, in which case
        // this call only documents intent - confirm against the Spring AMQP version in use.
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    /**
     * Registers a {@link RabbitTemplate} bound to the given connection factory.
     *
     * <p>This method previously lacked {@code @Bean}, so the template built here was never
     * placed in the application context even though the method is documented as a bean
     * definition and callers autowire a {@code RabbitTemplate}.
     *
     * @param connectionFactory the broker connection factory
     * @return a template with default settings
     * @author wyg0405@gmail.com
     * @date 2019/11/25 16:37
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    /**
     * Registers the listener container that consumes from queue {@code test01} and hands each
     * message to {@link MyMessageListenerDelegate} through a {@link MessageListenerAdapter}.
     *
     * @param connectionFactory the broker connection factory
     * @return the fully configured listener container
     * @author wyg0405@gmail.com
     * @date 2019/11/25 17:16
     */
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);

        // Queues to listen on; addQueueNames accepts several names if more queues are needed.
        container.addQueueNames("test01");
        // Initial number of concurrent consumers.
        container.setConcurrentConsumers(1);
        // Upper bound on the number of concurrent consumers.
        container.setMaxConcurrentConsumers(5);
        // Rejected messages are not put back on the queue.
        container.setDefaultRequeueRejected(false);
        // Acknowledgement is handled automatically by the container.
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        // Consumer tag = queue name + "_" + creation timestamp in milliseconds.
        container.setConsumerTagStrategy(queue -> queue + "_" + System.currentTimeMillis());

        // Dispatch through an adapter so the handler can be a plain object.
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MyMessageListenerDelegate());
        // Handler method to invoke on the delegate; when not set, the adapter falls back to
        // "handleMessage".
        adapter.setDefaultListenerMethod("consumeMsg");
        // Custom converter applied to the message before the delegate method is invoked.
        adapter.setMessageConverter(new MyTextMessageConvert());

        container.setMessageListener(adapter);
        return container;
    }

}

MyMessageListenerDelegate,MessageListenerAdapter 委託的自定義消息處理類

package com.wyg.rabbitmq.springamqp;

import java.io.File;
import java.util.Map;

import com.wyg.rabbitmq.springamqp.convert.Order;
import com.wyg.rabbitmq.springamqp.convert.User;

/**
 * Plain delegate whose public methods print the message payload they receive to standard
 * output. It has no dependency on any messaging API; the overloads accept either the raw
 * payload bytes or an already-converted {@code String}.
 *
 * <p>Raw byte payloads are decoded as UTF-8 explicitly instead of relying on the platform
 * default charset, so the printed text does not depend on the JVM the consumer runs on.
 *
 * @author wyg0405@gmail.com
 * @date 2019-11-29 14:23
 * @since JDK1.8
 * @version V1.0
 */
public class MyMessageListenerDelegate {

    /**
     * Default handler: prints a raw byte payload.
     *
     * @param body the raw message payload, decoded as UTF-8; must not be {@code null}
     */
    public void handleMessage(byte[] body) {
        System.out.println("默認處理方法,message:" + decodeUtf8(body));
    }

    /**
     * Custom handler for raw byte payloads.
     *
     * @param msgBody the raw message payload, decoded as UTF-8; must not be {@code null}
     */
    public void consumeMsg(byte[] msgBody) {
        System.out.println("自定義處理方法,message:" + decodeUtf8(msgBody));
    }

    /**
     * Custom handler for payloads that have already been converted to a {@code String}.
     *
     * @param msgBody the message text
     */
    public void consumeMsg(String msgBody) {
        // The value is already a String; no defensive copy is needed before printing.
        System.out.println("自定義處理String消息方法,message:" + msgBody);
    }

    /** Decodes a payload with a fixed charset so output is identical on every platform. */
    private static String decodeUtf8(byte[] payload) {
        return new String(payload, java.nio.charset.StandardCharsets.UTF_8);
    }

}

RabbitConfigTest 單元測試

package com.wyg.rabbitmq.springamqp;

import java.io.*;
import java.lang.reflect.Field;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.wyg.rabbitmq.springamqp.convert.Order;
import com.wyg.rabbitmq.springamqp.convert.User;

/**
 * Integration test that publishes text messages through the application's
 * {@link RabbitTemplate}. It needs a running Spring context and a reachable broker, and it
 * makes no assertions: the consumer side is observed through whatever the listener prints.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitConfigTest {
    @Autowired
    RabbitAdmin rabbitAdmin;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private SimpleMessageListenerContainer simpleMessageListenerContainer;

    /**
     * Publishes three text messages to exchange {@code springdemo.direct} with routing key
     * {@code orderRoutingKey}.
     *
     * <p>Per the original author's note, queues {@code test01}, {@code test02} and
     * {@code test03} are assumed to be bound to that exchange with that routing key already.
     * Nothing in this test declares them - TODO confirm the bindings exist on the broker.
     */
    @Test
    public void sendTextMsg() {
        for (int i = 0; i < 3; i++) {
            MessageProperties messageProperties = new MessageProperties();
            // NOTE(review): "application/text" is presumably the content type the consumer's
            // custom converter keys on - verify before changing it.
            messageProperties.setContentType("application/text");
            String body = "第" + i + "條消息";
            // Encode explicitly as UTF-8: the payload is non-ASCII, and the platform default
            // charset would make the bytes on the wire depend on the publishing JVM.
            byte[] payload = body.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            Message msg = new Message(payload, messageProperties);
            rabbitTemplate.convertAndSend("springdemo.direct", "orderRoutingKey", msg);
        }
    }
}
相關文章
相關標籤/搜索