ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現html
1. 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQPjava
2. 徹底支持JMS1.1和J2EE 1.4規範 (持久化,XA消息,事務)node
3. 對Spring的支持,ActiveMQ能夠很容易內嵌到使用Spring的系統裏面去,並且也支持Spring2.0的特性linux
4. 經過了常見J2EE服務器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中經過JCA 1.5 resourceadaptors的配置,可讓ActiveMQ能夠自動的部署到任何兼容J2EE1.4商業服務器上web
5. 支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA數據庫
6. 支持經過JDBC和journal提供高速的消息持久化apache
7. 從設計上保證了高性能的集羣,客戶端-服務器,點對點windows
8. 支持Ajax服務器
9. 支持與Axis的整合網絡
10. 能夠很容易得調用內嵌JMS provider,進行測試
一、下載
ActiveMQ的最新版本是5.10.0,但因爲咱們內網下載存在問題,因此目前經過內網只能下載到5.9.0,下載地址:http://activemq.apache.org/activemq-590-release.html。
二、安裝
若是是在windows系統中運行,能夠直接解壓apache-activemq-5.9.0-bin.zip,並運行bin目錄下的activemq.bat文件,此時使用的是默認的服務端口:61616和默認的console端口:8161。
若是是在linux或unix下運行,在bin目錄下執行命令:./activemq setup
三、修改ActiveMQ的服務端口和console端口
A、修改服務端口:打開conf/activemq.xml文件,修改如下紅色字體部分
<transportConnectors>
<transportConnector name="openwire" uri="tcp://10.42.220.72:61618"discoveryUri="multicast://default"/>
</transportConnectors>
B、修改console的地址和端口:打開conf/jetty.xml文件,修改如下紅色字體部分
<bean id="jettyPort"class="org.apache.activemq.web.WebConsolePort"init-method="start">
<property name="port" value="8162"/>
</bean>
須要提早將activemq解壓包中的lib目錄下的相關包引入到工程中,再進行以下編碼:
一、發送端的代碼:
importjavax.jms.Connection;
importjavax.jms.ConnectionFactory;
importjavax.jms.DeliveryMode;
importjavax.jms.Destination;
importjavax.jms.MessageProducer;
importjavax.jms.Session;
importjavax.jms.TextMessage;
importorg.apache.activemq.ActiveMQConnection;
importorg.apache.activemq.ActiveMQConnectionFactory;
publicclass Sender {
privatestaticfinalintSEND_NUMBER = 5;
publicstaticvoid main(String[] args) {
// ConnectionFactory:鏈接工廠,JMS用它建立鏈接
ConnectionFactory connectionFactory;
// Connection:JMS客戶端到JMS Provider的鏈接
Connection connection = null;
// Session:一個發送或接收消息的線程
Session session;
// Destination:消息的目的地;消息發送給誰.
Destination destination;
// MessageProducer:消息發送者
MessageProducer producer;
// TextMessage message;
//構造ConnectionFactory實例對象,此處採用ActiveMq的實現jar
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"failover:(tcp://10.42.220.72:61617,tcp://10.42.220.72:61618)");
try {
//構造從工廠獲得鏈接對象
connection =connectionFactory.createConnection();
//啓動
connection.start();
//獲取操做鏈接
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//獲取session
destination = session.createQueue("FirstQueue");
//獲得消息生成者【發送者】
producer =session.createProducer(destination);
//設置不持久化,此處學習,實際根據項目決定
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//構造消息,此處寫死,項目就是參數,或者方法獲取
sendMessage(session, producer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
publicstaticvoid sendMessage(Session session,MessageProducer producer)
throws Exception {
for (int i = 1; i <=SEND_NUMBER; i++) {
TextMessage message = session
.createTextMessage("ActiveMq發送的消息" + i);
//發送消息到目的地方
System.out.println("發送消息:" + "ActiveMq 發送的消息" + i);
producer.send(message);
}
}
}
2、接收端代碼:
importjavax.jms.Connection;
importjavax.jms.ConnectionFactory;
importjavax.jms.Destination;
importjavax.jms.MessageConsumer;
importjavax.jms.Session;
importjavax.jms.TextMessage;
importorg.apache.activemq.ActiveMQConnection;
importorg.apache.activemq.ActiveMQConnectionFactory;
publicclass Receive {
publicstaticvoid main(String[] args) {
// ConnectionFactory:鏈接工廠,JMS用它建立鏈接
ConnectionFactory connectionFactory;
// Connection:JMS客戶端到JMS Provider的鏈接
Connection connection = null;
// Session:一個發送或接收消息的線程
Session session;
// Destination:消息的目的地;消息發送給誰.
Destination destination;
//消費者,消息接收者
MessageConsumer consumer;
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"failover:(tcp://10.42.220.72:61617,tcp://10.42.220.72:61618)");
try {
//構造從工廠獲得鏈接對象
connection =connectionFactory.createConnection();
//啓動
connection.start();
//獲取操做鏈接
session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
//獲取session
destination = session.createQueue("FirstQueue");
consumer =session.createConsumer(destination);
while (true) {
//設置接收者接收消息的時間,爲了便於測試,這裏誰定爲100s
TextMessage message =(TextMessage) consumer.receive(100000);
if (null != message) {
System.out.println("收到消息" + message.getText());
} else {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
}
}
}
}
3、經過監控查看消息堆棧的記錄:
登錄http://localhost:8162/admin/queues.jsp,默認的用戶名和密碼:admin/admin
單點的ActiveMQ做爲企業應用沒法知足高可用和集羣的需求,因此ActiveMQ提供了master-slave、broker cluster等多種部署方式,但經過分析多種部署方式以後我認爲須要將兩種部署方式相結合才能知足咱們公司分佈式和高可用的需求,因此後面就重點將解如何將兩種部署方式相結合。
主要是經過共享存儲目錄來實現master和slave的熱備,全部的ActiveMQ應用都在不斷地獲取共享目錄的控制權,哪一個應用搶到了控制權,它就成爲master。
多個共享存儲目錄的應用,誰先啓動,誰就能夠最先取得共享目錄的控制權成爲master,其餘的應用就只能做爲slave。
與shared filesystem方式相似,只是共享的存儲介質由文件系統改爲了數據庫而已。
這種主備方式是ActiveMQ5.9之後才新增的特性,使用ZooKeeper協調選擇一個node做爲master。被選擇的master broker node開啓並接受客戶端鏈接。
其餘node轉入slave模式,鏈接master並同步他們的存儲狀態。slave不接受客戶端鏈接。全部的存儲操做都將被複制到鏈接至Master的slaves。
若是master死了,獲得了最新更新的slave被容許成爲master。failed node可以從新加入到網絡中並鏈接master進入slave mode。全部須要同步的disk的消息操做都將等待存儲狀態被複制到其餘法定節點的操做完成才能完成。因此,若是你配置了replicas=3,那麼法定大小是(3/2)+1=2. Master將會存儲並更新而後等待 (2-1)=1個slave存儲和更新完成,才彙報success。至於爲何是2-1,熟悉Zookeeper的應該知道,有一個node要做爲觀察者存在。
單一個新的master被選中,你須要至少保障一個法定node在線以可以找到擁有最新狀態的node。這個node將會成爲新的master。所以,推薦運行至少3個replica nodes,以防止一個node失敗了,服務中斷。
前面的Master-Slave的方式雖然能解決多服務熱備的高可用問題,但沒法解決負載均衡和分佈式的問題。Broker-Cluster的部署方式就能夠解決負載均衡的問題。
Broker-Cluster部署方式中,各個broker經過網絡互相鏈接,並共享queue。當broker-A上面指定的queue-A中接收到一個message處於pending狀態,而此時沒有consumer鏈接broker-A時。若是cluster中的broker-B上面由一個consumer在消費queue-A的消息,那麼broker-B會先經過內部網絡獲取到broker-A上面的message,並通知本身的consumer來消費。
在activemq.xml文件中靜態指定Broker須要創建橋鏈接的其餘Broker:
一、 首先在Broker-A節點中添加networkConnector節點:
<networkConnectors>
<networkConnector uri="static:(tcp:// 0.0.0.0:61617)"duplex="false"/>
</networkConnectors>
二、 修改Broker-A節點中的服務提供端口爲61616:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
三、 在Broker-B節點中添加networkConnector節點:
<networkConnectors>
<networkConnector uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>
</networkConnectors>
四、 修改Broker-A節點中的服務提供端口爲61617:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
五、分別啓動Broker-A和Broker-B。
在activemq.xml文件中不直接指定Broker須要創建橋鏈接的其餘Broker,由activemq在啓動後動態查找:
一、 首先在Broker-A節點中添加networkConnector節點:
<networkConnectors>
<networkConnectoruri="multicast://default"
dynamicOnly="true"
networkTTL="3"
prefetchSize="1"
decreaseNetworkConsumerPriority="true" />
</networkConnectors>
二、修改Broker-A節點中的服務提供端口爲61616:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616? " discoveryUri="multicast://default"/>
</transportConnectors>
三、在Broker-B節點中添加networkConnector節點:
<networkConnectors>
<networkConnectoruri="multicast://default"
dynamicOnly="true"
networkTTL="3"
prefetchSize="1"
decreaseNetworkConsumerPriority="true" />
</networkConnectors>
四、修改Broker-B節點中的服務提供端口爲61617:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617" discoveryUri="multicast://default"/>
</transportConnectors>
五、啓動Broker-A和Broker-B
能夠看到Master-Slave的部署方式雖然解決了高可用的問題,但不支持負載均衡,Broker-Cluster解決了負載均衡,但當其中一個Broker忽然宕掉的話,那麼存在於該Broker上處於Pending狀態的message將會丟失,沒法達到高可用的目的。
因爲目前ActiveMQ官網上並無一個明確的將兩種部署方式相結合的部署方案,因此我嘗試者把二者結合起來部署:
這裏以Broker-A + Broker-B創建cluster,Broker-C做爲Broker-B的slave爲例:
1)首先在Broker-A節點中添加networkConnector節點:
<networkConnectors>
<networkConnector uri="masterslave:(tcp://0.0.0.0:61617,tcp:// 0.0.0.0:61618)" duplex="false"/>
</networkConnectors>
2)修改Broker-A節點中的服務提供端口爲61616:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
3)在Broker-B節點中添加networkConnector節點:
<networkConnectors>
<networkConnector uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>
</networkConnectors>
4)修改Broker-B節點中的服務提供端口爲61617:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
5)修改Broker-B節點中的持久化方式:
<persistenceAdapter>
<kahaDB directory="/localhost/kahadb"/>
</persistenceAdapter>
6)在Broker-C節點中添加networkConnector節點:
<networkConnectors>
<networkConnector uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>
</networkConnectors>
7)修改Broker-C節點中的服務提供端口爲61618:
<transportConnectors>
<transportConnectorname="openwire"uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
8)修改Broker-B節點中的持久化方式:
<persistenceAdapter>
<kahaDB directory="/localhost/kahadb"/>
</persistenceAdapter>
9)分別啓動broker-A、broker-B、broker-C,由於是broker-B先啓動,因此「/localhost/kahadb」目錄被lock住,broker-C將一直處於掛起狀態,當人爲停掉broker-B以後,broker-C將獲取目錄「/localhost/kahadb」的控制權,從新與broker-A組成cluster提供服務。