轉載於 http://blog.csdn.net/nimmy/article/details/6247289java
近日因工做關係,在研究JMS,使用ActiveMQ做爲提供者,考慮到消息的重要,擬採用ActiveMQ的集羣,網上查詢,資料不多,且語焉不詳,最後仍是看Apache提供的官方文檔(俺E文很差,楞是拿金山詞霸一個一個單詞的看,累啊),終於作出來了,因而形諸於文,以供有須要的朋友參考mysql
本文闡述了使用 Pure Master Slave 方式 以及 JDBC Master Slave 方式s解決單點故障的問題sql
關於ActiveMQ集羣數據庫
1) Java_home : 指向 $java_dir$apache
2) Ant_home :指向 $ant_dir$異步
3) Path : 應包含 %java_home%/bintcp
1) <broker brokerName="pure_master" …測試
2) <kahaDB directory="${activemq.base}/data/kahadb_pure_master "/>url
3) <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>spa
1) <broker brokerName="pure_slave" masterConnectorURI="tcp://0.0.0.0:61616"
shutdownOnMasterFailure="false" …
2) <kahaDB directory="${activemq.base}/data/kahadb_pure_slave "/>
3) <transportConnector name="openwire" uri="tcp://0.0.0.0:61617"/>
1) 啓動Master:$activemq_dir$/bin>activemq xbean:file:../conf/pure_master.xml
2) 啓動Slave:$activemq_dir$/bin>activemq xbean:file:../conf/pure_slave.xml
public static void main(String[] args) throws Exception {
ConnectionFactory cf = new ActiveMQConnectionFactory("failover:(tcp://0.0.0.0:61616,tcp://0.0.0.0:61617)");
Connection conn = cf.createConnection();
conn.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue qq = new ActiveMQQueue("qq1");
MessageProducer prod = sess.createProducer(qq);
Message msg = null;
Scanner scan = new Scanner(System.in);
String str = scan.next();
while(true) {
msg = sess.createTextMessage(str);
prod.send(msg);
if(str.equals("exit")) {
break;
}
str = scan.next();
}
conn.close();
}
public static void main(String[] args) throws Exception {
ConnectionFactory cf = new ActiveMQConnectionFactory("failover:( tcp://0.0.0.0:61616,tcp://0.0.0.0:61617)");
Connection conn = cf.createConnection();
conn.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue qq = new ActiveMQQueue("qq1");
MessageConsumer cs = sess.createConsumer(qq);
TextMessage msg = (TextMessage)cs.receive();
String str = msg.getText();
while(true) {
System.out.println("receive msg:/t"+msg.getText());
if(str.equals("exit")) {
break;
}
msg = (TextMessage)cs.receive();
str = msg.getText();
}
conn.close();
}
Pure Master Slave模式實現方式簡單,能夠實現消息的雙機熱備功能;隊列能夠實現消息的異步和點對點發送
1) <broker brokerName=" jdbc_broker01" …
2) <!--配置持久適配器-->
<persistenceAdapter>
<jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/>
</persistenceAdapter>
3) <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
4) <!--配置數據源:注意是在broker標記以外-->
…
</broker>
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/test?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="root"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
1) <broker brokerName=" jdbc_broker02" …
2) <transportConnector name="openwire" uri="tcp://0.0.0.0:61617"/>
1) <broker brokerName=" jdbc_broker03" …
2) <transportConnector name="openwire" uri="tcp://0.0.0.0:61618"/>
ConnectionFactory cf = new ActiveMQConnectionFactory("failover:(tcp://0.0.0.0:61616,tcp://0.0.0.0:61617,tcp://0.0.0.0:61618)");
JDBC Master Slave模式實現方式稍微複雜一點,能夠實現消息的多點熱備功能,Master、Slave的交替徹底是即時的,自動的,無需重啓任何broker;隊列能夠實現消息的異步和點對點發送