[轉載]關於ActiveMQ集羣

轉載於 http://blog.csdn.net/nimmy/article/details/6247289java

近日因工做關係,在研究JMS,使用ActiveMQ做爲提供者,考慮到消息的重要,擬採用ActiveMQ的集羣,網上查詢,資料不多,且語焉不詳,最後仍是看Apache提供的官方文檔(俺E文很差,楞是拿金山詞霸一個一個單詞的看,累啊),終於作出來了,因而形諸於文,以供有須要的朋友參考mysql

本文闡述了使用 Pure Master Slave 方式 以及 JDBC Master Slave 方式s解決單點故障的問題sql

 關於ActiveMQ集羣數據庫

 

1 前提

  1. 下載jdk6(update24),解壓,安裝,下面用 $java_dir$ 表示JDK主目錄
  2. 下載ActiveMQ5.4.2,解壓,下面用 $activemq_dir$ 表示activeMQ主目錄
  3. 下載AapcehANT1.8,解壓,下面用 $ant_dir$ 表示ANT主目錄
  4. 配置好環境變量

1)         Java_home : 指向 $java_dir$apache

2)         Ant_home :指向 $ant_dir$異步

3)         Path : 應包含 %java_home%/bintcp

  1. 將MySQL驅動包放置到 $activemq_home$/lib 下(本例用到MySQL數據源)
  2. Master:給broker取個名字,修改其持久化KAHADB文件
  3. Slave:給broker取個名字,修改其持久化KAHADB文件,須要配置Master的地址和端口
  4. 一個Master只能帶一個Slave
  5. Master工做期間,會將消息情況自動同步到Slave
  6. Master一旦崩潰,Slave自動接替其工做,已發送並還沒有消費的消息繼續有效
  7. Slave接手後,必須中止Slave才能重啓先前的Master
  8. Master:首先複製 $activemq_dir$/conf/activemq.xml,並更名爲:pure_master.xml,修改文件

2         解決單點故障:Pure Master Slave

2.1     概述

2.2     MQ配置(這裏是一臺機器配置)

1)         <broker brokerName="pure_master" …測試

2)         <kahaDB directory="${activemq.base}/data/kahadb_pure_master "/>url

3)         <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>spa

  1. Slave:首先複製 $activemq_dir$/conf/activemq.xml,並更名爲:pure_slave.xml,修改文件

1)         <broker brokerName="pure_slave" masterConnectorURI="tcp://0.0.0.0:61616"

shutdownOnMasterFailure="false" …

2)         <kahaDB directory="${activemq.base}/data/kahadb_pure_slave "/>

3)         <transportConnector name="openwire" uri="tcp://0.0.0.0:61617"/>

  1. 首先啓動Master,啓動完畢後在另外一個Shell啓動Slave,Slave啓動後,能夠看到Master那個Shell中顯示已經Attach上了Slave

1)         啓動Master:$activemq_dir$/bin>activemq xbean:file:../conf/pure_master.xml

2)         啓動Slave:$activemq_dir$/bin>activemq xbean:file:../conf/pure_slave.xml

2.3     JAVA測試:隊列

  1. 生產者

public static void main(String[] args) throws Exception {

         ConnectionFactory cf = new ActiveMQConnectionFactory("failover:(tcp://0.0.0.0:61616,tcp://0.0.0.0:61617)");

         Connection conn = cf.createConnection();

         conn.start();

         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

         Queue qq = new ActiveMQQueue("qq1");

         MessageProducer prod = sess.createProducer(qq);

         Message msg = null;

        

         Scanner scan = new Scanner(System.in);

         String str = scan.next();

         while(true) {

                   msg = sess.createTextMessage(str);

                   prod.send(msg);

                   if(str.equals("exit")) {

                            break;

                   }

                   str = scan.next();

         }

         conn.close();

}

  1. 消費者

public static void main(String[] args) throws Exception {

         ConnectionFactory cf = new ActiveMQConnectionFactory("failover:( tcp://0.0.0.0:61616,tcp://0.0.0.0:61617)");

         Connection conn = cf.createConnection();     

         conn.start();

         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

         Queue qq = new ActiveMQQueue("qq1");

         MessageConsumer cs = sess.createConsumer(qq);

 

         TextMessage msg = (TextMessage)cs.receive();

         String str = msg.getText();

         while(true) {

                   System.out.println("receive msg:/t"+msg.getText());

                   if(str.equals("exit")) {

                            break;

                   }

                   msg = (TextMessage)cs.receive();

                   str = msg.getText();

         }

         conn.close();

}

2.4     測試步驟

  1. 啓動生產者,發送幾條消息
  2. 啓動消費者,可看到接收到的消息
  3. 關閉消費者
  4. 生產者繼續發送幾條消息—消息A
  5. 中止Master(可看到生產者端顯示鏈接到Slave(tcp://0.0.0.0:61617)了)
  6. 生產者繼續發送幾條消息—消息B
  7. 啓動消費者
  8. 消費者接收了消息A和消息B,可見Slave接替了Master的工做,並且儲存了以前生產者通過Master發送的消息

2.5     結論

Pure Master Slave模式實現方式簡單,能夠實現消息的雙機熱備功能;隊列能夠實現消息的異步和點對點發送

3         解決單點故障:JDBC Master Slave

3.1     概述

  1. 配置上,不存在Master和Slave,全部Broder的配置基本是同樣的
  2. 多個共享數據源的Broker構成JDBC Master Slave
  3. 給每一個Broker取個名字
  4. 首先搶到資源(數據庫鎖)的Broker成爲Masetr
  5. 其餘Broker保持預備狀態,按期嘗試搶佔資源,運行其的Shell中清楚的顯示了這一點
  6. 一旦Master崩潰,其餘Broker嘗試搶佔資源,最終只有一臺搶到,它馬上成爲Master
  7. 以前的Master即便重啓成功,也只能做爲Slave等待
  8. 將 $activemq_dir$/conf/activemq.xml 複製3份,分別更名爲:jdbc_broker01.xml、jdbc_broker02.xml、jdbc_broker03.xml
  9. 首先修改jdbc_broker01.xml

3.2     MQ配置(這裏是一臺機器配置)

1)         <broker brokerName=" jdbc_broker01" …

2)         <!--配置持久適配器-->

<persistenceAdapter>

  <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data"   dataSource="#mysql-ds"/>

</persistenceAdapter>

3)         <transportConnector name="openwire"   uri="tcp://0.0.0.0:61616"/>

4)         <!--配置數據源:注意是在broker標記以外-->

</broker>

<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource"  destroy-method="close">

  <property name="driverClassName" value="com.mysql.jdbc.Driver"/>

  <property name="url" value="jdbc:mysql://localhost/test?relaxAutoCommit=true"/>

  <property name="username" value="root"/>

  <property name="password" value="root"/>

  <property name="maxActive" value="200"/>

  <property name="poolPreparedStatements" value="true"/>

</bean>

  1. 一樣修改 jdbc_broker02.xml ,與 jdbc_broker01.xml 不一樣之處

1)         <broker brokerName=" jdbc_broker02" …

2)         <transportConnector name="openwire" uri="tcp://0.0.0.0:61617"/>

  1. 一樣修改 jdbc_broker03.xml ,與 jdbc_broker01.xml 不一樣之處

1)         <broker brokerName=" jdbc_broker03" …

2)         <transportConnector name="openwire" uri="tcp://0.0.0.0:61618"/>

3.3     JAVA測試:隊列

  1. 代碼基本和前面一致,只是由於這裏有3個broker,因此建立鏈接工廠的代碼稍有差異:

ConnectionFactory cf = new ActiveMQConnectionFactory("failover:(tcp://0.0.0.0:61616,tcp://0.0.0.0:61617,tcp://0.0.0.0:61618)");

3.4     測試步驟

  1. 先啓動生產者,發送幾條消息
  2. 啓動消費者,可看到接收到的消息
  3. 關閉消費者
  4. 生產者繼續發送幾條消息-消息A
  5. 中止broker01(可看到生產者端顯示鏈接到broker02(tcp://0.0.0.0:61617)了,同時運行broker02的Shell也顯示其成爲了Master)
  6. 生產者繼續發送幾條消息-消息B
  7. 啓動消費者
  8. 消費者接收了消息A和消息B,可見broker02接替了broker01的工做,並且儲存了以前生產者通過broker01發送的消息
  9. 關閉消費者
  10. 生產者繼續發送幾條消息-消息C
  11. 中止broker02(可看到生產者端顯示鏈接到broker03(tcp://0.0.0.0:61618)了,同時運行broker03的Shell也顯示其成爲了Master)
  12. 生產者繼續發送幾條消息-消息D
  13. 啓動消費者
  14. 消費者接收了消息C和消息D,可見broker03接替了broker02的工做,並且儲存了以前生產者通過broker02發送的消息
  15. 再次啓動broker01,生產者或消費者均未顯示鏈接到broker01(tcp://0.0.0.0:61616),代表broker01此時只是個Slave

3.5     結論

JDBC Master Slave模式實現方式稍微複雜一點,能夠實現消息的多點熱備功能,Master、Slave的交替徹底是即時的,自動的,無需重啓任何broker;隊列能夠實現消息的異步和點對點發送

相關文章
相關標籤/搜索