RabbitMQ封裝實戰

先說下背景:上週開始給項目添加曾經沒有過的消息中間件。雖說,一路到頭很是容易,直接google,萬事不愁~但是生活遠不只是眼前的「苟且」。首先是想使用其餘項目使用過的一套對mq封裝的框架,融合進來。雖然折騰了上週六週日兩天,總算吧老框架融進項目中了,但是週一來公司和大數據哥們兒一聯調發現,收不到數據!因此沒辦法,當場使用原生那一套擼了個版本出來~但是,但是,但是,俗話說得好:生命在於折騰!在上週末融合老框架的時候,我把源碼讀了遍,發現了不少很好的封裝思想,Ok,這週末總算閒了下來,我就運用這個思想,封裝一個輕量級的唄,說幹就幹!java

主要思想

說到封裝,我想,應該主要是要儘量減少用戶使用的複雜度,儘可能少的進行配置,書寫,甚至能儘可能少的引入第三發或是原生類庫。因此在這種想法之下,這套框架的精髓主要在如下幾點:git

  • 使用註解,減小用戶配置
  • 將不一樣的生產者消費者的初始化方式統一
  • 初次註冊生產者或者消費者的時候,進行隊列的自動註冊
  • 再統一的初始化方式中,使用動態代理的方式,代理到具體的生產者或是消費者的發送接收方法

在這種模式下,咱們不用過多的配置,直接創建一個接口,接口上面使用註解聲明隊列的名稱,而後使用同一的Bean進行初始化,就齊活了!github

統一初始化Bean的實現

不說啥,直接上代碼:spring

public class RabbitMQProducerFactoryBean<T> extends RabbitMQProducerInterceptor implements FactoryBean<T> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private Class<?> serviceInterface;

    @Autowired
    private ConnectionFactory rabbitConnectionFactory;

    @Value("${mq.queue.durable}")
    private String durable;

    @Value("${mq.queue.exclusive}")
    private String exclusive;

    @Value("${mq.queue.autoDelete}")
    private String autoDelete;

    @SuppressWarnings("unchecked")

    /**
    這個方法很特殊,繼承自FactoryBean,就是說管理權歸屬IoC容器。每次註冊一個隊列的時候,而且注入到具體的service中使用的時候,就會調用這個getObject方法。因此,對於使用本類初始化的bean,其類型並不是本類,而是本類的屬性serviceInterface類型,由於最終getObject的結果是返回了一個動態代理,代理到了serviceInterface。
    **/
    @Override
    public T getObject() throws Exception {

        //初始化
        if (getQueueName() != null) {
            logger.info("指定的目標列隊名[{}],覆蓋接口定義。", getQueueName());
        } else {
            RPCQueueName name = serviceInterface.getAnnotation(RPCQueueName.class);
            if (name == null)
                throw new IllegalArgumentException("接口" + serviceInterface.getCanonicalName() + "沒有指定@RPCQueueName");
            setQueueName(name.value());
        }
        //建立隊列
        declareQueue();
        logger.info("創建MQ客戶端代理接口[{}],目標隊列[{}]。", serviceInterface.getCanonicalName(), getQueueName());

        return (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class<?>[]{serviceInterface}, this);//動態代理到目標接口
    }

    private void declareQueue() {
        Connection connection = rabbitConnectionFactory.createConnection();
        Channel channel = connection.createChannel(true);
        try {
            channel.queueDeclare(getQueueName(), Boolean.valueOf(durable), Boolean.valueOf(exclusive)
                    , Boolean.valueOf(autoDelete), null);
            logger.info("註冊隊列成功!");
        } catch (IOException e) {
            logger.warn("隊列註冊失敗", e);
        }
    }
......

}


public class RabbitMQProducerInterceptor implements InvocationHandler {



    private Logger logger = LoggerFactory.getLogger(getClass());


    private String queueName;

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Object sendObj;
        Class<?>[] parameterTypes = method.getParameterTypes();
        String methodName = method.getName();
        boolean isSendOneJson = Objects.nonNull(args) && args.length == 1 && (args[0] instanceof String);
        if (isSendOneJson) {
            sendObj = args[0];
            logger.info("發送單一json字符串消息:{}", (String) sendObj);
        } else {
            sendObj = new RemoteInvocation(methodName, parameterTypes, args);
            logger.info("發送封裝消息體:{}", JSONSerializeUtil.jsonSerializerNoType(sendObj));
        }


        logger.info("發送異步消息到[{}],方法名爲[{}]", queueName, method.getName());
        //異步方式使用,同時要告知服務端不要發送響應
        amqpTemplate.convertAndSend(queueName, sendObj);
        return null;

    }

    ......
}

下面是核心的配置文件json

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<beans default-lazy-init="false"
    xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:p="http://www.springframework.org/schema/p" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd
        http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.1.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">

    <rabbit:connection-factory id="rabbitConnectionFactory"
        host="${mq.host}" port="${mq.port}" virtual-host="${mq.vhost}"
        username="${mq.username}" password="${mq.password}" />

    <!-- 供自動建立隊列 -->
    <rabbit:admin connection-factory="rabbitConnectionFactory" />

    <rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory"/>

    <!-- 建立生產者 -->
    <bean id="sendMsg" class="com.example.demo.RabbitMQProducerFactoryBean">
        <property name="serviceInterface" value="com.example.demo.ISendMsg" />
    </bean>


</beans>

說明:每次要使用mq,直接導入這個基本配置,和基礎jar包便可。對於配置文件中的生產者聲明,已經直接簡化到三行,這一部分能夠單首創建一個相似於producer-config.xml專門的配置文件。springboot

附屬類

這裏主要就是涉及一個註解類:app

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RPCQueueName {

    String value();
}

說明:主要用於隊列名稱的聲明。能夠拓展的再創建其餘的註解類,並在RabbitMQProducerFactoryBean中進行具體的邏輯實現。對於將來功能添加,起到了很是好的解耦效果。框架

具體的接口:異步

@RPCQueueName("test.demo.ISendMsg")
public interface ISendMsg {

    void sendMsg(String msg);
}

說明:這樣,就聲明瞭個隊列名叫test.demo.ISendMsg的生產者,每次講IsendMsg注入到要發送消息的Service裏面,直接調用sendMsg便可向註解聲明的隊列發送消息了。ide

恩,開源

寫了個springboot的小demo:
github地址

接下來我會更新消費者的封裝,今天先放一放,出去動動。。哈哈

相關文章
相關標籤/搜索