架構設計:系統間通訊(25)——ActiveMQ集羣方案(上)

一、綜述

經過以前的文章,咱們討論了ActiveMQ的基本使用,包括單個ActiveMQ服務節點的性能特徵,關鍵調整參數;咱們還介紹了單個ActiveMQ節點上三種不一樣的持久化存儲方案,並討論了這三種不一樣的持久化存儲方案的配置和性能特色。可是這還遠遠不夠,由於在生產環境中爲了保證讓咱們設計的消息服務方案可以持續工做,咱們還須要爲消息中間件服務搭建集羣環境,從而在保證消息中間件服務可靠性和處理性能。java

二、ActiveMQ多節點方案

集羣方案主要爲了解決系統架構中的兩個關鍵問題:高可用和高性能。ActiveMQ服務的高可用性是指,在ActiveMQ服務性能不變、數據不丟失的前提下,確保當系統災難出現時ActiveMQ可以持續提供消息服務,高可靠性方案最終目的是減小整個ActiveMQ中止服務的時間web

ActiveMQ服務的高性能是指,在保證ActiveMQ服務持續穩定性、數據不丟失的前提下,確保ActiveMQ集羣可以在單位時間內吞吐更高數量的消息、確保ActiveMQ集羣處理單條消息的時間更短、確保ActiveMQ集羣可以容納更多的客戶端穩定鏈接。apache

下面咱們分別介紹如何經過多個ActiveMQ服務節點集羣方式,分別提供熱備方案和高性能方案。最後咱們討論如何將兩種方案結合在一塊兒,最終造成在生成環境下使用的推薦方案。安全

2-二、ActiveMQ高性能方案

ActiveMQ的多節點集羣方案,主要有動態集羣和靜態集羣兩種方案。所謂動態集羣就是指,同時提供消息服務的ActiveMQ節點數量、位置(IP和端口)是不肯定的,當某一個節點啓動後,會經過網絡組播的方式向其餘節點發送通知(同時接受其餘節點的組播信息)。當網絡中其餘節點收到組播通知後,就會向這個節點發起鏈接,最終將新的節點加入ActiveMQ集羣;所謂靜態集羣是指同時提供消息服務的多個節點的位置(IP和端口)是肯定的,每一個節點不須要經過廣播的方式發現目標節點,只須要在啓動時按照給定的位置進行鏈接。服務器

  • 靜態集羣方案markdown

    這裏寫圖片描述

  • 動態集羣方案網絡

這裏寫圖片描述

2-1-一、基於組播(multicast)的節點發現

在使用動態集羣配置時,當某個ActiveMQ服務節點啓動後並不知道整個網絡中還存在哪些其餘的服務節點。因此ActiveMQ集羣須要規定一種節點與節點間的發現機制,以保證可以解決上述問題。ActiveMQ集羣中,使用「組播」原理進行其餘節點的發現架構

組播(multicast)基於UDP協議,它是指在一個可連通的網絡中,某一個數據報發送源向一組數據報接收目標進行操做的過程。在這個過程當中,數據報發送者只須要向這個組播地址(一個D類IP)發送一個數據報,那麼加入這個組播地址的全部接收者均可以收到這個數據報。組播實現了網絡中單點到多點的高效數據傳送,可以節約大量網絡帶寬,下降網絡負載。負載均衡

這裏寫圖片描述

在IP協議中,規定的D類IP地址爲組播地址。224.0.0.0~239.255.255.255這個範圍內的IP都是D類IP地址,其中有一些IP段是保留的有特殊含義的:框架

  • 224.0.0.0~224.0.0.255:這個D類IP地址段爲保留地址,不建議您在開發過程當中使用,由於可能產生衝突。例如224.0.0.5這個組播地址專供OSPF協議(是一種路由策略協議,用於找到最優路徑)使用的組播地址;224.0.0.18這個組播地址專供VRRP協議使用(VRRP協議是虛擬路由器冗餘協議)。

  • 224.0.1.0~224.0.1.255:這個D類IP地址爲公用組播地址,用於在整個Internet網絡上進行組播。除非您有頂級DNS的控制/改寫權限,不然不建議在局域網內使用這個組播地址斷。

  • 239.0.0.0~239.255.255.255:這個D類IP地址段爲推薦在局域網內使用的組播地址段。注意,若是要在局域網內使用組播功能,須要局域網中的交換機/路由器支持組播功能。幸運的是,目前市面上只要不是太太低端的交換機/路由器,都支持組播功能(組播功能所使用的主要協議爲IGMP協議,關於IGMP協議的細節就再也不進行深刻了)。

下面咱們使用java語言,編寫一個局域網內的組播發送和接受過程。以便讓各位讀者對基於組播的節點發現操做有一個直觀的理解。雖然ActiveMQ中關於節點發現的過程,要比如下的示例複雜得多,可是基本原理是不會改變的。

  • 組播數據報發送者:
package multicast;

import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.util.Date;

public class SendMulticast {
    public static void main(String[] args) throws Throwable {
        // 組播地址
        InetAddress group = InetAddress.getByName("239.0.0.5");
        // 組播端口,同時也是UDP 數據報的發送端口
        int port = 19999;
        MulticastSocket mss = null;  

        // 建立一個用於發送/接收的MulticastSocket組播套接字對象
        mss = new MulticastSocket(port);
        // 建立要發送的組播信息和UDP數據報
        // 攜帶的數據內容,就是這個activeMQ服務節點用來提供Network Connectors的TCP/IP地址和端口等信息
        String message = "我是一個活動的activeMQ服務節點(節點編號:yyyyyyy),個人可用tcp信息爲:XXXXXXXXXX : ";  
        byte[] buffer2 = message.getBytes(); 
        DatagramPacket dp = new DatagramPacket(buffer2, buffer2.length, group, port);
        // 使用組播套接字joinGroup(),將其加入到一個組播
        mss.joinGroup(group);

        // 開始按照必定的週期向加入到224.0.0.5組播地址的其餘ActiveMQ服務節點進行廣播
        Thread thread = Thread.currentThread();
        while (!thread.isInterrupted()) {
            // 使用組播套接字的send()方法,將組播數據包對象放入其中,發送組播數據包
            mss.send(dp);
            System.out.println(new Date() + "發起組播:" + message);
            synchronized (SendMulticast.class) {
                SendMulticast.class.wait(5000);
            }
        }

        mss.close();
    }
}
  • 組播數據報接收者:
package multicast;

import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;

/** * 測試接收組播信息 * @author yinwenjie */
public class AcceptMulticast {
    public static void main(String[] args) throws Throwable {
        // 創建組播套接字,並加入分組
        MulticastSocket multicastSocket = new MulticastSocket(19999);
        // 注意,組播地址和端口必須和發送者的一直,才能加入正確的組
        InetAddress ad = InetAddress.getByName("239.0.0.5");
        multicastSocket.joinGroup(ad);

        // 準備接收可能的組播信號
        byte[] datas = new byte[2048];
        DatagramPacket data = new DatagramPacket(datas, 2048 ,ad , 19999);
        Thread thread = Thread.currentThread();

        // 開始接收組播信息,並打印出來
        System.out.println(".....開始接收組播信息.....");
        while(!thread.isInterrupted()) {
            multicastSocket.receive(data);
            int leng = data.getLength();
            System.out.println(new String(data.getData() , 0 , leng , "UTF-8"));
        }

        multicastSocket.close();
    }
}

另外,咱們以前講過的DUBBO框架中,也有基於「組播」的發現/註冊管理。具體可參考DUBBO框架中的com.alibaba.dubbo.registry.multicast.MulticastRegistry類和其引用類(如下爲MulticastRegistry類中,建立組播套接字和接受組播數據報的關鍵代碼段):

......
mutilcastAddress = InetAddress.getByName(url.getHost());
mutilcastPort = url.getPort() <= 0 ? DEFAULT_MULTICAST_PORT : url.getPort();
mutilcastSocket = new MulticastSocket(mutilcastPort);
mutilcastSocket.setLoopbackMode(false);
mutilcastSocket.joinGroup(mutilcastAddress);
Thread thread = new Thread(new Runnable() {
    public void run() {
        byte[] buf = new byte[2048];
        DatagramPacket recv = new DatagramPacket(buf, buf.length);
        while (! mutilcastSocket.isClosed()) {
            try {
                mutilcastSocket.receive(recv);
                String msg = new String(recv.getData()).trim();
                int i = msg.indexOf('\n');
                if (i > 0) {
                    msg = msg.substring(0, i).trim();
                }
                MulticastRegistry.this.receive(msg, (InetSocketAddress) recv.getSocketAddress());
                Arrays.fill(buf, (byte)0);
            } catch (Throwable e) {
                if (! mutilcastSocket.isClosed()) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    }
}, "DubboMulticastRegistryReceiver");
thread.setDaemon(true);
thread.start();
......

2-1-二、橋接Network Bridges

爲了實現ActiveMQ集羣的橫向擴展要求和高穩定性要求,ActiveMQ集羣提供了Network Bridges功能。經過Network Bridges功能,技術人員能夠將多個ActiveMQ服務節點鏈接起來。並讓它們經過配置好的策略做爲一個總體對外提供服務。

這樣的服務策略主要包括兩種:主/從模式和負載均衡模式。對於第一種策略咱們會在後文進行討論。本節咱們要重點討論的是基於Network Bridges的負載均衡模式。

這裏寫圖片描述

2-1-三、動態Network Connectors

既然已經講述了ActiveMQ中的動態節點發現原理和ActiveMQ Network Bridges的概念,那麼關於ActiveMQ怎樣配置集羣的方式就是很是簡單的問題了。咱們先來討論如何進行基於組播發現的ActiveMQ負載均衡模式的配置——動態網絡鏈接Network Connectors;再來討論基於固定地址的負載均衡模式配置——靜態網絡鏈接Network Connectors。

要配置基於組播發現的ActiveMQ負載均衡模式,其過程很是簡單。開發人員只須要在每個ActiveMQ服務節點的主配置文件中(activemq.xml),添加/更改 如下配置信息便可:

......
<transportConnectors>
    <!-- 在transportConnector中增長discoveryUri屬性,表示這個transportConnector是要經過組播告知其它節點的:使用這個transportConnector位置鏈接我 -->
    <transportConnector name="auto" uri="auto+nio://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20&amp;org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize=50&amp;consumer.prefetchSize=5" discoveryUri="multicast://239.0.0.5" />
</transportConnectors>

......

<!-- 關鍵的networkConnector標籤, uri屬性標示爲組播發現-->
<networkConnectors>
    <networkConnector uri="multicast://239.0.0.5" duplex="false"/>
</networkConnectors>

......

2-1-3-1:networkConnector標籤

若是使用ActiveMQ的組播發現功能,請在networkConnector標籤的uri屬性中添加以下格式的信息:

multicast://[組播地址][:端口]

例如,您能夠按照以下方式使用ActiveMQ默認的組播地址來發現網絡種其餘ActiveMQ服務節點:

#ActiveMQ集羣默認的組播地址(239.255.2.3):
multicast://default

也能夠按照以下方式,指定一個組播地址——這在高安全級別的網絡中頗有用,由於可能其餘的組播地址已經被管理員禁用。注意組播地址只能是D類IP地址段:

#使用組播地址239.0.0.5
multicast://239.0.0.5

如下是經過抓包軟件得到的的組播UDP報文:

這裏寫圖片描述

從上圖中咱們能夠得到幾個關鍵信息:

  • 192.168.61.138和192.168.61.139這兩個IP地址分別按照必定的週期(1秒一次),向組播地址239.0.0.5發送UDP數據報。以便讓在這個組播地址的其它服務節點可以感知本身的存在

  • 另外,以上UDP數據報文使用的端口是6155。您也能夠更改這個端口信息經過相似以下的方式:

#使用組播地址239.0.0.5:19999
multicast://239.0.0.5:19999
  • 每一個UDP數據報中,包含的主要信息包括本節點ActiveMQ的版本信息,以及鏈接到本身所須要使用的host名字、協議名和端口信息。相似以下:
default.ActiveMQ-4.ailve%localhost%auto+nio://activemq:61616

2-1-3-2:transportConnector標籤的關聯設置

任何一個ActiveMQ服務節點A,要鏈接到另外的ActiveMQ服務節點,都須要使用當前節點A已經公佈的transportConnector鏈接端口,例如如下配置中,可以供其它服務節點進行鏈接的就只有兩個transportConnector鏈接中的任意一個:

......
<transportConnectors>
    <!-- 其它ActiveMQ服務節點,只能使用如下三個鏈接協議和端口進行鏈接 -->
    <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
    <transportConnector name="tcp" uri="tcp://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="nio" uri="nio://0.0.0.0:61618?maximumConnections=1000" />
    <transportConnector name="auto" uri="auto://0.0.0.0:61617?maximumConnections=1000" />   
</transportConnectors>
......

那麼要將哪個鏈接方式經過UDP數據報向其餘ActiveMQ節點進行公佈,就須要在transportConnector標籤上使用discoveryUri屬性進行標識,以下所示:

......
<transportConnectors>
    ......
    <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="auto" uri="auto+nio://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" discoveryUri="multicast://239.0.0.5" />
</transportConnectors>

......
<networkConnectors>
    <networkConnector uri="multicast://239.0.0.5"/>
</networkConnectors>
......

2-1-3-3:其餘注意事項

  • 關於防火牆:請記得關閉您Linux服務器上對須要公佈的IP和端口的限制;

  • 關於hosts路由信息:因爲基於組播的動態發現機制,可以找到的是目標ActiveMQ服務節點的機器名,而不是直接找到的IP。因此請設置當前服務節點的hosts文件,以便當前ActiveMQ節點可以經過hosts文件中的IP路由關係,獲得機器名與IP的映射:

    # hosts文件
    
    ...... 192.168.61.139 activemq1 192.168.61.138 activemq2 ......
  • 關於哪些協議可以被用於進行Network Bridges鏈接:根據筆者以往的使用經驗,只有tcp頭的uri格式(openwire協議)可以被用於Network Bridges鏈接;固然您可使用auto頭,由於其兼容openwire協議;另外,您還能夠指定爲附加nio頭。

2-1-四、靜態Network Connectors

相比於基於組播發現方式的動態Network Connectors而言,雖然靜態Network Connectors沒有那樣靈活的橫向擴展性,可是卻能夠適用於網絡環境受嚴格管理的狀況。例如:管理員關閉了交換機/路由器的組播功能、端口受到嚴格管控等等。

配置靜態Network Connectors的ActiveMQ集羣的方式也很簡單,只須要更改networkConnectors標籤中的配置便可,而無需關聯改動transportConnectors標籤。可是配置靜態Network Connectors的ActiveMQ集羣時,須要注意很是關鍵的細節:每個節點都要配置其餘全部節點的鏈接位置

爲了演示配置過程,咱們假設ActiveMQ集羣由兩個節點構成,分別是activemq1:192.168.61.138 和 activemq2:192.168.61.139。那麼配置狀況以下所示:

  • 192.168.61.138:須要配置activemq2的位置信息以便進行鏈接:
......
<transportConnectors>
    <transportConnector name="auto" uri="auto+nio://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;consumer.prefetchSize=5"/>
</transportConnectors>
......

<!-- 請注意,必定須要192.168.61.139(activemq2)提供了這樣的鏈接協議和端口 -->
<networkConnectors>
    <networkConnector uri="static:(auto+nio://192.168.61.139:61616)"/>
</networkConnectors>
......
  • 192.168.61.139:須要配置activemq1的位置信息以便進行鏈接:
......
<transportConnectors>
    <transportConnector name="auto" uri="auto+nio://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;consumer.prefetchSize=5"/>
</transportConnectors>

......
<!-- 請注意,必定須要192.168.61.138(activemq1)提供了這樣的鏈接協議和端口 -->
<networkConnectors>
   <networkConnector uri="static:(auto+nio://192.168.61.138:61616)"/>
</networkConnectors>
......

同理,若是您的ActiveMQ集羣規劃中有三個ActiveMQ服務節點,那麼任何一個節點都應該配置其它兩個服務節點的鏈接方式。在配置格式中使用「,」符號進行分割:

...... <networkConnectors> <networkConnector uri="static:(tcp://host1:61616,tcp://host2:61616,tcp://..)"/> </networkConnectors> ......

如下是配置完成後可能的效果:

  • 192.168.61.138(activemq1):

這裏寫圖片描述

  • 192.168.61.139(activemq2):

這裏寫圖片描述

2-1-五、其餘配置屬性

下表列舉了在networkConnector標籤中還可使用的屬性以及其意義。請特別注意其中的duplex屬性。若是隻從字面意義理解該屬性,則被稱爲「雙工模式」;若是該屬性爲true,當這個節點使用Network Bridge鏈接到其它目標節點後,將強制目標也創建Network Bridge進行反向鏈接。其目的在於讓消息既能發送到目標節點,又能夠經過目標節點接受消息,但實際上大多數狀況下是沒有必要的,由於目標節點通常都會自行創建鏈接到本節點。因此,該duplex屬性的默認值爲false。

屬性名稱 默認值 屬性意義
name bridge 名稱
dynamicOnly false 若是爲true, 持久訂閱被激活時才建立對應的網路持久訂閱。
decreaseNetworkConsumerPriority false 若是爲true,網絡的消費者優先級下降爲-5。若是爲false,則默認跟本地消費者同樣爲0.
excludedDestinations empty 不經過網絡轉發的destination
dynamicallyIncludedDestinations empty 經過網絡轉發的destinations,注意空列表表明全部的都轉發。
staticallyIncludedDestinations empty 匹配的都將經過網絡轉發-即便沒有對應的消費者,若是爲默認的「empty」,那麼說明全部都要被轉發
duplex false 已經進行詳細介紹的「雙工」屬性。
prefetchSize 1000 設置網絡消費者的prefetch size參數。若是設置成0,那麼就像以前文章介紹過的那樣:消費者會本身輪詢消息。顯然這是不被容許的。
suppressDuplicateQueueSubscriptions false 若是爲true, 重複的訂閱關係一產生即被阻止(V5.3+ 的版本中可使用)。
bridgeTempDestinations true 是否廣播advisory messages來建立臨時destination。
alwaysSyncSend false 若是爲true,非持久化消息也將使用request/reply方式代替oneway方式發送到遠程broker(V5.6+ 的版本中可使用)。
staticBridge false 若是爲true,只有staticallyIncludedDestinations中配置的destination能夠被處理(V5.6+ 的版本中可使用)。

如下這些屬性,只能在靜態Network Connectors模式下使用

屬性名稱 默認值 屬性意義
initialReconnectDelay 1000 重連以前等待的時間(ms) (若是useExponentialBackOff爲false)
useExponentialBackOff true 若是該屬性爲true,那麼在每次重連失敗到下次重連以前,都會增大等待時間
maxReconnectDelay 30000 重連以前等待的最大時間(ms)
backOffMultiplier 2 增大等待時間的係數

請注意這些屬性,並非networkConnector標籤的屬性,而是在uri屬性中進行設置的,例如:

uri="static:(tcp://host1:61616,tcp://host2:61616)?maxReconnectDelay=5000&useExponentialBackOff=false"

============ (接下文)

相關文章
相關標籤/搜索