ActiveMQ——三、ActiveMQ高可用與集羣搭建

1. ActiveMQ的高可用原理
使用ZooKeeper(集羣)註冊全部的ActiveMQ Broker。只有其中的一個Broker能夠提供服務,被視爲 Master,其餘的 Broker 處於待機狀態,被視爲Slave。若是Master因故障而不能提供服務,Zookeeper會從Slave中選舉出一個Broker充當Master。
Slave鏈接Master並同步他們的存儲狀態,Slave不接受客戶端鏈接。全部的存儲操做都將被複制到 鏈接至 Master的Slaves。若是Master宕了,獲得了最新更新的Slave會成爲 Master。故障節點在恢復後會從新加入到集羣中並鏈接Master進入Slave模式。
是否是以爲和Redis Sentinel主從高可用的方式很像,這裏的zookeeper起到的做用和reids裏的sentinel做用差很少。java

另外,附上官方文檔的一則警告,請使用者注意。replicated LevelDB 不支持延遲或者計劃任務消息。這 些消息存儲在另外的LevelDB文件中,若是使用延遲或者計劃任務消息,將不會複製到Slave Broker上,不能實現消息的高可用。json

2.ActiveMQ高可用環境搭建服務器

本人是在一臺CentOS虛擬機上進行測試的,在開發中須要根據本身的實際狀況做出相應的調整。在這臺服務器上,配置了3個ActiveMQ,以下圖所示:session

後面須要修改的配置文件都在ACTIVEMQ_HOME/conf文件夾下。首先修改每一個ActiveMQ的持久化方式(修改ACTIVEMQ_HOME/bin/activemq.xml文件),ActiveMQ默認使用的是kahaDB做爲持久化存儲數據的,這裏修改爲levelDB。以下圖所示:負載均衡

接下來修改ActiveMQ的TCP連接端口號,activemq-1使用默認的61616端口,activemq-2修改成61617,activemq-3修改成61618。以下圖所示(注意紅框部分):dom

修改並保存以後,就是修改jetty的端口號(修改ACTIVEMQ_HOME/bin/jetty.xml文件),由於實在同一臺服務器上,不修改的話,第二個和第三個jetty將啓動不了。第activemq-1依然使用默認端口8161,activemq-2使用8162,activemq-3使用8163端口,以下圖:tcp

到這裏,ActiveMQ高可用就配置好了,若是沒啓動zookeeper的話,先啓動zookeeper(能夠看個人關於zookeeper的博客),而後分別啓動activemq-1,activemq-2,activemq-3。進入ACTIVEMQ_HOME/data下,查看activemq.log文件,若是沒有報錯則說明啓動成功!在zookeeper上,能夠看到以下數據:工具

其中elected不爲空的節點表示爲Master,由該activemq對外提供服務。測試

好了,讓咱們來擼點代碼測試一下吧!this

ClustorProducer:

public class ClustorProducer {  
    private ConnectionFactory factory;  
    private Connection connection;  
    private Session session;  
    private Destination destination;  
    private MessageProducer producer;  
  
    public ClustorProducer() throws JMSException {  
        this.factory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,  
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,  
                "failover:(tcp://192.168.4.19:61616,tcp://192.168.4.19:61617,tcp://192.168.4.19:61618)?randomize=false");  
        this.connection = factory.createConnection();  
        connection.start();  
        this.session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  
        this.destination = session.createQueue("first");  
        producer = session.createProducer(destination);  
    }  
  
    public void send() throws JMSException, InterruptedException {  
        for(int i=1; i<=50000; i++) {  
            Message message = session.createTextMessage("內容:" + i);  
            producer.send(destination, message);  
            System.out.println(message);  
            Thread.sleep(1000);  
        }  
    }  
  
    public static void main(String[] args) throws JMSException, InterruptedException {  
        ClustorProducer clustorProducer = new ClustorProducer();  
        clustorProducer.send();  
    }  
}

ClustorConsumer:

public class ClustorConsumer {  
    private ConnectionFactory factory;  
    private Connection connection;  
    private Session session;  
    private Destination destination;  
    private MessageConsumer consumer;  
  
    public ClustorConsumer() throws JMSException {  
        this.factory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,  
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,  
                "failover:(tcp://192.168.4.19:61616,tcp://192.168.4.19:61617,tcp://192.168.4.19:61618)?randomize=false");  
        this.connection = factory.createConnection();  
        connection.start();  
        this.session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  
        this.destination = session.createQueue("first");  
        this.consumer = session.createConsumer(destination);  
    }  
  
    public void consume() throws JMSException, InterruptedException {  
        while (true) {  
            Message message = consumer.receive();  
            if(message == null)  
                break;  
            System.out.println(message);  
            Thread.sleep(1000);  
        }  
    }  
  
    public static void main(String[] args) throws JMSException, InterruptedException {  
        ClustorConsumer clustorConsumer = new ClustorConsumer();  
        clustorConsumer.consume();  
    }  
}

其中的brokerUrl參數發生了變化變成了:

failover:(tcp://192.168.4.19:61616,tcp://192.168.4.19:61617,tcp://192.168.4.19:61618)?randomize=false

停掉三個ActiveMQ中任意的一個,咱們能夠發現依然能夠發送和接收消息。說明ActiveMQ的高可用很成功!

3.ActiveMQ集羣負載均衡搭建

以前已經實現了ActiveMQ的高可用部署,單僅僅是高可用集羣,沒法達到負載均衡的做用,接下來只需簡單配置就能完成能夠實現負載均衡的集羣功能:
在集羣1的activemq.xml中連接集羣2(在persistenceAdapter標籤前配置):

<networkConnectors>
    <networkConnector uri="static:(tcp://192.168.1.103:61616,tcp://192.168.2.103:61617,tcp://192.168.2.103:61618)" duplex="false"/>
</networkConnectors>
在集羣2的activemq.xml中連接集羣1(在persistenceAdapter標籤前配置):
<networkConnectors>
    <networkConnector uri="static:(tcp://192.168.1.104:61616,tcp://192.168.1.104:61617,tcp://192.168.1.104:61618)" duplex="false"/>
</networkConnectors>


這樣就實現了ActiveMQ的集羣高可用負載均衡功能。
客戶端鏈接:
ActiveMQ 的客戶端只能訪問Master的Broker,其餘處於Slave的Broker不能訪問。因此客戶端鏈接Broker應該使用failover協議。
配置文件地址應爲:

failover:(tcp://192.168.1.103:61616,tcp://192.168.1.103:61617,tcp://192.168.1.103:61618)?randomize=false

或:

failover:(tcp://192.168.1.104:61616,tcp://192.168.1.104:61617,tcp://192.168.1.104:61618)?randomize=false

使用zookeeper的可視化工具,能夠查看activemq在zookeeper中的註冊狀況:

address有內容的時候,表示該地址的節點爲master

當爲null的時候,表示該節點爲slave

並且,activemq的UI界面也只能在master節點上打開

相關文章
相關標籤/搜索