Spring下ActiveMQ實戰

    MessageQueue是分佈式的系統裏常常要用到的組件,通常來講,當須要把消息跨網段、跨集羣的分發出去,就能夠用這個。一些典型的示例就是:html

    一、集羣A中的消息須要發送給多個機器共享;java

    二、集羣A中消息須要主動推送,但彼此的網絡不是互通的(如集羣A只有過HA才能被外界訪問);spring

    

    固然上面的幾個點,除了用MQ還有其它實現方式,可是MQ無疑是很是適合用來作這些事的。衆多MQ中,ActiveMQ是比較有名氣也很穩定的,它發送消息的成本很是廉價,支持Queue與Topic兩種消息機制。本文主要就是講如何在Spring環境下配置此MQ:apache

 

一、場景假設服務器

    現有機器兩臺Server、Worker須要進行異步通訊,另有一臺ActiveMQ機器,關於MQ的配置信息存放在Zookeeper中,Zookeeper的節點有:網絡

      - /mq/activemq/ip:mq的機器ip併發

      -/mq/activemq/port:這是mq的機器端口app

 

二、Server的Spring XML配置異步

    Server主要的工做就是接受Worker消息,併發送消息給Worker。主要是定義了鏈接MQ的鏈接池接受Worker消息的隊列worker,發送消息給Worker的隊列server:tcp

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation="
 3         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
 4         http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.1.xsd">
 5 
 6     <!-- ActiveMQ鏈接池 -->
 7     <bean id="conFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
 8         <property name="connectionFactory">
 9             <bean class="org.apache.activemq.ActiveMQConnectionFactory">
10                 <property name="brokerURL">
11                     <bean class="lekko.mq.util.MQPropertiesFactory" factory-method="getUrl" />
12                 </property>
13                 <property name="closeTimeout" value="60000" />
14                 <!-- <property name="userName" value="admin" /> -->
15                 <!-- <property name="password" value="admin" /> -->
16                 <!-- <property name="optimizeAcknowledge" value="true" /> -->
17                 <property name="optimizedAckScheduledAckInterval" value="10000" />
18             </bean>
19         </property>
20     </bean>
21 
22 
23     <!-- Worker任務消息 -->
24     <bean id="taskWorkerTopic" class="org.apache.activemq.command.ActiveMQTopic">
25         <constructor-arg value="worker_topic" />
26     </bean>
27     <!-- 任務監聽容器 -->
28     <bean id="taskWorkerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
29         <property name="connectionFactory" ref="conFactory" />
30         <property name="destination" ref="taskWorkerTopic" />
31         <property name="messageListener">
32             <bean class="lekko.mq.task.TaskWorkerListener" />
33         </property>
34         <property name="pubSubDomain" value="true" />
35     </bean>
36 
37 
38     <!-- Server任務消息 -->
39     <bean id="taskServerTopic" class="org.apache.activemq.command.ActiveMQTopic">
40         <constructor-arg value="server_topic" />
41     </bean>    
42     <!-- 任務消息發送模板 -->
43     <bean id="taskServerTemplate" class="org.springframework.jms.core.JmsTemplate" p:connectionFactory-ref="conFactory" p:defaultDestination-ref="taskServerTopic" />
44 
45 </beans>

    一段一段地分析,ActiveMQ鏈接池這裏,定義了鏈接的bean爲「conFactory」,其中broberURL屬性是經過後臺Java代碼的靜態方法來設置的,方便線上環境經過Java代碼動態地切換,稍後會介紹這塊代碼,你如今須要知道的是,它實際上返回的就是一個字符串,格式像:tcp://xxx.xxx.xxx.xxx:port,若是不要用後臺來管理鏈接信息,直接改爲「<property name="brokerURL" value="tcp://xxx.xxx.xxx.xxx:port">」也是OK的。

    接下來,即是Worker消息隊列的定義,這裏定義爲「taskWorkerTopic」,類型是org.apache.activemq.command.ActiveMQTopic,(訂閱模式)它表示一個消息能夠被多個機器收到並處理,其它的還有org.apache.activemq.command.ActiveMQQueue,(點對點模式)表示一個消息只能被一臺機器收到,當收到後消息就出隊列了,其它機器沒法處理。它們都有一個構造參數constructor-arg,指定了消息隊列的名稱,一個MQ中一個消息隊列的名字是惟一的。

    Worker的消息隊列定義好了以後,就是接受Worker的裏消息了,這裏定義了「taskWorkerContainer」,其屬性分別定義了鏈接池、目標隊列、消息處理器(咱們本身的Java類,後面再講),參數pubSubDomain用於指定是使用訂閱模式仍是使用點對點模式,若是是ActiveMQTopic則要設置爲true,默認是false。

    好了,Server如今已經能夠經過本身定義的「lekko.mq.task.TaskWorkerListener」類接受並處理taskWorkerTopic的消息了。

    如法炮製,定義一個專門用於往Worker裏發消息的隊列「taskServerTopic」,並定義發送消息的模板「taskServerTemplate」備用。

 

三、Server端的接收類與發送類

    lekko.mq.task.TaskWorkerListener即是一個接收類示例:

 1 package lekko.mq.task;
 2 
 3 import javax.jms.Message;
 4 import javax.jms.MessageListener;
 5 
 6 import org.apache.activemq.command.ActiveMQObjectMessage;
 7 import org.apache.log4j.Logger;
 8 import org.springframework.stereotype.Service;
 9 import lekko.mq.model.MessageModel;
10 
11 
12 /**
13  * Task消息監聽類
14  * @author lekko
15  */
16 @Service
17 public class TaskWorkerListener implements MessageListener {
18 
19     private Logger _logger = Logger.getLogger(TaskWorkerListener.class);
20 
21     @Override
22     public void onMessage(Message message) {
23         if (message instanceof ActiveMQObjectMessage) {
24             ActiveMQObjectMessage aMsg = (ActiveMQObjectMessage) message;
25             try {
26                 onMessage((MessageModel) aMsg.getObject());
27             } catch (Exception e) {
28                 _logger.warn("Message:${} is not a instance of MessageModel.", e);
29             }
30         } else {
31             _logger.warn("Message:${} is not a instance of ActiveMQObjectMessage.");
32         }
33     }
34 
35     /**
36      * 處理消息
37      * @param message 自定義消息實體
38      */
39     public void onMessage(MessageModel message) { ... }
40 
41 }

    這裏給你們演示的並非最基礎的知識,處理的消息是一個自定義的類「lekko.mq.model.MessageModel」,這個類怎麼寫能夠隨便整,反正就是一些你要傳遞的數據字段,可是記得要實現Serializable接口。若是你須要傳遞的僅僅是純字符串,那麼直接在代碼的23行片,把message.toString()便可。這個類經過前面XML配置會處理來自「worker_topic」隊列中的消息。

    

    再就是發送類,實際上就是把前面的taskServiceTemplate拿來用就好了:

 1 package lekko.mq.task;
 2 
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.beans.factory.annotation.Qualifier;
 5 import org.springframework.jms.core.JmsTemplate;
 6 import org.springframework.stereotype.Service;
 7 import lekko.mq.model.MessageModel;
 8 
 9 
10 /**
11  * 服務器任務消息分發
12  * @author lekko
13  */
14 @Service
15 public class TaskServerSender {
16 
17     @Autowired
18     @Qualifier("taskServerTemplate")
19     private JmsTemplate jmsTemplate;
20 
21     /**
22      * 發送消息
23      */
24     public void sendMessage(MessageModel msg) {
25         jmsTemplate.convertAndSend(msg);
26     }
27 
28 }

    把這個類TaskServerSender注入到任意須要用到的地方,調用sendMessage方法便可。它會往前面定義的「server_topic」中塞消息,等Worker來取。

 

四、關於Zookeeper配置MQ鏈接信息

    Worker端的配置我這裏再也不闡述,由於它跟在Server端的配置太相像,區別就在於Server端是從worker_topic中取消息,往server_topic中寫消息;而Worker端的代碼則是反過來,往worker_topic中寫消息,從server_topic中取消息。

    那麼如何使用Java代碼來控制ActiveMQ的配置消息呢:

 1 package lekko.mq.util;
 2 
 3 import org.apache.zookeeper.ZooKeeper;
 4 import org.apache.zookeeper.data.Stat;
 5 
 6 /**
 7  * 獲取MQ配置
 8  * @author lekkoli
 9  */
10 public class MQPropertiesFactory {
11     
12     private static boolean isLoaded = false;
13     private static String ZOOKEEPER_CLUST = "xxx.xxx.xxx.xxx:2181";
14     private static ZooKeeper _zk;
15     private static String _ip;
16     private static String _port;
17 
18     private static String getProperty(String path) throws Exception {
19         if (_zk == null) {
20             if (ZOOKEEPER_CLUST == null) {
21                 throw new Exception("Zookeeper, Host \"" + ZOOKEEPER_CLUST + "\" is null!");
22             }
23             _zk = new ZooKeeper(ZOOKEEPER_CLUST, 90000, null);
24         }
25         Stat s = _zk.exists(path, false);
26         if (s != null)
27             return new String(_zk.getData(path, false, s));
28         throw new Exception("Zookeeper, Path \"" + path + "\" is not exist!");
29     }
30 
31     private static void load() throws Exception {
32         if (!isLoaded) {
33             _ip = getProperty("/mq/activemq/ip");
34             _port = getProperty("/mq/activemq/port");
35             isLoaded = true;
36         }
37     }
38 
39     public static String getUrl() throws Exception {
40         load();
41         StringBuilder failover = new StringBuilder();
42         String[] ips = _ip.split(";"), ports = _port.split(";");
43         for (int i = 0; i < ips.length; ++i) {
44             failover.append("tcp://").append(ips[i]).append(":").append(ports[i]).append(",");
45         }
46         failover.setLength(failover.length() - 1);
47         String failovers = failover.toString();
48         if (ips.length > 1) {
49             failovers = "failover:(" + failovers + ")";
50         }
51         return failovers;
52     }
53 }

    上面的代碼須要解釋的地方跟MQ相關的很少,主要就是若是是mq集羣,則格式是:failover:(tcp://192.168.1.117:1001,tcp://192.168.1.118:1001,tcp://xxx.xxx.xxx.xxx:port)。其它上面代碼沒有對Zookeeper集羣都掛了的狀況,作應急鏈接方案。固然,不管如何本節都不是全文的重點,可是多學一技未嘗不可?

    最近工做愈來愈忙,更新博客也是時有時無,可是我會堅持下去,還有許多工做中的點滴,在這裏沉澱一下,也但願更進一步吧。

 

    轉載請註明原址:http://www.cnblogs.com/lekko/p/4940976.html 

相關文章
相關標籤/搜索