先說下背景:上週開始給項目添加曾經沒有過的消息中間件。雖說,一路到頭很是容易,直接google,萬事不愁~但是生活遠不只是眼前的「苟且」。首先是想使用其餘項目使用過的一套對mq封裝的框架,融合進來。雖然折騰了上週六週日兩天,總算吧老框架融進項目中了,但是週一來公司和大數據哥們兒一聯調發現,收不到數據!因此沒辦法,當場使用原生那一套擼了個版本出來~但是,但是,但是,俗話說得好:生命在於折騰!在上週末融合老框架的時候,我把源碼讀了遍,發現了不少很好的封裝思想,Ok,這週末總算閒了下來,我就運用這個思想,封裝一個輕量級的唄,說幹就幹!java
說到封裝,我想,應該主要是要儘量減少用戶使用的複雜度,儘可能少的進行配置,書寫,甚至能儘可能少的引入第三發或是原生類庫。因此在這種想法之下,這套框架的精髓主要在如下幾點:git
在這種模式下,咱們不用過多的配置,直接創建一個接口,接口上面使用註解聲明隊列的名稱,而後使用同一的Bean進行初始化,就齊活了!github
不說啥,直接上代碼:spring
public class RabbitMQProducerFactoryBean<T> extends RabbitMQProducerInterceptor implements FactoryBean<T> { private Logger logger = LoggerFactory.getLogger(getClass()); private Class<?> serviceInterface; @Autowired private ConnectionFactory rabbitConnectionFactory; @Value("${mq.queue.durable}") private String durable; @Value("${mq.queue.exclusive}") private String exclusive; @Value("${mq.queue.autoDelete}") private String autoDelete; @SuppressWarnings("unchecked") /** 這個方法很特殊,繼承自FactoryBean,就是說管理權歸屬IoC容器。每次註冊一個隊列的時候,而且注入到具體的service中使用的時候,就會調用這個getObject方法。因此,對於使用本類初始化的bean,其類型並不是本類,而是本類的屬性serviceInterface類型,由於最終getObject的結果是返回了一個動態代理,代理到了serviceInterface。 **/ @Override public T getObject() throws Exception { //初始化 if (getQueueName() != null) { logger.info("指定的目標列隊名[{}],覆蓋接口定義。", getQueueName()); } else { RPCQueueName name = serviceInterface.getAnnotation(RPCQueueName.class); if (name == null) throw new IllegalArgumentException("接口" + serviceInterface.getCanonicalName() + "沒有指定@RPCQueueName"); setQueueName(name.value()); } //建立隊列 declareQueue(); logger.info("創建MQ客戶端代理接口[{}],目標隊列[{}]。", serviceInterface.getCanonicalName(), getQueueName()); return (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class<?>[]{serviceInterface}, this);//動態代理到目標接口 } private void declareQueue() { Connection connection = rabbitConnectionFactory.createConnection(); Channel channel = connection.createChannel(true); try { channel.queueDeclare(getQueueName(), Boolean.valueOf(durable), Boolean.valueOf(exclusive) , Boolean.valueOf(autoDelete), null); logger.info("註冊隊列成功!"); } catch (IOException e) { logger.warn("隊列註冊失敗", e); } } ...... } public class RabbitMQProducerInterceptor implements InvocationHandler { private Logger logger = LoggerFactory.getLogger(getClass()); private String queueName; @Autowired private AmqpTemplate amqpTemplate; @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Object sendObj; Class<?>[] parameterTypes = method.getParameterTypes(); String methodName = method.getName(); boolean isSendOneJson = Objects.nonNull(args) && args.length == 1 && (args[0] instanceof String); if (isSendOneJson) { sendObj = args[0]; logger.info("發送單一json字符串消息:{}", (String) sendObj); } else { sendObj = new RemoteInvocation(methodName, parameterTypes, args); logger.info("發送封裝消息體:{}", JSONSerializeUtil.jsonSerializerNoType(sendObj)); } logger.info("發送異步消息到[{}],方法名爲[{}]", queueName, method.getName()); //異步方式使用,同時要告知服務端不要發送響應 amqpTemplate.convertAndSend(queueName, sendObj); return null; } ...... }
下面是核心的配置文件json
<?xml version="1.0" encoding="UTF-8" standalone="no"?> <beans default-lazy-init="false" xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.1.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <rabbit:connection-factory id="rabbitConnectionFactory" host="${mq.host}" port="${mq.port}" virtual-host="${mq.vhost}" username="${mq.username}" password="${mq.password}" /> <!-- 供自動建立隊列 --> <rabbit:admin connection-factory="rabbitConnectionFactory" /> <rabbit:template id="amqpTemplate" connection-factory="rabbitConnectionFactory"/> <!-- 建立生產者 --> <bean id="sendMsg" class="com.example.demo.RabbitMQProducerFactoryBean"> <property name="serviceInterface" value="com.example.demo.ISendMsg" /> </bean> </beans>
說明:每次要使用mq,直接導入這個基本配置,和基礎jar包便可。對於配置文件中的生產者聲明,已經直接簡化到三行,這一部分能夠單首創建一個相似於producer-config.xml專門的配置文件。springboot
這裏主要就是涉及一個註解類:app
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) public @interface RPCQueueName { String value(); }
說明:主要用於隊列名稱的聲明。能夠拓展的再創建其餘的註解類,並在RabbitMQProducerFactoryBean中進行具體的邏輯實現。對於將來功能添加,起到了很是好的解耦效果。框架
具體的接口:異步
@RPCQueueName("test.demo.ISendMsg") public interface ISendMsg { void sendMsg(String msg); }
說明:這樣,就聲明瞭個隊列名叫test.demo.ISendMsg的生產者,每次講IsendMsg注入到要發送消息的Service裏面,直接調用sendMsg便可向註解聲明的隊列發送消息了。ide
寫了個springboot的小demo:
github地址
接下來我會更新消費者的封裝,今天先放一放,出去動動。。哈哈