溫故之消息隊列ActiveMQ

消息隊列中間件是分佈式系統中的重要組件,主要解決應用耦合、異步消息、流量削鋒等問題。可幫助實現高性能,高可用,可伸縮和最終一致性的架構html

在消息隊列方面,除了 ActiveMQ、RabbitMQ、RocketMQ、ZeroMQ,Kafka等,還有不少其餘的競爭者。這篇文章咱們不會去講解它們之間的區別,僅只詳細的介紹一下 ActiveMQ,以及它在 .NET 中的使用apache

消息隊列應用場景

異步任務服務器

好比有如下場景:如今不少網站或App註冊時都採用了驗證碼的機制,所以,當服務器收到客戶端發起獲取驗證碼的請求,有如下處理方式session

  1. 在當前線程中當即發送短信(會阻塞當前線程一小會兒)
  2. 新創建一個線程發送短信(在 .NET 中創建一個 Task 就行)
  3. 交由其餘的服務來處理這個任務(轉發給消息隊列,讓消息隊列處理)

那麼,以上幾種方式哪一種更好呢?架構

  • 第一種:實時性確定更好,收到請求當即處理,但它阻塞了當前線程,會形成其餘客戶端的請求被阻塞(請求少的時候咱們可能根本感受不到);
  • 第二種:在當前進程中創建一個線程來處理,實時性不如第一種,但它不會阻塞其餘客戶端的請求。不過一個進程中能建立的線程數量有限,所以也有瓶頸
  • 第三種:使用其餘特定場景的服務,這種實時性最差(但若是服務器配置好,咱們也不必定能感受到差別),但其是使用的最多的,而且其上線後效果是最好的(穩定性、可伸縮性)

所以,若是是正式上線的版本(好比項目初期用於驗證市場的版本,每每會爲了速度而不考慮架構,這時可能會選擇第一種或第二種方案),且峯值較高的服務,選用第三種方案無疑是最好的。由於對於上線的服務,穩定性是很是重要的併發

對於發送短信這樣的任務(對實時性要求不是那麼高),使用消息隊列是很是合適的。將任務交由消息隊列以後,發送短信具體要作的事情主服務就不須要干涉了。若是須要,主服務訂閱任務的處理結果便可(發送成功或者失敗)。這樣,主服務就能夠繼續處理其餘客戶端的請求,而且,有消息隊列的參與,主服務的壓力就沒有那麼重了框架

固然,實際項目中,這樣的場景還有不少,好比記錄日誌,咱們都知道,寫文件(磁盤I/O)很耗時。所以如今不少大型的服務,都有專門的日誌服務器來處理其餘服務器發送過來的日誌,這時候咱們可使用 Kafka 來作這樣的事情(由於它就是爲了處理日誌而生的)異步

消息服務tcp

好比現現在的微服務、分佈式集羣等,各個節點之間的通訊,就可使用消息隊列來處理。具體使用什麼方式,可更具場景從如下兩種選擇分佈式

  • P2P(Point to Point)點對點模式
  • Publish/Subscribe(Pub/Sub) 發佈訂閱模式

後面在給出案例時會具體講解這兩種模式

ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已是好久的事情了,可是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。另外,在不少大型的網站或服務中,也都會使用到它

它具備如下特性

  • 多種語言和協議編寫客戶端
    語言:Java、C、C++、C#、Ruby、Perl、Python、PHP;
    應用協議:OpenWire、Stomp REST、WS Notification、XMPP、AMQP
  • 徹底支持JMS1.1和J2EE 1.4規範 (持久化,XA消息,事務)
  • 對Spring的支持,ActiveMQ能夠很容易內嵌到使用Spring的系統裏面去,並且也支持Spring 的最新特性
  • 經過了常見J2EE服務器(如 Geronimo、JBoss 四、GlassFish、WebLogic)的測試,其中經過JCA 1.5 resource adaptors的配置,可讓ActiveMQ能夠自動的部署到任何兼容J2EE 1.4 商業服務器上
  • 支持多種傳輸協議:in-VM、TCP、SSL、NIO、UDP、JGroups、JXTA
  • 支持經過JDBC和journal提供高速的消息持久化
  • 從設計上保證了高性能的集羣,客戶端-服務器以及點對點的通訊
  • 支持Ajax
  • 支持與Axis的整合
  • 能夠很容易調用內嵌 JMS provider 進行測試

它的優點

  • 穩定性:失敗重連機制,持久化服務, 容錯機制, 多種恢復機制
  • 高效性:支持多種傳送協議如TCP, SSL, NIO, UDP等,集羣消息在多個代理之間轉發防止消息丟失,支持超快的JDBC消息持久化和高效的日誌系統
  • 可擴展:ActiveMQ 的高級特性均可以配置的形式來表現,很好的實現例如遊標,容錯機制,消息group及監控服務,同時擴展了不少成熟的框架
  • 高級特性:消息羣組(Message Groups)、虛擬端點(Virtual Destinations)、通配符(Wildcards)、複合端點(Composite Destinations)

ActiveMQ在Windows上的安裝配置

這方面的教程在網上有不少,咱們在這就不提供了,只提供一些移動端友好的連接以幫助朋友安裝配置

ActiveMQ在C#中的使用

首先,須要在 Apache官網 上下載 .NET 的驅動,也能夠經過如下連接下載

mirrors.hust.edu.cn/apache/acti…

要在項目中使用 ActiveMQ,須要引入上面下載的包中的兩個 dll 文件:Apache.NMS.ActiveMQ.dllApache.NMS.dll

P2P模式案例

P2P模式包含三個角色:消息隊列(Queue),發送者(Sender),接收者(Receiver)。
每條消息都被髮送到一個特定的隊列,接收者從隊列中獲取消息。隊列保留着消息,直到它們被消費或超時

P2P的特色:

  • 每條消息只有一個消費者(即一旦被消費,消息就會被移除消息隊列):在運行了多個消費者以後,一條消息只會有一個消費者收到,其餘的消費者是不能夠收到的
  • 接收者在成功接收消息以後需向隊列應答成功:咱們能夠經過指定應答模式來更改,默認是自動應答模式

所以,若是但願發送的每一個消息都會被成功處理的話,則應該P2P模式

示例代碼的基類以下

public abstract class ActiveMQBase {
    protected IConnectionFactory factory;
    protected IConnection connection;
    protected ISession session;

    public virtual void Init() {
        try {
            //初始化工廠, 端口默認爲61616,指定其餘會拋異常
            factory = new ConnectionFactory("tcp://localhost:61616");
            connection = factory.CreateConnection();
            connection.Start();
            session = connection.CreateSession();
        } catch (Exception e) {
            Console.WriteLine($"Error: {e.Message}");
        }
    }
    
    public abstract void Run();

    // 釋放相關資源
    public virtual void Release() {
        try {
            if (session != null) session.Close();
            if (connection != null) connection.Close();
        } finally {
            session = null;
            connection = null;
            factory = null;
        }
    }
}
複製代碼

生產者(Producer)以下

public class ActiveMQP2PDemoProducer : ActiveMQBase {
    private IMessageProducer messageProducer;
    private ActiveMQQueue demoQueue;

    public override void Init() {
        base.Init();
        try {
            // 指定隊列,以實現點對點的通訊 
            demoQueue = new ActiveMQQueue("DEMO_QUEUE");
            // 建立生產者對象
            messageProducer = session.CreateProducer(demoQueue);
        } catch (Exception e) {
            Console.WriteLine($"Error: {e.Message}");
        }
    }
    
    public override void Run() {
        while (true) {
            Console.WriteLine("請輸入消息,exit 退出");
            string line = Console.ReadLine();
            if (line.Equals("exit", StringComparison.InvariantCultureIgnoreCase)) {
                break;
            }
            // 建立一條文本消息,在 MessageProvider 中存在多個建立消息的方法
            // 在實際項目中靈活選擇便可
            ITextMessage message = messageProducer.CreateTextMessage(line);
            // 發送消息,可調用其餘的重載,以設置是否持久化、優先級等特性
            messageProducer.Send(message);
        }
    }

    public override void Release() {
        base.Release();
        try {
            if (demoQueue != null) demoQueue.Dispose();
            if (messageProducer != null) messageProducer.Close();
        } finally {
            demoQueue = null;
            messageProducer = null;
        }
    }
}
複製代碼

消費者(Consumer)以下

public class ActiveMQP2PDemoComsumer : ActiveMQBase {
    private IMessageConsumer messageConsumer;
    private ActiveMQQueue demoQueue;

    public override void Init() {
        base.Init();
        try {
            demoQueue = new ActiveMQQueue("DEMO_QUEUE");
            // 建立消息的消費者
            messageConsumer = session.CreateConsumer(demoQueue);
            // 添加監聽,當消息來臨時,會觸發此事件
            messageConsumer.Listener += this.MessageConsumer_Listener;
        } catch (Exception e) {
            Console.WriteLine($"Error: {e.Message}");
        }
    }


    private void MessageConsumer_Listener(IMessage message) {
        // 解析接收到的消息
        if (message is ITextMessage msg) {
            Console.WriteLine($"Received Message: {msg.Text}");
        }
    }

    public override void Run() {
        // 此處用於阻止控制檯結束,以保證消息可被正確處理
        Console.WriteLine("請輸入消息,exit 退出");
        string line = Console.ReadLine();
    }

    public override void Release() {
        base.Release();
        try {
            if (demoQueue != null) demoQueue.Dispose();
            if (messageConsumer != null){
               messageConsumer.Listener -= this.MessageConsumer_Listener;
               messageConsumer.Close();
            }
        } finally {
            demoQueue = null;
            messageConsumer = null;
        }
    }
}
複製代碼

使用方式以下

// 生產者初始化
ActiveMQP2PBase demo = new ActiveMQP2PDemoProducer();
// 消費者初始化代碼則爲: ActiveMQP2PBase demo = new ActiveMQP2PDemoComsumer();
demo.Init();
demo.Run();
demo.Release();
複製代碼

在 ActiveMQ 管理界面能夠看到以下,表示生產者發送的消息,都已經被消費者消費了

P2P模式

Pub/Sub模式

Pub/Sub模式:包含三個角色主題(Topic),發佈者(Publisher),訂閱者(Subscriber)。多個發佈者將消息發送到Topic, 系統將這些消息傳遞給多個訂閱者,能夠認爲生產者與消費者之間是多對多的關係

Pub/Sub的特色

  • 每條消息能夠有多個消費者
  • 爲了消費消息,訂閱者必須保持運行的狀態
  • 爲了緩和這樣嚴格的時間相關性,JMS 容許訂閱者建立一個可持久化的訂閱。這樣即便訂閱者沒有運行,在運行以後它也能接收到發佈者的消息

所以,若是容許發送的消息能夠被一個或多個消費者消費、或者能夠不被消費,那麼能夠採用 Pub/Sub 模型

在 C# 中,它與 P2P 的使用區別不大,只須要將上述代碼生產者和消費者初始化代碼中

demoQueue = new ActiveMQQueue("DEMO_QUEUE");
複製代碼

這部分換成

demoTopic = new ActiveMQTopic("DEMO_TOPIC");
複製代碼

在管理員界面能夠看到以下數據

Pub/Sub

經過示例能夠看出,P2P 是基於 Queue 的,而 Pub/Sub 模式則是基於 Topic 的。

在 Pub/Sub 模式下,能夠實現多對多的通訊,便可以有多個生產者,也能夠有多個消費者,一旦有消息到來,它們會都會收到消息。

而P2P模式下,它能夠容許有多個生產者,也能夠有多個消費者。與 Pub/Sub 不一樣的是,若是有多個消費者,若是有消息到來,這些消費者會輪流着去消費該消息,而不是每一個消費者都收到消息。即一條消息只會有一個消費者

因爲在 C# 中,這兩種模式的使用方式差異很小,而運行以後產生的行爲卻差異較大。所以,在實際項目中,咱們須要注意這二者之間的區別,以避免帶來沒必要要的困惑

實際項目中的一些問題

ActiveMQ服務器宕機怎麼辦 若是咱們想要在服務器宕機以後恢復數據,則須要對消息進行持久化

在一般的狀況下,非持久化消息是存儲在內存中的,持久化消息是存儲在文件中的。它們的最大限制在配置文件的<systemUsage>節點中配置

可是,在非持久化消息堆積到必定程度,內存告急的時候,ActiveMQ 會將內存中的非持久化消息寫入臨時文件中,以騰出內存。雖然都保存到了文件裏,但它和持久化消息的區別是,重啓後持久化消息會從文件中恢復,非持久化的臨時文件會直接刪除(即重啓以後不會從臨時文件中恢復消息)

所以,爲了保證數據的可靠性

  • 儘可能使用持久化消息(消息不重要也能夠不用持久化)
  • 能夠將持久化與非持久化文件的限制調大一點,以保證服務最大可用

丟消息

這一樣是持久化消息的問題。對於這種狀況,咱們能夠

  1. 儘可能將消息持久化
  2. 若是不想持久化,那麼咱們應該儘量的及時處理非持久化的消息
  3. 使用事務,它能夠保證消息不會由於鏈接關閉而丟失

持久化消息比較慢

默認的狀況下,非持久化消息是異步發送的;而持久化消息是同步發送的。遇到慢一點的硬盤,發送消息的速度也會很慢

但若是開啓事務的狀況下,消息都會異步發送,效率會有很是大的提高。因此在發送持久化消息時,咱們應該務必開啓事務。而且咱們也建議發送非持久化消息時也開啓事務

自定義 ActiveMQ 的重發策略(Redelivery Policy)

可經過 ConnectionFactory.RedeliveryPolicy 屬性設置

  • CollisionAvoidancePercent:默認值 0.15, 設置防止衝突範圍的正負百分比,只有啓用 UseCollisionAvoidance 參數時才生效
  • MaximumRedeliveries:默認值 6, 最大重傳次數,達到最大重連次數後拋出異常。爲-1時不限制次數,爲0時表示不進行重傳
  • InitialRedeliveryDelay:默認值 1000, 初始重發延遲時間
  • UseCollisionAvoidance:默認值 false, 啓用防止衝突功能
  • UseExponentialBackOff:默認值 false, 啓用指數倍數遞增的方式增長延遲時間
  • BackOffMultiplier:默認值 5, 重連時間間隔遞增倍數,只有值大於1和啓用 UseExponentialBackOff 參數時才生效。

多消費者併發處理

在有多個消費者,ActiveMQ 中累積了大量的數據的狀況下,有可能會出現只有一個消費者消費、其餘消費者不「工做」的狀況

這種狀況下,咱們只須要將 ActiveMQ 的 prefetch 值設置得小一點便可。在 Queue模式時,其默認值爲 1000;Topic 下爲 32766。可經過 ConnectionFactory.PrefetchPolicy 設置

這篇文章就先講到這裏,後面咱們會講解 ActiveMQ 的一些其餘場景,如分佈式集羣。歡迎持續關注公衆號【嘿嘿的學習日記】,Thank you~

公衆號二維碼
相關文章
相關標籤/搜索