本教程主要闡釋瞭如何利用SignalR與消息隊列的結合,實現不一樣客戶端的交互html
下面屏幕截圖展現了各個客戶端經過ActiveMQ相互通訊git
一、SignalR寄宿在web:github
二、SignalR寄宿在控制檯中,web客戶端調用SignalR,讀者自行測試。web
工程目錄:瀏覽器
1、建立項目session
一、建立生產者項目,該項目要是經過控制檯輸入消息,發送到消息隊列app
建立控制檯應用程序命名爲ActiveMQNetProcucer,而後用包管理器安裝ActiveMQ的.Net客戶端tcp
Install-Package Apache.NMS.ActiveMQide
主要代碼以下:測試
1 using Apache.NMS; 2 using Apache.NMS.ActiveMQ; 3 using System; 4 using System.Collections.Generic; 5 using System.Linq; 6 using System.Text; 7 using System.Threading.Tasks; 8 namespace ActiveMQNet 9 { 10 class Program 11 { 12 static IConnectionFactory _factory = null; 13 static IConnection _connection = null; 14 static ITextMessage _message = null; 15 16 static void Main(string[] args) 17 { 18 //建立工廠 19 _factory = new ConnectionFactory("tcp://127.0.0.1:61616/"); 20 21 try 22 { 23 //建立鏈接 24 using (_connection = _factory.CreateConnection()) 25 { 26 //建立會話 27 using (ISession session = _connection.CreateSession()) 28 { 29 //建立一個主題 30 IDestination destination = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("topic"); 31 32 //建立生產者 33 IMessageProducer producer = session.CreateProducer(destination); 34 35 Console.WriteLine("Please enter any key to continue! "); 36 Console.ReadKey(); 37 Console.WriteLine("Sending: "); 38 39 //建立一個文本消息 40 _message = producer.CreateTextMessage("Hello AcitveMQ...."); 41 42 //發送消息 43 producer.Send(_message, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue); 44 while (true) 45 { 46 var msg = Console.ReadLine(); 47 _message = producer.CreateTextMessage(msg); 48 producer.Send(_message, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue); 49 } 50 51 } 52 } 53 54 } 55 catch (Exception ex) 56 { 57 Console.WriteLine(ex.ToString()); 58 } 59 60 Console.ReadLine(); 61 62 } 63 } 64 }
二、建立消費者項目,該項目主要是訂閱消息隊列中的消息
建立控制檯應用程序命名爲ActiveMQNetCustomer,而後用包管理器安裝ActiveMQ的.Net客戶端
Install-Package Apache.NMS.ActiveMQ
主要代碼:
1 using Apache.NMS; 2 using Apache.NMS.ActiveMQ; 3 using System; 4 using System.Collections.Generic; 5 using System.Linq; 6 using System.Text; 7 using System.Threading.Tasks; 8 9 namespace ActiveMQNetCustomer 10 { 11 class Program 12 { 13 static IConnectionFactory _factory = null; 14 15 static void Main(string[] args) 16 { 17 try 18 { 19 //建立鏈接工廠 20 _factory = new ConnectionFactory("tcp://127.0.0.1:61616/"); 21 //建立鏈接 22 using (IConnection conn = _factory.CreateConnection()) 23 { 24 //設置客戶端ID 25 // conn.ClientId = "Customer"; 26 conn.Start(); 27 //建立會話 28 using (ISession session = conn.CreateSession()) 29 { 30 //建立主題 31 var topic = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("topic"); 32 33 //建立消費者 34 IMessageConsumer consumer = session.CreateDurableConsumer(topic, "Customer", null, false); 35 36 //註冊監聽事件 37 consumer.Listener += new MessageListener(consumer_Listener); 38 39 //這句代碼很是重要, 40 //這裏沒有read方法,Session會話會被關閉,那麼消費者將監聽不到生產者的消息 41 Console.Read(); 42 } 43 44 //關閉鏈接 45 conn.Stop(); 46 conn.Close(); 47 } 48 49 } 50 catch (Exception ex) 51 { 52 Console.Write(ex.ToString()); 53 } 54 55 } 56 57 /// <summary> 58 /// 消費監聽事件 59 /// </summary> 60 /// <param name="message"></param> 61 static void consumer_Listener(IMessage message) 62 { 63 ITextMessage msg = (ITextMessage)message; 64 Console.WriteLine("Receive: " + msg.Text); 65 } 66 } 67 }
三、建立包裝ActiveMQ生產者和消費者項目,供SignalR.ActiveMQ.WebHost項目使用,來發布消息和訂閱消息
建立類庫項目Signalr.ActiveMQ,而後用包管理器安裝ActiveMQ的.Net客戶端
Install-Package Apache.NMS.ActiveMQ
主要代碼;
生產者類:建立單實例生產者對象調用Send發放,發送消息到ActiveMQ消息隊列
using Apache.NMS; using Apache.NMS.ActiveMQ; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace Signalr.ActiveMQ { public class Procucer { private IMessageProducer producer; private static Procucer instance=null; private Procucer(string customerId,string address) { instance = this; //建立工廠 IConnectionFactory _factory = new ConnectionFactory("tcp://127.0.0.1:61616/"); try { //建立鏈接 IConnection _connection = _factory.CreateConnection(); { //建立會話 ISession session = _connection.CreateSession(); { //建立一個主題 IDestination destination = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("topic"); //建立生產者 producer = session.CreateProducer(destination); Console.WriteLine("Please enter any key to continue! "); // Console.ReadKey(); Console.WriteLine("Sending: "); } } } catch (Exception ex) { Console.WriteLine(ex.ToString()); } //Console.ReadLine(); } public static Procucer GetInstance(string customerId="",string address= "tcp://127.0.0.1:61616/") { if (instance == null) instance = new Procucer(customerId, address); return instance; } public void Send(string msg) { //建立一個文本消息 ITextMessage _message = producer.CreateTextMessage(msg); //發送消息 producer.Send(_message, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue); } } }
消費者類:啓用單獨的線程監聽消息隊列中的消息,當監聽到消息後 廣播給全部的 SinglaR客戶端,其中靜態屬性Clients保存了全部的SinglaR客戶端,當SinglaR客戶端鏈接或者斷開的時候會更新Clients屬性詳細代碼在SignalR.ActiveMQ.WebHost中的 MyHub文件中。爲了阻止當前線程退出調用了 System.Threading.Thread.CurrentThread.Join();阻塞當前線程,避免當web中方法執行完畢後對象被回收,起不到監聽消息隊列的做用。
using Apache.NMS; using Apache.NMS.ActiveMQ; using Microsoft.AspNet.SignalR.Hubs; using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Web; namespace SignalR.ActiveMQ { public class Customer { private static object lockObj = new object(); private static IHubCallerConnectionContext<dynamic> _clients; public static IHubCallerConnectionContext<dynamic> Clients { get { return _clients; } set { lock (lockObj) { _clients = value; } } } public static void Run(string cutomerId="",string address= "tcp://127.0.0.1:61616/") { System.Threading.Thread t = new System.Threading.Thread(() => { try { //建立鏈接工廠 IConnectionFactory _factory = new ConnectionFactory(address); //建立鏈接 using (IConnection conn = _factory.CreateConnection()) { //設置客戶端ID conn.ClientId = cutomerId; conn.Start(); //建立會話 using (ISession session = conn.CreateSession()) { //建立主題 var topic = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("topic"); //建立消費者 IMessageConsumer consumer = session.CreateDurableConsumer(topic, "Customer", null, false); //註冊監聽事件 consumer.Listener += new MessageListener(consumer_Listener); //阻塞當前線程,監聽消息 System.Threading.Thread.CurrentThread.Join(); } //關閉鏈接 conn.Stop(); conn.Close(); } } catch (Exception ex) { Debug.WriteLine(ex.ToString()); Console.WriteLine(ex.ToString()); } }); t.Start(); } static void consumer_Listener(IMessage message) { ITextMessage msg = (ITextMessage)message; if (Clients != null) { Clients.All.broadcastMessage(msg.Text); } Debug.WriteLine("Receive: " + msg.Text); Console.WriteLine("Receive: " + msg.Text); } } }
四、建立web自宿主的SignalR項目,該項目既發佈消息,也訂閱消息
建立MVC項目SignalR.ActiveMQ.WebHost,而後用包管理器安裝ActiveMQ的.Net客戶端
Install-Package Apache.NMS.ActiveMQ
建立SignalR的hub:當有客戶端鏈接或者斷開的時候更新Customer.Clients 靜態屬性,保存全部的SignalR客戶端。
web端經過調用代理的Send方法發送消息到消息隊列。
using System; using System.Collections.Generic; using System.Linq; using System.Web; using Microsoft.AspNet.SignalR; using Signalr.ActiveMQ; using System.Threading.Tasks; namespace SignalR.ActiveMQ.Sample.Signal.Class { public class chatHub : Hub { public void Send(string clientName, string message) { Procucer.GetInstance().Send(message); } public override Task OnConnected() { Customer.Clients = this.Clients; return base.OnConnected(); } public override Task OnDisconnected(bool stopCalled) { Customer.Clients = this.Clients; return base.OnDisconnected(stopCalled); } } }
Startup類中啓動消費者監聽線程,調用的項目Signalr.ActiveMQ中的Customer.Run()方法:
using Microsoft.AspNet.SignalR; using Microsoft.Owin; using Owin; using SignalR.ActiveMQ; [assembly: OwinStartupAttribute(typeof(SignalR.ActiveMQ.Sample.Startup))] namespace SignalR.ActiveMQ.Sample { public partial class Startup { public void Configuration(IAppBuilder app) { app.MapSignalR(); Customer.Run();//啓動消費者監聽線程 } } }
2、啓動順序:
一、啓動ActiveMQ程序 可參考 http://www.cnblogs.com/xwdreamer/archive/2012/02/21/2360818.html
二、啓動ActiveMQNetProcucer項目
三、ActiveMQNetCustomer項目
四、啓動SignalR.ActiveMQ.WebHost,開多個瀏覽器窗口,模擬多個SignalR客戶端
3、SignalR宿主和web客戶端分離兩個項目
Signalr.ActiveMQ.SelfHost 用控制檯寄宿SignalR提供的服務供Signalr.ActiveMQ.Web使用
Signalr.ActiveMQ.Web 經過chart.html調用Signalr.ActiveMQ.SelfHost的服務
Signalr.ActiveMQ.SelfHost 和SignalR.ActiveMQ.WebHost不能同時啓動,如今兩個項目綁定到了同一個端口。
4、測試
在生產者窗口中輸入消息回車,觀察其餘客戶端的變化
在Singlar的web客戶端發送消息,觀察其餘客戶端的變化
源代碼:https://github.com/zhaoyingju/SignalrActiveMQ.git
轉自:https://www.cnblogs.com/zhyj/p/5071447.html