最近開始接觸一些關於MQ的開發,使用的是阿里雲提供的RocketMQ工具;RocketMQ採用二級分類梯度來過濾消息;java
一級是在topic,二級是tag標籤,咱們的代碼目的:主要目的消費從阿里雲隊列上拉去下來的消息;spring
/** * * @author linweifeng * @since jdk 1.6 * @history 2016-11-07 created */ public class MqListener implements MessageListener { // 日誌工具類 private final static Logger log = Logger.getLogger(MqListener.class); public Action consume(Message message, ConsumeContext context) { try { if(MQConstants.XXXTag1.equals(message.getTag())){ // TODO // do something }else if(MQConstants.XXXTag2.equals(message.getTag())){ // TODO // do something }else if(MQConstants.XXXTag3.equals(message.getTag())){ // TODO // do something }.... else { // TODO // do something } return Action.CommitMessage; }catch (Exception e) { log.error("消息消費失敗:"+e.getMessage()); return Action.ReconsumeLater; } } }
看到這樣的代碼,你們都會考慮到說:if -else 這麼多,代碼越寫會越臃腫,並且閱讀性不好。if-else if是會從頭至尾都進行對比,直到找到結果,那麼頗有可能有時候會一直刷到最後一個,性能也會不好。並且每次我須要新增長一個tag對應的handler的時候,我都須要修改這個類,很明顯就跟開閉原則衝突;可是,它也有它的優勢,好比代碼複雜度很低,基本上一目瞭然;express
做爲初出菜鳥,看到這麼多的if else ,首先我會考慮將至轉爲switch的方式,爲此我須要把MqConstants這個靜態常量類改爲enum,由於只有jdk 1.7以上的版本switch是能夠支持字符串類型;代碼以下:編程
/** * * @author linweifeng * @since jdk 1.6 * @history 2016-11-07 created */ public class MqListener implements MessageListener { // 日誌工具類 private final static Logger log = Logger.getLogger(MqListener.class); public Action consume(Message message, ConsumeContext context) { try { switch(meesage.geTag()){ case MqEnum.XXXTag1: // TODO //do something break; case MqEnum.XXXTag2: // TODO //do something break; case MqEnum.XXXTag3: // TODO //do something break; case MqEnum.XXXTag4: // TODO //do something break; defalut: // TODO //exception } return Action.CommitMessage; }catch (Exception e) { log.error("消息消費失敗:"+e.getMessage()); return Action.ReconsumeLater; } } }
這樣子作比以前是好了一些,可是仍是有不少的不足,好比這些處理類,我是否 能夠將至抽取了,java編程,更推薦使用接口編程,因此我是否能夠抽取出這些類的共有部分去作一些事呢?api
/** * * @author linweifeng * @since jdk 1.6 * @history 2016-11-07 created */ public interface IMQHandler { /** * 做爲接口,我不關注實現,具體業務的實現邏輯,我交給實現接口的類去完成 * @param messageBody */ public void handler(String messageBody); }
抽取出這個接口以後,我之後全部須要處理的類,都繼承這個接口,好比我有一個登陸的處理類,它要作的可能 就是說,我登陸以後,我須要記錄登陸日誌,或者其餘一些我並不須要便可執行的業務;框架
/** * * @author linweifeng * @since jdk 1.6 * @history 2016-11-07 created */ public class LoginHandler implements IMQHandler{ @Override public void handler(String messageBody) { // TODO Auto-generated method stub //do something log.insert(); } }
這麼作有什麼好處?彷佛有些累贅,存粹是爲樂方便作擴展,只不過這個擴展是將來必須的,可能下一秒 你就須要作的擴展;千萬不要堅信去作不少將來你不必定會擴展,或者維護的東西,而後去過多的抽取接口;那樣存粹給本身找麻煩;ide
假如我有一個建立者工具類:負責建立全部的這些實現的子類對象,以下:函數
import java.util.HashMap; import java.util.Map; /** * * @author linweifeng * @since jdk 1.6 * @history 2016-11-07 created */ public class MQHandlerBuilderUtil { /** * 定義map,去作一個映射,由於我得最終目的是:我消費者的監聽器根據不一樣的標籤是調用不一樣的 * 實現 */ Map<String ,IMQHandler> map ; private static MQHandlerBuilderUtil mqHandlerBuilder = null; private MQHandlerBuilderUtil(){ map = new HashMap<String, IMQHandler>(); //初始化後,我將 全部的實現丟在這裏 map.put(MQConstants.xxxTag1, new LoginHandler()); //map.put(MQConstants.xxxTag1, new xxxxxHandler()); // .... } /** * 獲取map對象 * @return */ public Map<String, IMQHandler> getMap() { return map; } /** * 搞個單例模式,獲取到個人建立者的對象,這裏最好是搞個鎖,鎖住 * @return */ public static MQHandlerBuilderUtil getInstance(){ if(mqHandlerBuilder == null){ mqHandlerBuilder = new MQHandlerBuilderUtil(); } return mqHandlerBuilder; } /** * 獲取handler的實現 * @param tag * @return */ public static IMQHandler getHandler(String tag){ return MQHandlerBuilderUtil.getInstance().getMap().get(tag); } }
在這個建立內,我把全部的子類實現都丟在初始化構造函數中,你也能夠根據具體的業務需求去作一些改變,好比我喜歡動態建立,或者其餘的一些方式,隨你喜歡;工具
有了這個類,我有什麼好處呢?性能
/** * * @author linweifeng * @since jdk 1.6 * @history 2016-11-07 created */ public class MqListener implements MessageListener { // 日誌工具類 private final static Logger log = Logger.getLogger(MqListener.class); public Action consume(Message message, ConsumeContext context) { try { IMQHandler handler = MQHandlerBuilderUtil.getHandler(message.getTag()); if(handler == null){ throw new Exception("沒有找到合適的處理器"); } handler.handler(message.getBody()); return Action.CommitMessage; }catch (Exception e) { log.error("消息消費失敗:"+e.getMessage()); return Action.ReconsumeLater; } } }
之後我不在關注監聽器的問題了,若是我有新的業務需求,我只須要修改,而後將類添加到builder中,或者你能夠本身實現一些辦法,自動去添加到map中,或者動態的去建立那些處理器。這樣作能夠將你的業務與監聽器分離開來,讓你更加關注業務的需求,並且沒有那些可怕的if了,並且寫完以後,你是否會以爲本身棒棒噠?壞處呢?全部的bean都被建立,內存一會兒就佔用了很多呢;
好了,這個辦法彷佛看起來已經不錯了,可是咱們是javaer啊,能夠絕不誇張地說,java的開發是一個又一個工具的組合去最終實現你的業務需求,這也是java的魅力所在;因此,這個時候,在對象的建立於銷燬,到注入的過程,咱們最早想到的是spring這個框架;
美的不可方物的ioc,aop;
咱們先到阿里雲MQ官網找一下監聽器配置,以下:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <!-- 生產者配置--> <bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean" init-method="start" destroy-method="shutdown"> <property name="properties" > <!--生產者配置信息--> <props> <prop key="ProducerId">PID_DEMO</prop> <!--請替換XXX--> <prop key="AccessKey">XXX</prop> <prop key="SecretKey">XXX</prop> </props> </property> <!-- 消費者配置 --> <bean id="loginHandler" class="…….LoginHandler"></bean> <bean id="msgListener" class="demo.DemoMessageListener"> <property name="handlerMap"> <map> <entry key="XXXTag1" value-ref="loginHandler"></entry> <entry key="XXXTag2" value-ref="XXXXHandler"></entry> <entry key="XXXTag3" value-ref="XXXXHandler"></entry> <entry key="XXXTag4" value-ref="XXXXHandler"></entry> </map> </property> </bean> <!--Listener配置--> <bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown"> <property name="properties" > <!--消費者配置信息--> <props> <prop key="ConsumerId">CID_DEMO</prop> <!--請替換XXX--> <prop key="AccessKey">AKDEMO</prop> <prop key="SecretKey">SKDEMO</prop> <!--將消費者線程數固定爲50個 <prop key="ConsumeThreadNums">50</prop> --> </props> </property> <property name="subscriptionTable"> <map> <entry value-ref="msgListener"> <key> <bean class="com.aliyun.openservices.ons.api.bean.Subscription"> <property name="topic" value="TopicTestMQ"/> <property name="expression" value="*"/><!--expression即Tag,能夠設置成具體的Tag,如 taga||tagb||tagc,也可設置成*。 *僅表明訂閱全部Tag,不支持通配--> </bean> </key> </entry> <!--更多的訂閱添加entry節點便可--> </map> </property> </bean> </beans>
咱們把咱們的處理器經過注入的形式直接注入到監聽器中
/** * * @author linweifeng * @since jdk 1.6 * @history 2016-11-07 created */ public class MqListener implements MessageListener { Map<String, IMQHandler> handlerMap ; // 日誌工具類 private final static Logger log = Logger.getLogger(MqListener.class); public Action consume(Message message, ConsumeContext context) { try { IMQHandler handler = handlerMap.get(message.getTag()); if(handler == null){ throw new Exception("沒有找到合適的處理器"); } handler.handler(message.getBody()); return Action.CommitMessage; }catch (Exception e) { log.error("消息消費失敗:"+e.getMessage()); return Action.ReconsumeLater; } } }
這麼作的好處在於:我不再關心監聽器,除非MQ接口改動,或者更換MQ工具…,個人全部業務的處理器,我均可以直接添加在配置文件中,不論是新增,仍是卸載,都很是的方便,由於我只須要動動配置文件,將寫好的類配置在map下,就OK啦;總算寫完了,都深夜兩點了,唉……夜深人靜啊;
路,都是人走出來的……
走多了,總會有翻車的,嘿嘿嘿……
<本人菜鳥,說的很差的,歡迎指正……>