ActiveMQ集羣搭建

1、 Activemq主備搭建

  Shared Filesystem Master-Slave方式
  shared filesystem Master-Slave部署方式主要是經過共享存儲目錄來實現master和slave的熱備,全部的ActiveMQ應用都在不斷地獲取共享目錄的控制權,哪一個應用搶到了控制權,它就成爲master。
  多個共享存儲目錄的應用,誰先啓動,誰就能夠最先取得共享目錄的控制權成爲master,其餘的應用就只能做爲slave。html

1  搭建配置步驟搭建 master-slave (一主 一備份)

準備mq的1節點 activemq-1
準備mq的2節點 activemq-2node

  特色:
    只能本地不能分佈式 和 集羣。web

針對每個activemq的節點進行配置:數據庫

1.1 配置節點1:
首先建立共享目錄,並建立兩個節點
如圖:apache

  •  配置activemq-1,須要修改持久數據庫位置,修改:activemq-1/conf/activemq.xml 

  • 配置activemq-1,須要修改activemq-1/conf/activemq.xml

  如圖:修改爲61617服務器

  • 配置activemq-1 ,須要修改activemq-1/conf/jetty.xml

2.2 配置節點2:session

  • 配置activemq-2,須要修改 持久目錄文件,修改:activemq-2/conf/activemq.xml 

  • 配置activemq-2,須要修改activemq-2/conf/activemq.xml

  如圖:修改爲61618tcp

  • 配置activemq-1 ,須要修改activemq-2/conf/jetty.xml

2 測試master-slave
2.1 生產者分佈式

 1 public class ProduceQueue {


 2     @Test

 3     public void sendMessage() throws Exception{

 4         //1.建立一個鏈接工廠 connectionfactory 參數:就是要鏈接的服務器的地址

 5         ConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://192.168.25.130:61617,tcp://192.168.25.130:61618)");

 6         //2.經過工廠獲取鏈接對象 建立鏈接 7         Connection connection = factory.createConnection();

 8         //3.開啓鏈接 9         connection.start();

10         //4.建立一個session對象  提供發送消息等方法

11         // 第一個參數:表示是否開啓分佈式事務(JTA)  通常是false 不開啓。

12         // 第二個參數:就是設置消息的應答模式
13         // 若是 第一個參數爲false時,第二個參數設置纔有意義。用的是自動應答14         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
15         //5.建立目的地 (destination)  queue 參數:目的地的名稱

16         Queue queue = session.createQueue("queue-test-cluster");

17         //6.建立個生產者18         MessageProducer producer = session.createProducer(queue);

19         //7.構建消息的內容20         TextMessage textMessage = session.createTextMessage("queue測試發送的消息");
21         // 8.發送消息22         producer.send(textMessage);

23         //9.關閉資源24         producer.close();

25         session.close();

26         connection.close();

27     }
28 
}

2.2 消費者ide

 1 public class ConsumerQueue {


 2     @Test

 3     public void consumer() throws Exception{

 4         //1.建立鏈接的工廠 5         ConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://192.168.25.130:61617,tcp://192.168.25.130:61618)");

 6         //2.建立鏈接 7         Connection connection = factory.createConnection();

 8         //3.開啓鏈接 9         connection.start();

10         //4.建立session

11         // 第一個參數:表示是否開啓分佈式事務(JTA)  通常是false 不開啓。

12         // 第二個參數:就是設置消息的應答模式   若是 第一個參數爲false時,第二個參數設置纔有意義。用的是自動應答13         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

14         // 5.建立接收消息的一個目的地15         Queue queue = session.createQueue("queue-test-cluster");

16         // 6.建立消費者17         MessageConsumer consumer = session.createConsumer(queue);

18         // 7.接收消息 打印

19         // 第一種20         /*while(true){

21             Message message = consumer.receive(1000000);//設置接收消息的超時時間

22             //沒有接收到消息就跳出循環

23             if(message==null){

24                 break;

25             }

26             if(message instanceof TextMessage){

27                 TextMessage message2 = (TextMessage) message;

28                 System.out.println("接收的消息爲"+message2.getText());

29                 }

30          }*/31         //第二種


32         // 設置一個監聽器

33         // System.out.println("start");

34         // 這裏其實開闢了一個新的線程35         consumer.setMessageListener(new MessageListener() {


36             //當有消息的時候會執行如下的邏輯37             @Override

38             public void onMessage(Message message) {

39                 if(message instanceof TextMessage){

40                     TextMessage message2 = (TextMessage) message;

41                     try {

42                         System.out.println("接收的消息爲"+message2.getText());

43                     } catch (JMSException e) {

44                         e.printStackTrace();

45                     }

46                 }

47             }

48         });

49         //System.out.println("end");50         Thread.sleep(199999);

51         // 8.關閉資源52         consumer.close();

53         session.close();

54         connection.close();

55     }


56 }

先啓動的成爲主節點,日常主節點工做,slave不工做可是一直作監聽。當主節點掛掉,slave接手工做。

2、 基於zookeeper的activemq集羣搭建(推薦)

2.1 基於可複製的 LevelDB
  LevelDB 是 Google 開發的一套用於持久化數據的高性能類庫。 LevelDB 並非一種服務,用戶須要自行實現 Server。 是單進程的服務,可以處理十億級別規模 Key-Value 型數據,佔用內存小。
  http://activemq.apache.org/replicated-leveldb-store.html

 

  高可用的原理:使用 ZooKeeper(集羣)註冊全部的 ActiveMQ Broker。只有其中的一個 Broker 能夠提供服務,被視爲 Master,其餘的 Broker 處於待機狀態,被視爲 Slave。若是 Master 因故障而不能提供服務,ZooKeeper 會從 Slave 中選舉出一個 Broker 充當 Master。Slave 鏈接 Master 並同步他們的存儲狀態, Slave 不接受客戶端鏈接。全部的存儲操做都將被複制到鏈接至 Master 的 Slaves。 若是 Master 宕了,獲得了最新更新的 Slave 會成爲 Master。 故障節點在恢復後會從新加入到集羣中並鏈接 Master 進入 Slave 模式。全部須要同步的 disk 的消息操做都將等待存儲狀態被複制到其餘法定節點的操做完成才能完成。因此,若是你配置了 replicas=3,那麼法定大小是(3/2)+1=2。 Master 將會存儲並更新而後等待 (2-1)=1 個Slave 存儲和更新完成,才彙報 success。 至於爲何是 2-1,熟悉 Zookeeper 的應該知道,有一個 node要做爲觀擦者存在。當一個新的 Master 被選中,你須要至少保障一個法定 node 在線以可以找到擁有最新狀態的 node。這個 node 能夠成爲新的 Master。所以,推薦運行至少 3 個 replica nodes,以防止一個 node失敗了,服務中斷。(原理與 ZooKeeper 集羣的高可用實現方式相似)。

2.2 集羣單機環境規劃
  定義環境:3個activemq節點(node01 node02 node03)

Ip 集羣通訊端口 節點消息鏈接端口 Jetty後臺運行端口
192.168.25.130 63631 51515 8361
192.168.25.130 63632 51516 8362
192.168.25.130 63633 51517 8363

2.3 配置zookeeper集羣

見相關教程;

2.4 節點1的配置

  在activemq.xml中配置:

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerCluster" dataDirectory="${activemq.data}">

  brokerName指定一個名字:任意便可。可是整個集羣的全部的配置項名稱都應該是一致的。

  broker標籤下的配置:

<persistenceAdapter> 
  <!-- kahaDB directory="${activemq.data}/kahadb"/ --> 
  <replicatedLevelDB 
  directory="${activemq.data}/leveldb" 
  replicas="3" 
  bind="tcp://0.0.0.0:63631" 
  zkAddress="192.168.25.130:2181,192.168.25.130:2182,192.168.25.130:2183" 
  hostname="localhost" 
  zkPath="/activemq2/leveldb-stores"/> 
</persistenceAdapter>

  jetty.xml:配置項:

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
  <!-- the default port number for the web console -->
  <property name="host" value="0.0.0.0"/>
  <property name="port" value="8361"/>
</bean>

2.5 節點2的配置
  參考節點1搭建便可,注意:端口不要同樣。

  搭建後的截圖:


  node01 -node03 是activemq的3個節點。

相關文章
相關標籤/搜索