消息隊列中間件是分佈式系統中的重要組件,主要解決應用耦合、異步消息、流量削鋒等問題。可幫助實現高性能,高可用,可伸縮和最終一致性的架構html
在消息隊列方面,除了 ActiveMQ、RabbitMQ、RocketMQ、ZeroMQ,Kafka等,還有不少其餘的競爭者。這篇文章咱們不會去講解它們之間的區別,僅只詳細的介紹一下 ActiveMQ,以及它在 .NET 中的使用apache
異步任務服務器
好比有如下場景:如今不少網站或App註冊時都採用了驗證碼的機制,所以,當服務器收到客戶端發起獲取驗證碼的請求,有如下處理方式session
那麼,以上幾種方式哪一種更好呢?架構
所以,若是是正式上線的版本(好比項目初期用於驗證市場的版本,每每會爲了速度而不考慮架構,這時可能會選擇第一種或第二種方案),且峯值較高的服務,選用第三種方案無疑是最好的。由於對於上線的服務,穩定性是很是重要的併發
對於發送短信這樣的任務(對實時性要求不是那麼高),使用消息隊列是很是合適的。將任務交由消息隊列以後,發送短信具體要作的事情主服務就不須要干涉了。若是須要,主服務訂閱任務的處理結果便可(發送成功或者失敗)。這樣,主服務就能夠繼續處理其餘客戶端的請求,而且,有消息隊列的參與,主服務的壓力就沒有那麼重了框架
固然,實際項目中,這樣的場景還有不少,好比記錄日誌,咱們都知道,寫文件(磁盤I/O)很耗時。所以如今不少大型的服務,都有專門的日誌服務器來處理其餘服務器發送過來的日誌,這時候咱們可使用 Kafka 來作這樣的事情(由於它就是爲了處理日誌而生的)異步
消息服務tcp
好比現現在的微服務、分佈式集羣等,各個節點之間的通訊,就可使用消息隊列來處理。具體使用什麼方式,可更具場景從如下兩種選擇分佈式
後面在給出案例時會具體講解這兩種模式
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個徹底支持JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已是好久的事情了,可是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。另外,在不少大型的網站或服務中,也都會使用到它
它具備如下特性
它的優點
這方面的教程在網上有不少,咱們在這就不提供了,只提供一些移動端友好的連接以幫助朋友安裝配置
首先,須要在 Apache官網 上下載 .NET 的驅動,也能夠經過如下連接下載
mirrors.hust.edu.cn/apache/acti…
要在項目中使用 ActiveMQ,須要引入上面下載的包中的兩個 dll 文件:Apache.NMS.ActiveMQ.dll
和 Apache.NMS.dll
P2P模式案例
P2P模式包含三個角色:消息隊列(Queue),發送者(Sender),接收者(Receiver)。
每條消息都被髮送到一個特定的隊列,接收者從隊列中獲取消息。隊列保留着消息,直到它們被消費或超時
P2P的特色:
所以,若是但願發送的每一個消息都會被成功處理的話,則應該P2P模式
示例代碼的基類以下
public abstract class ActiveMQBase {
protected IConnectionFactory factory;
protected IConnection connection;
protected ISession session;
public virtual void Init() {
try {
//初始化工廠, 端口默認爲61616,指定其餘會拋異常
factory = new ConnectionFactory("tcp://localhost:61616");
connection = factory.CreateConnection();
connection.Start();
session = connection.CreateSession();
} catch (Exception e) {
Console.WriteLine($"Error: {e.Message}");
}
}
public abstract void Run();
// 釋放相關資源
public virtual void Release() {
try {
if (session != null) session.Close();
if (connection != null) connection.Close();
} finally {
session = null;
connection = null;
factory = null;
}
}
}
複製代碼
生產者(Producer)以下
public class ActiveMQP2PDemoProducer : ActiveMQBase {
private IMessageProducer messageProducer;
private ActiveMQQueue demoQueue;
public override void Init() {
base.Init();
try {
// 指定隊列,以實現點對點的通訊
demoQueue = new ActiveMQQueue("DEMO_QUEUE");
// 建立生產者對象
messageProducer = session.CreateProducer(demoQueue);
} catch (Exception e) {
Console.WriteLine($"Error: {e.Message}");
}
}
public override void Run() {
while (true) {
Console.WriteLine("請輸入消息,exit 退出");
string line = Console.ReadLine();
if (line.Equals("exit", StringComparison.InvariantCultureIgnoreCase)) {
break;
}
// 建立一條文本消息,在 MessageProvider 中存在多個建立消息的方法
// 在實際項目中靈活選擇便可
ITextMessage message = messageProducer.CreateTextMessage(line);
// 發送消息,可調用其餘的重載,以設置是否持久化、優先級等特性
messageProducer.Send(message);
}
}
public override void Release() {
base.Release();
try {
if (demoQueue != null) demoQueue.Dispose();
if (messageProducer != null) messageProducer.Close();
} finally {
demoQueue = null;
messageProducer = null;
}
}
}
複製代碼
消費者(Consumer)以下
public class ActiveMQP2PDemoComsumer : ActiveMQBase {
private IMessageConsumer messageConsumer;
private ActiveMQQueue demoQueue;
public override void Init() {
base.Init();
try {
demoQueue = new ActiveMQQueue("DEMO_QUEUE");
// 建立消息的消費者
messageConsumer = session.CreateConsumer(demoQueue);
// 添加監聽,當消息來臨時,會觸發此事件
messageConsumer.Listener += this.MessageConsumer_Listener;
} catch (Exception e) {
Console.WriteLine($"Error: {e.Message}");
}
}
private void MessageConsumer_Listener(IMessage message) {
// 解析接收到的消息
if (message is ITextMessage msg) {
Console.WriteLine($"Received Message: {msg.Text}");
}
}
public override void Run() {
// 此處用於阻止控制檯結束,以保證消息可被正確處理
Console.WriteLine("請輸入消息,exit 退出");
string line = Console.ReadLine();
}
public override void Release() {
base.Release();
try {
if (demoQueue != null) demoQueue.Dispose();
if (messageConsumer != null){
messageConsumer.Listener -= this.MessageConsumer_Listener;
messageConsumer.Close();
}
} finally {
demoQueue = null;
messageConsumer = null;
}
}
}
複製代碼
使用方式以下
// 生產者初始化
ActiveMQP2PBase demo = new ActiveMQP2PDemoProducer();
// 消費者初始化代碼則爲: ActiveMQP2PBase demo = new ActiveMQP2PDemoComsumer();
demo.Init();
demo.Run();
demo.Release();
複製代碼
在 ActiveMQ 管理界面能夠看到以下,表示生產者發送的消息,都已經被消費者消費了
Pub/Sub模式
Pub/Sub模式:包含三個角色主題(Topic),發佈者(Publisher),訂閱者(Subscriber)。多個發佈者將消息發送到Topic, 系統將這些消息傳遞給多個訂閱者,能夠認爲生產者與消費者之間是多對多的關係
Pub/Sub的特色
所以,若是容許發送的消息能夠被一個或多個消費者消費、或者能夠不被消費,那麼能夠採用 Pub/Sub 模型
在 C# 中,它與 P2P 的使用區別不大,只須要將上述代碼生產者和消費者初始化代碼中
demoQueue = new ActiveMQQueue("DEMO_QUEUE");
複製代碼
這部分換成
demoTopic = new ActiveMQTopic("DEMO_TOPIC");
複製代碼
在管理員界面能夠看到以下數據
經過示例能夠看出,P2P 是基於 Queue 的,而 Pub/Sub 模式則是基於 Topic 的。
在 Pub/Sub 模式下,能夠實現多對多的通訊,便可以有多個生產者,也能夠有多個消費者,一旦有消息到來,它們會都會收到消息。
而P2P模式下,它能夠容許有多個生產者,也能夠有多個消費者。與 Pub/Sub 不一樣的是,若是有多個消費者,若是有消息到來,這些消費者會輪流着去消費該消息,而不是每一個消費者都收到消息。即一條消息只會有一個消費者
因爲在 C# 中,這兩種模式的使用方式差異很小,而運行以後產生的行爲卻差異較大。所以,在實際項目中,咱們須要注意這二者之間的區別,以避免帶來沒必要要的困惑
ActiveMQ服務器宕機怎麼辦 若是咱們想要在服務器宕機以後恢復數據,則須要對消息進行持久化
在一般的狀況下,非持久化消息是存儲在內存中的,持久化消息是存儲在文件中的。它們的最大限制在配置文件的<systemUsage>
節點中配置
可是,在非持久化消息堆積到必定程度,內存告急的時候,ActiveMQ 會將內存中的非持久化消息寫入臨時文件中,以騰出內存。雖然都保存到了文件裏,但它和持久化消息的區別是,重啓後持久化消息會從文件中恢復,非持久化的臨時文件會直接刪除(即重啓以後不會從臨時文件中恢復消息)
所以,爲了保證數據的可靠性
丟消息
這一樣是持久化消息的問題。對於這種狀況,咱們能夠
持久化消息比較慢
默認的狀況下,非持久化消息是異步發送的;而持久化消息是同步發送的。遇到慢一點的硬盤,發送消息的速度也會很慢
但若是開啓事務的狀況下,消息都會異步發送,效率會有很是大的提高。因此在發送持久化消息時,咱們應該務必開啓事務。而且咱們也建議發送非持久化消息時也開啓事務
自定義 ActiveMQ 的重發策略(Redelivery Policy)
可經過 ConnectionFactory.RedeliveryPolicy
屬性設置
CollisionAvoidancePercent
:默認值 0.15, 設置防止衝突範圍的正負百分比,只有啓用 UseCollisionAvoidance
參數時才生效MaximumRedeliveries
:默認值 6, 最大重傳次數,達到最大重連次數後拋出異常。爲-1時不限制次數,爲0時表示不進行重傳InitialRedeliveryDelay
:默認值 1000, 初始重發延遲時間UseCollisionAvoidance
:默認值 false
, 啓用防止衝突功能UseExponentialBackOff
:默認值 false
, 啓用指數倍數遞增的方式增長延遲時間BackOffMultiplier
:默認值 5, 重連時間間隔遞增倍數,只有值大於1和啓用 UseExponentialBackOff
參數時才生效。多消費者併發處理
在有多個消費者,ActiveMQ 中累積了大量的數據的狀況下,有可能會出現只有一個消費者消費、其餘消費者不「工做」的狀況
這種狀況下,咱們只須要將 ActiveMQ 的 prefetch
值設置得小一點便可。在 Queue模式時,其默認值爲 1000;Topic 下爲 32766。可經過 ConnectionFactory.PrefetchPolicy
設置
這篇文章就先講到這裏,後面咱們會講解 ActiveMQ 的一些其餘場景,如分佈式集羣。歡迎持續關注公衆號【嘿嘿的學習日記】,Thank you~