【轉】SignalR與ActiveMQ結合構建實時通訊

1、概述

本教程主要闡釋瞭如何利用SignalR與消息隊列的結合,實現不一樣客戶端的交互html

  • SignalR如何和消息隊列交互(暫使用ActiveMQ消息隊列)
  • SignalR寄宿在web中和其餘SignalR、控制檯客戶端交互。
  • SignalR單獨寄宿在控制檯中和其餘SignalR、控制檯客戶端交互。

下面屏幕截圖展現了各個客戶端經過ActiveMQ相互通訊git

  一、SignalR寄宿在web:github

  二、SignalR寄宿在控制檯中,web客戶端調用SignalR,讀者自行測試。web

 工程目錄:瀏覽器

 

1、建立項目session

  一、建立生產者項目,該項目要是經過控制檯輸入消息,發送到消息隊列app

    建立控制檯應用程序命名爲ActiveMQNetProcucer,而後用包管理器安裝ActiveMQ的.Net客戶端tcp

    Install-Package Apache.NMS.ActiveMQide

    主要代碼以下:測試

 1 using Apache.NMS;
 2 using Apache.NMS.ActiveMQ;
 3 using System;
 4 using System.Collections.Generic;
 5 using System.Linq;
 6 using System.Text;
 7 using System.Threading.Tasks;
 8 namespace ActiveMQNet
 9 {
10     class Program
11     {
12         static IConnectionFactory _factory = null;
13         static IConnection _connection = null;
14         static ITextMessage _message = null;
15 
16         static void Main(string[] args)
17         {
18             //建立工廠
19             _factory = new ConnectionFactory("tcp://127.0.0.1:61616/");
20 
21             try
22             {
23                 //建立鏈接
24                 using (_connection = _factory.CreateConnection())
25                 {
26                     //建立會話
27                     using (ISession session = _connection.CreateSession())
28                     {
29                         //建立一個主題
30                         IDestination destination = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("topic");
31 
32                         //建立生產者
33                         IMessageProducer producer = session.CreateProducer(destination);
34 
35                         Console.WriteLine("Please enter any key to continue! ");
36                         Console.ReadKey();
37                         Console.WriteLine("Sending: ");
38 
39                         //建立一個文本消息
40                         _message = producer.CreateTextMessage("Hello AcitveMQ....");
41 
42                         //發送消息
43                         producer.Send(_message, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);
44                         while (true)
45                         {
46                             var msg = Console.ReadLine();
47                             _message = producer.CreateTextMessage(msg);
48                             producer.Send(_message, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);
49                         }
50                        
51                     }
52                 }
53 
54             }
55             catch (Exception ex)
56             {
57                 Console.WriteLine(ex.ToString());
58             }
59 
60             Console.ReadLine();
61 
62         }
63     }
64 }
View Code

   二、建立消費者項目,該項目主要是訂閱消息隊列中的消息  

    建立控制檯應用程序命名爲ActiveMQNetCustomer,而後用包管理器安裝ActiveMQ的.Net客戶端

    Install-Package Apache.NMS.ActiveMQ

    主要代碼:

 1 using Apache.NMS;
 2 using Apache.NMS.ActiveMQ;
 3 using System;
 4 using System.Collections.Generic;
 5 using System.Linq;
 6 using System.Text;
 7 using System.Threading.Tasks;
 8 
 9 namespace ActiveMQNetCustomer
10 {
11     class Program
12     {
13         static IConnectionFactory _factory = null;
14 
15         static void Main(string[] args)
16         {
17             try
18             {
19                 //建立鏈接工廠
20                 _factory = new ConnectionFactory("tcp://127.0.0.1:61616/");
21                 //建立鏈接
22                 using (IConnection conn = _factory.CreateConnection())
23                 {
24                     //設置客戶端ID
25                    // conn.ClientId = "Customer";
26                     conn.Start();
27                     //建立會話
28                     using (ISession session = conn.CreateSession())
29                     {
30                         //建立主題
31                         var topic = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("topic");
32 
33                         //建立消費者
34                         IMessageConsumer consumer = session.CreateDurableConsumer(topic, "Customer", null, false);
35 
36                         //註冊監聽事件
37                         consumer.Listener += new MessageListener(consumer_Listener);
38 
39                         //這句代碼很是重要,
40                         //這裏沒有read方法,Session會話會被關閉,那麼消費者將監聽不到生產者的消息
41                         Console.Read();
42                     }
43 
44                     //關閉鏈接
45                     conn.Stop();
46                     conn.Close();
47                 }
48 
49             }
50             catch (Exception ex)
51             {
52                 Console.Write(ex.ToString());
53             }
54 
55         }
56 
57         /// <summary>
58         /// 消費監聽事件
59         /// </summary>
60         /// <param name="message"></param>
61         static void consumer_Listener(IMessage message)
62         {
63             ITextMessage msg = (ITextMessage)message;
64             Console.WriteLine("Receive: " + msg.Text);
65         }
66     }
67 }
View Code

   三、建立包裝ActiveMQ生產者和消費者項目,供SignalR.ActiveMQ.WebHost項目使用,來發布消息和訂閱消息

    建立類庫項目Signalr.ActiveMQ,而後用包管理器安裝ActiveMQ的.Net客戶端

    Install-Package Apache.NMS.ActiveMQ

    主要代碼;

    生產者類:建立單實例生產者對象調用Send發放,發送消息到ActiveMQ消息隊列    

using Apache.NMS;
using Apache.NMS.ActiveMQ;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Signalr.ActiveMQ
{
  public  class Procucer
    {
        private IMessageProducer producer;
        private static Procucer instance=null;
        private Procucer(string customerId,string address)
        {
            instance = this;
            //建立工廠
            IConnectionFactory _factory = new ConnectionFactory("tcp://127.0.0.1:61616/");

            try
            {
                //建立鏈接
                IConnection _connection = _factory.CreateConnection();
                {
                    //建立會話
                    ISession session = _connection.CreateSession();
                    {
                        //建立一個主題
                        IDestination destination = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("topic");

                        //建立生產者
                        producer = session.CreateProducer(destination);

                        Console.WriteLine("Please enter any key to continue! ");
                      //  Console.ReadKey();
                        Console.WriteLine("Sending: ");                      

                    }
                }

            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.ToString());
            }

            //Console.ReadLine();
        }

        public static Procucer GetInstance(string customerId="",string address= "tcp://127.0.0.1:61616/")
        {
            if (instance == null)
                instance = new Procucer(customerId, address);
            return instance;
        }

        public void Send(string msg)
        {
            //建立一個文本消息
            ITextMessage _message = producer.CreateTextMessage(msg);
            //發送消息
            producer.Send(_message, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);
        }
    }
}
View Code

 

     消費者類:啓用單獨的線程監聽消息隊列中的消息,當監聽到消息後 廣播給全部的 SinglaR客戶端,其中靜態屬性Clients保存了全部的SinglaR客戶端,當SinglaR客戶端鏈接或者斷開的時候會更新Clients屬性詳細代碼在SignalR.ActiveMQ.WebHost中的 MyHub文件中。爲了阻止當前線程退出調用了 System.Threading.Thread.CurrentThread.Join();阻塞當前線程,避免當web中方法執行完畢後對象被回收,起不到監聽消息隊列的做用。

using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Microsoft.AspNet.SignalR.Hubs;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Web;

namespace SignalR.ActiveMQ
{
    public class Customer
    {
        private static object lockObj = new object();
        private static IHubCallerConnectionContext<dynamic> _clients;
        public static IHubCallerConnectionContext<dynamic> Clients
        {
            get { return _clients; }
            set
            {
                lock (lockObj)
                {
                    _clients = value;
                }
            }
        }
        public static void Run(string cutomerId="",string address= "tcp://127.0.0.1:61616/")
        {
            System.Threading.Thread t = new System.Threading.Thread(() =>
            {
                try
                {
                    //建立鏈接工廠
                    IConnectionFactory _factory = new ConnectionFactory(address);
                    //建立鏈接
                    using (IConnection conn = _factory.CreateConnection())
                    {
                        //設置客戶端ID
                        conn.ClientId = cutomerId;
                        conn.Start();
                        //建立會話
                        using (ISession session = conn.CreateSession())
                        {
                            //建立主題
                            var topic = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("topic");

                            //建立消費者
                            IMessageConsumer consumer = session.CreateDurableConsumer(topic, "Customer", null, false);

                            //註冊監聽事件
                            consumer.Listener += new MessageListener(consumer_Listener);

                            //阻塞當前線程,監聽消息
                            System.Threading.Thread.CurrentThread.Join();
                        }
                        //關閉鏈接
                        conn.Stop();
                        conn.Close();
                    }

                }
                catch (Exception ex)
                {
                    Debug.WriteLine(ex.ToString());
                    Console.WriteLine(ex.ToString());
                }
               
            });

            t.Start();
        }
        static void consumer_Listener(IMessage message)
        {
            ITextMessage msg = (ITextMessage)message;
            if (Clients != null)
            {
                Clients.All.broadcastMessage(msg.Text);
            }
            Debug.WriteLine("Receive: " + msg.Text);
            Console.WriteLine("Receive: " + msg.Text);
        }
    }
}
View Code

   四、建立web自宿主的SignalR項目,該項目既發佈消息,也訂閱消息

    建立MVC項目SignalR.ActiveMQ.WebHost,而後用包管理器安裝ActiveMQ的.Net客戶端

    Install-Package Apache.NMS.ActiveMQ

    建立SignalR的hub:當有客戶端鏈接或者斷開的時候更新Customer.Clients 靜態屬性,保存全部的SignalR客戶端。

    web端經過調用代理的Send方法發送消息到消息隊列。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Web;
using Microsoft.AspNet.SignalR;
using Signalr.ActiveMQ;
using System.Threading.Tasks;

namespace SignalR.ActiveMQ.Sample.Signal.Class
{   
    public class chatHub : Hub
    {
        public void Send(string clientName, string message)
        {
            Procucer.GetInstance().Send(message);            
        }
        public override Task OnConnected()
        {
            Customer.Clients = this.Clients;
            return base.OnConnected();
        }

        public override Task OnDisconnected(bool stopCalled)
        {
            Customer.Clients = this.Clients;
            return base.OnDisconnected(stopCalled);
        }
    }
}
View Code

    Startup類中啓動消費者監聽線程,調用的項目Signalr.ActiveMQ中的Customer.Run()方法:

using Microsoft.AspNet.SignalR;
using Microsoft.Owin;
using Owin;
using SignalR.ActiveMQ;

[assembly: OwinStartupAttribute(typeof(SignalR.ActiveMQ.Sample.Startup))]
namespace SignalR.ActiveMQ.Sample
{
    public partial class Startup
    {
        public void Configuration(IAppBuilder app)
        {          
            app.MapSignalR();
         
            Customer.Run();//啓動消費者監聽線程
        }
    }
}
View Code

 2、啓動順序:

一、啓動ActiveMQ程序 可參考  http://www.cnblogs.com/xwdreamer/archive/2012/02/21/2360818.html

二、啓動ActiveMQNetProcucer項目

三、ActiveMQNetCustomer項目

四、啓動SignalR.ActiveMQ.WebHost,開多個瀏覽器窗口,模擬多個SignalR客戶端 

 3、SignalR宿主和web客戶端分離兩個項目 

Signalr.ActiveMQ.SelfHost 用控制檯寄宿SignalR提供的服務供Signalr.ActiveMQ.Web使用

Signalr.ActiveMQ.Web 經過chart.html調用Signalr.ActiveMQ.SelfHost的服務 

Signalr.ActiveMQ.SelfHost 和SignalR.ActiveMQ.WebHost不能同時啓動,如今兩個項目綁定到了同一個端口。

4、測試

  在生產者窗口中輸入消息回車,觀察其餘客戶端的變化

     在Singlar的web客戶端發送消息,觀察其餘客戶端的變化

源代碼:https://github.com/zhaoyingju/SignalrActiveMQ.git

轉自:https://www.cnblogs.com/zhyj/p/5071447.html

相關文章
相關標籤/搜索