菜鳥日誌--在MQ的消費端上如何去實現本身的業務

最近開始接觸一些關於MQ的開發,使用的是阿里雲提供的RocketMQ工具;RocketMQ採用二級分類梯度來過濾消息;java

一級是在topic,二級是tag標籤,咱們的代碼目的:主要目的消費從阿里雲隊列上拉去下來的消息;spring

/**
 * 
 * @author linweifeng
 * @since jdk 1.6
 * @history 2016-11-07 created
 */
public class MqListener implements MessageListener {
	
	// 日誌工具類
	private final static Logger log = Logger.getLogger(MqListener.class);
		
	public Action consume(Message message, ConsumeContext context) {
        try {
        	if(MQConstants.XXXTag1.equals(message.getTag())){
				// TODO 
				// do something
			}else if(MQConstants.XXXTag2.equals(message.getTag())){
				// TODO 
				// do something
			}else if(MQConstants.XXXTag3.equals(message.getTag())){
				// TODO 
				// do something
			}....
			else {
				// TODO 
				// do something
			}
            return Action.CommitMessage;
        }catch (Exception e) {
        	log.error("消息消費失敗:"+e.getMessage());
        	return Action.ReconsumeLater;
        }
	}
}

看到這樣的代碼,你們都會考慮到說:if -else 這麼多,代碼越寫會越臃腫,並且閱讀性不好。if-else if是會從頭至尾都進行對比,直到找到結果,那麼頗有可能有時候會一直刷到最後一個,性能也會不好。並且每次我須要新增長一個tag對應的handler的時候,我都須要修改這個類,很明顯就跟開閉原則衝突;可是,它也有它的優勢,好比代碼複雜度很低,基本上一目瞭然;express

做爲初出菜鳥,看到這麼多的if else ,首先我會考慮將至轉爲switch的方式,爲此我須要把MqConstants這個靜態常量類改爲enum,由於只有jdk 1.7以上的版本switch是能夠支持字符串類型;代碼以下:編程

/**
 * 
 * @author linweifeng
 * @since jdk 1.6
 * @history 2016-11-07 created
 */
public class MqListener implements MessageListener {
	
	// 日誌工具類
	private final static Logger log = Logger.getLogger(MqListener.class);
		
	public Action consume(Message message, ConsumeContext context) {
        try {
        	switch(meesage.geTag()){
				
				case MqEnum.XXXTag1:
					// TODO
					//do something
					break;
				case MqEnum.XXXTag2:
					// TODO
					//do something
					break;
				case MqEnum.XXXTag3:
					// TODO
					//do something
					break;
				case MqEnum.XXXTag4:
					// TODO
					//do something
					break;
				defalut:
					// TODO
					//exception 
			}
            return Action.CommitMessage;
        }catch (Exception e) {
        	log.error("消息消費失敗:"+e.getMessage());
        	return Action.ReconsumeLater;
        }
	}
}

這樣子作比以前是好了一些,可是仍是有不少的不足,好比這些處理類,我是否 能夠將至抽取了,java編程,更推薦使用接口編程,因此我是否能夠抽取出這些類的共有部分去作一些事呢?api

/**
 * 
 * @author linweifeng
 * @since jdk 1.6
 * @history 2016-11-07 created
 */
public interface IMQHandler {
	
	/**
	 * 做爲接口,我不關注實現,具體業務的實現邏輯,我交給實現接口的類去完成
	 * @param messageBody
	 */
	public void handler(String messageBody);
}

抽取出這個接口以後,我之後全部須要處理的類,都繼承這個接口,好比我有一個登陸的處理類,它要作的可能 就是說,我登陸以後,我須要記錄登陸日誌,或者其餘一些我並不須要便可執行的業務;框架

/**
 * 
 * @author linweifeng
 * @since jdk 1.6
 * @history 2016-11-07 created
 */
public class LoginHandler  implements IMQHandler{

	@Override
	public void handler(String messageBody) {
		// TODO Auto-generated method stub
		
		//do something
        log.insert();

	}
}

這麼作有什麼好處?彷佛有些累贅,存粹是爲樂方便作擴展,只不過這個擴展是將來必須的,可能下一秒 你就須要作的擴展;千萬不要堅信去作不少將來你不必定會擴展,或者維護的東西,而後去過多的抽取接口;那樣存粹給本身找麻煩;ide

假如我有一個建立者工具類:負責建立全部的這些實現的子類對象,以下:函數

import java.util.HashMap;
import java.util.Map;



/**
 * 
 * @author linweifeng
 * @since jdk 1.6
 * @history 2016-11-07 created
 */
public class MQHandlerBuilderUtil {
	
	/**
	 * 定義map,去作一個映射,由於我得最終目的是:我消費者的監聽器根據不一樣的標籤是調用不一樣的
	 * 實現
	 */
	Map<String ,IMQHandler> map ;
	
	private static MQHandlerBuilderUtil mqHandlerBuilder = null;
	
	private MQHandlerBuilderUtil(){
		map = new HashMap<String, IMQHandler>();
		//初始化後,我將 全部的實現丟在這裏
		map.put(MQConstants.xxxTag1, new LoginHandler());
		//map.put(MQConstants.xxxTag1, new xxxxxHandler());
		// ....
	}
	
	/**
	 * 獲取map對象
	 * @return
	 */
	public Map<String, IMQHandler> getMap() {
		return map;
	}


	/**
	 * 搞個單例模式,獲取到個人建立者的對象,這裏最好是搞個鎖,鎖住
	 * @return
	 */
	public static MQHandlerBuilderUtil getInstance(){
		if(mqHandlerBuilder == null){
			mqHandlerBuilder = new MQHandlerBuilderUtil();
		}
		return mqHandlerBuilder;
	}
	
	/**
	 * 獲取handler的實現
	 * @param tag
	 * @return
	 */
	public static IMQHandler getHandler(String tag){
		return MQHandlerBuilderUtil.getInstance().getMap().get(tag);
	}
}

在這個建立內,我把全部的子類實現都丟在初始化構造函數中,你也能夠根據具體的業務需求去作一些改變,好比我喜歡動態建立,或者其餘的一些方式,隨你喜歡;工具

有了這個類,我有什麼好處呢?性能

/**
 * 
 * @author linweifeng
 * @since jdk 1.6
 * @history 2016-11-07 created
 */
public class MqListener implements MessageListener {
	
	// 日誌工具類
	private final static Logger log = Logger.getLogger(MqListener.class);
		
	public Action consume(Message message, ConsumeContext context) {
        try {
        	IMQHandler handler = MQHandlerBuilderUtil.getHandler(message.getTag());
        	if(handler == null){
        		throw new Exception("沒有找到合適的處理器");
        	}
        	handler.handler(message.getBody());
            return Action.CommitMessage;
        }catch (Exception e) {
        	log.error("消息消費失敗:"+e.getMessage());
        	return Action.ReconsumeLater;
        }
	}
}

之後我不在關注監聽器的問題了,若是我有新的業務需求,我只須要修改,而後將類添加到builder中,或者你能夠本身實現一些辦法,自動去添加到map中,或者動態的去建立那些處理器。這樣作能夠將你的業務與監聽器分離開來,讓你更加關注業務的需求,並且沒有那些可怕的if了,並且寫完以後,你是否會以爲本身棒棒噠?壞處呢?全部的bean都被建立,內存一會兒就佔用了很多呢;

好了,這個辦法彷佛看起來已經不錯了,可是咱們是javaer啊,能夠絕不誇張地說,java的開發是一個又一個工具的組合去最終實現你的業務需求,這也是java的魅力所在;因此,這個時候,在對象的建立於銷燬,到注入的過程,咱們最早想到的是spring這個框架;

美的不可方物的ioc,aop;

咱們先到阿里雲MQ官網找一下監聽器配置,以下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
	   <!-- 生產者配置-->
    <bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean" init-method="start" destroy-method="shutdown">
        <property name="properties" > <!--生產者配置信息-->
            <props>
                <prop key="ProducerId">PID_DEMO</prop> <!--請替換XXX-->
                <prop key="AccessKey">XXX</prop>
                <prop key="SecretKey">XXX</prop>
            </props>
        </property>
		
		<!-- 消費者配置 -->
	<bean id="loginHandler" class="…….LoginHandler"></bean>
	<bean id="msgListener" class="demo.DemoMessageListener">
		<property name="handlerMap">
			<map>
				<entry key="XXXTag1" value-ref="loginHandler"></entry>
				<entry key="XXXTag2" value-ref="XXXXHandler"></entry>
				<entry key="XXXTag3" value-ref="XXXXHandler"></entry>
				<entry key="XXXTag4" value-ref="XXXXHandler"></entry>
			</map>
		</property>
	</bean> <!--Listener配置-->
    <bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown">
        <property name="properties" > <!--消費者配置信息-->
            <props>
                <prop key="ConsumerId">CID_DEMO</prop> <!--請替換XXX-->
                <prop key="AccessKey">AKDEMO</prop>
                <prop key="SecretKey">SKDEMO</prop>
                <!--將消費者線程數固定爲50個
                <prop key="ConsumeThreadNums">50</prop>
                -->
            </props>
        </property>
        <property name="subscriptionTable">
            <map>
                <entry value-ref="msgListener">
                    <key>
                        <bean class="com.aliyun.openservices.ons.api.bean.Subscription">
                            <property name="topic" value="TopicTestMQ"/>
                            <property name="expression" value="*"/><!--expression即Tag,能夠設置成具體的Tag,如 taga||tagb||tagc,也可設置成*。 *僅表明訂閱全部Tag,不支持通配-->
                        </bean>
                    </key>
                </entry>
                <!--更多的訂閱添加entry節點便可-->
            </map>
        </property>
    </bean>

</beans>

咱們把咱們的處理器經過注入的形式直接注入到監聽器中

/**
 * 
 * @author linweifeng
 * @since jdk 1.6
 * @history 2016-11-07 created
 */
public class MqListener implements MessageListener {
	
	Map<String, IMQHandler> handlerMap ;
	
	// 日誌工具類
	private final static Logger log = Logger.getLogger(MqListener.class);
		
	public Action consume(Message message, ConsumeContext context) {
        try {
        	IMQHandler handler = handlerMap.get(message.getTag());
        	if(handler == null){
        		throw new Exception("沒有找到合適的處理器");
        	}
        	handler.handler(message.getBody());
            return Action.CommitMessage;
        }catch (Exception e) {
        	log.error("消息消費失敗:"+e.getMessage());
        	return Action.ReconsumeLater;
        }
	}
}

這麼作的好處在於:我不再關心監聽器,除非MQ接口改動,或者更換MQ工具…,個人全部業務的處理器,我均可以直接添加在配置文件中,不論是新增,仍是卸載,都很是的方便,由於我只須要動動配置文件,將寫好的類配置在map下,就OK啦;總算寫完了,都深夜兩點了,唉……夜深人靜啊;

 

路,都是人走出來的……

 

走多了,總會有翻車的,嘿嘿嘿……

 

<本人菜鳥,說的很差的,歡迎指正……>

相關文章
相關標籤/搜索