目前公司項目中有用到activemq,兩臺機器上分別經過共享文件方式搭建了master-slave集羣,但兩臺機器之間並未組建broker cluster,而是在客戶端經過軟負載的方式隨機選擇一組提供服務來達到集羣擴展的目的。css
上面的方案主要問題在於須要經過軟負載去實現分佈式的負載均衡算法,須要解決一系列問題。java
下面的文章就在原有基礎上組建broker cluser(activemq自帶),基於學習的目的經過一次搭建過程來體驗下(畢竟我不是運維人員),下面是效果圖:不須要軟負載。git
爲了簡單,broker cluster只建立兩組,並且所有節點部署在同一臺機器上。github
節點名稱 | tcp open-write端口 | 管理臺端口 | 共享文件 |
---|---|---|---|
master-a | 61616 | 8161 | /Users/iss/data/activemq/activemq-ha-a |
slave-a | 61617 | 8162 | /Users/iss/data/activemq/activemq-ha-a |
master-b | 61618 | 8163 | /Users/iss/data/activemq/activemq-ha-b |
slave-b | 61619 | 8164 | /Users/iss/data/activemq/activemq-ha-b |
因爲最新的版本須要jdk1.8,我這裏選擇的是支持jdk1.7的5.14.3web
簡單運行,咱們只須要修改兩個端口便可:這兩文件在activemq安裝目錄的conf中算法
這裏只用到tcp,因此將其它的能夠所有刪除,修改uri中的端口61616爲節點的端口。spring
<transportConnectors> <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> </transportConnectors>
修改下面的port就行,這是activemq的管理系統端口。shell
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start"> <!-- the default port number for the web console --> <property name="host" value="0.0.0.0"/> <property name="port" value="8161"/> </bean>
端口修改好以後,執行下面的腳本便可啓動,而後在data目錄下查看activemq.log。apache
bin/activemq start
爲了防止activemq單節點出現故障影響提供服務,因此須要有一個備份的節點當主節點出現故障時立刻替補上。這裏採用共享文件的方式,原理就是讓參與高可用的全部節點共用一個數據文件目錄,經過文件鎖的方式來決定誰是master誰是slave。咱們須要作的就是將多個節點的數據目錄配置成相同的就行。app
環境變量
在bin目錄下有個env文件,裏面指定了activemq所使用到的各種變量,數據目錄路徑修改 ACTIVEMQ_DATA:
# Active MQ installation dirs # ACTIVEMQ_HOME="<Installationdir>/" # ACTIVEMQ_BASE="$ACTIVEMQ_HOME" # ACTIVEMQ_CONF="$ACTIVEMQ_BASE/conf" ACTIVEMQ_DATA="/Users/iss/data/activemq/activemq-ha-a/data" # ACTIVEMQ_TMP="$ACTIVEMQ_BASE/tmp"
先啓動master,而後再啓動slave,若是配置正常,在slave的啓動日誌中會輸出以下日誌,表示已經有master鎖定,本身將以slave角色運行。
2018-01-01 01:46:56,769 | INFO | Database /Users/iss/data/activemq/activemq-ha-a/data/kahadb/lock is locked by another server. This broker is now in slave mode waiting a lock to be acquired | org.apache.activemq.store.SharedFileLocker | main
當master-a出現故障時系統會自動被slave-a取代。
上面的高可用只是解決了單點故障問題,同一時間提供服務的只有master一個節點,這顯示沒法面對數據量的增加需求,因此就須要一種可擴展節點的集羣方式來解決面臨的問題。讓一個broker與其它broker互相通訊,咱們這裏採用靜態uri方式,作法仍是修改activemq.xml:
master-a與slave-a組成一個broker-a;master-b與slave-b組成一個broker-b,broker-a與broker-b組成broker cluster
讓其能與broker-b通訊
<networkConnectors> <networkConnector uri="static:(tcp://localhost:61618,tcp://localhost:61619)" duplex="false"/> </networkConnectors>
讓其能與broker-a通訊
<networkConnectors> <networkConnector uri="static:(tcp://localhost:61616,tcp://localhost:61617)" duplex="false"/> </networkConnectors>
因爲本文出於簡單演示的目的,只組建了兩個broker,它們相互之間的通訊配置也很容易。當broker實例比較多時,相互以前的橋接通訊的配置還須要仔細研究,待後續補充......
整個工程結構以下,包含一個生產消息的,一個消費消息的。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency>
配置整個集羣的url,包含所有master,slave,本文總共是4個。
發送消息時支持類,是對JmsTemplate的進一步包裝。
@ComponentScan(basePackages = {"com.jim.framework.activemq"}) @Configuration public class ActivemqConfiguration { private static final String BROKER_URL="failover:(tcp://192.168.10.222:61616,tcp://192.168.10.222:61617,tcp://192.168.10.222:61618,tcp://192.168.10.222:61619)"; @Bean public Queue productActiveMQQueue(){ return new ActiveMQQueue("jim.queue.product"); } @Bean public JmsListenerContainerFactory<?> jmsListenerContainerQueue() { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setConnectionFactory(new ActiveMQConnectionFactory(BROKER_URL)); return bean; } @Bean public JmsMessagingTemplate jmsMessagingTemplate(){ return new JmsMessagingTemplate(new ActiveMQConnectionFactory(BROKER_URL)); } }
public interface ProductSendMessage { void sendMessage(Object message); }
@Service public class ProductProducer implements ProductSendMessage { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired private Queue productActiveMQQueue; @Override public void sendMessage(Object message) { this.jmsMessagingTemplate.convertAndSend(this.productActiveMQQueue,message); } }
@JmsListener,這個註解即標識監聽哪個消息隊列。
@Component public class ProductConsumer { @JmsListener(destination = "jim.queue.product",containerFactory = "jmsListenerContainerQueue") public void receiveQueue(String text) { System.out.println("Consumer,productId:"+text); } }
簡單的一個web工程,訪問某個連接時發送消息
@RestController @RequestMapping("/product") public class ProductController{ @Autowired private ProductProducer productProducer; @RequestMapping("/{productId}") public Long getById(@PathVariable final long productId) { this.productProducer.sendMessage(productId); return productId; } }
當訪問請求後,看看消費方的輸出:請求分別轉發到了61616以及61618兩個master上了,實現了自動負載均衡。
2018-01-01 09:03:43.683 INFO 18418 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport : Successfully connected to tcp://192.168.10.222:61616 Consumer,productId:80 2018-01-01 09:03:45.794 INFO 18418 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport : Successfully connected to tcp://192.168.10.222:61616 Consumer,productId:80 2018-01-01 09:03:47.745 INFO 18418 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport : Successfully connected to tcp://192.168.10.222:61618 Consumer,productId:80 2018-01-01 09:03:49.669 INFO 18418 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport : Successfully connected to tcp://192.168.10.222:61616 Consumer,productId:80
模似一個master出現故障,中止master-a後出現這樣的日誌,顯然activemq客戶端已經檢測到。
2018-01-01 11:25:19.348 WARN 18418 --- [222:61616@55277] o.a.a.t.failover.FailoverTransport : Transport (tcp://192.168.10.222:61616) failed , attempting to automatically reconnect: {} java.io.EOFException: null at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[na:1.8.0_121] at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268) ~[activemq-client-5.14.5.jar:5.14.5] at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240) ~[activemq-client-5.14.5.jar:5.14.5] at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232) ~[activemq-client-5.14.5.jar:5.14.5] at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215) ~[activemq-client-5.14.5.jar:5.14.5] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
再次請求測試連接:發如今中止到master-a後,slave-a(61617)已經成功取代原來的master-a(61616),如今請求已經成功負載到新的master上。
2018-01-01 11:25:19.383 INFO 18418 --- [ActiveMQ Task-3] o.a.a.t.failover.FailoverTransport : Successfully reconnected to tcp://192.168.10.222:61618 2018-01-01 11:26:47.652 INFO 18418 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport : Successfully connected to tcp://192.168.10.222:61618 Consumer,productId:80 2018-01-01 11:26:55.408 INFO 18418 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport : Successfully connected to tcp://192.168.10.222:61618 Consumer,productId:80 2018-01-01 11:26:57.446 INFO 18418 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport : Successfully connected to tcp://192.168.10.222:61617 Consumer,productId:80
https://github.com/jiangmin168168/jim-framework/tree/master/jim-framework-activemq