NetMQ 發佈訂閱模式 Publisher-Subscriber

第一部分引用於:點擊打開html

1:簡單介紹

PUB-SUB模式通常處理的都不是系統的關鍵數據。發佈者不關注訂閱者是否收到發佈的消息,訂閱者也不知道本身是否收到了發佈者發出的全部消息。你也不知道訂閱者什麼時候開始收到消息。相似於廣播,收音機。所以邏輯上,它都不是可靠的。這個能夠經過與請求響應模型組合來解決。app


圖1:簡單的發佈訂閱模式tcp


圖2:與請求響應模式組合的發佈訂閱模式測試

2:案例google

定義IPublishser接口spa

namespace NetMQDemoPublisher
{
    public interface IPublisher:IDisposable
    {
        /// <summary>
        /// 發佈消息
        /// </summary>
        /// <param name="topicName">主題</param>
        /// <param name="data">內容</param>
        void Publish(string topicName, string data);
    }
}

 

Publisher實現類3d

namespace NetMQDemoPublisher
{
    public class Publisher:IPublisher
    {
        private object _lockObject = new object();

        private PublisherSocket _publisherSocket;

        public Publisher(string endPoint)
        {
            _publisherSocket = new PublisherSocket();
            _publisherSocket.Options.SendHighWatermark = 1000;
            _publisherSocket.Bind(endPoint);
        }
        #region Implementation of IDisposable

        /// <summary>
        /// 執行與釋放或重置非託管資源相關的應用程序定義的任務。
        /// </summary>
        public void Dispose()
        {
            lock (_lockObject)
            {
                _publisherSocket.Close();
                _publisherSocket.Dispose();
            }
        }

        /// <summary>
        /// 發佈消息
        /// </summary>
        /// <param name="topicName">主題</param>
        /// <param name="data">內容</param>
        public void Publish(string topicName, string data)
        {
            lock (_lockObject)
            {
                _publisherSocket.SendMoreFrame(topicName).SendFrame(data);
            }
        }

        #endregion
    }
}

 

Publisher窗口界面code

界面中實現的功能代碼orm

namespace NetMQDemoPublisher
{
    public partial class PublisherForm : Form
    {
        private IPublisher publisher;
        public PublisherForm()
        {
            InitializeComponent();
            publisher = new Publisher("tcp://127.0.0.1:8888");
        }

        private void button1_Click(object sender, EventArgs e)
        {
            string strContent = textBox1.Text;
            ListViewItem item = new ListViewItem(string.Format("topic:NetMQ,Data:{0}",  strContent));
            listView1.Items.Add(item);
            publisher.Publish("NetMQ", strContent);
        }
    }
}

 

定義ISubscriber接口htm

namespace NetMQDemoSubscriber
{
    public interface ISubscriber:IDisposable
    {
        /// <summary>
        /// 事件
        /// </summary>
        event Action<string, string> Nofity;

        /// <summary>
        /// 註冊訂閱主題
        /// </summary>
        /// <param name="topics"></param>
        void RegisterSubscriber(List<string> topics);

        /// <summary>
        /// 註冊訂閱
        /// </summary>
        void RegisterSbuscriberAll();

        /// <summary>
        /// 移除全部訂閱消息,並關閉
        /// </summary>
        void RemoveSbuscriberAll();
    }
}

 

Subscriber實現類

namespace NetMQDemoSubscriber
{
    public class Subscriber:ISubscriber
    {
        private SubscriberSocket _subscriberSocket = null;
        private string _endpoint = @"tcp://127.0.0.1:9876";

        public Subscriber(string endPoint)
        {
            _subscriberSocket = new SubscriberSocket();
            _endpoint = endPoint;
        }
        #region Implementation of IDisposable

        /// <summary>
        /// 執行與釋放或重置非託管資源相關的應用程序定義的任務。
        /// </summary>
        public void Dispose()
        {
            throw new NotImplementedException();
        }

        #endregion

        #region Implementation of ISubscriber

        public event Action<string, string> Nofity = delegate { };

        /// <summary>
        /// 註冊訂閱主題
        /// </summary>
        /// <param name="topics"></param>
        public void RegisterSubscriber(List<string> topics)
        {
            InnerRegisterSubscriber(topics);
        }

        /// <summary>
        /// 註冊訂閱
        /// </summary>
        public void RegisterSbuscriberAll()
        {
            InnerRegisterSubscriber();
        }

        /// <summary>
        /// 移除全部訂閱消息,並關閉
        /// </summary>
        public void RemoveSbuscriberAll()
        {
            InnerStop();
        }

        #endregion

        #region 內部實現

        /// <summary>
        /// 註冊訂閱消息
        /// </summary>
        /// <param name="topics">訂閱的主題</param>
        private void InnerRegisterSubscriber(List<string> topics = null)
        {
            InnerStop();
            _subscriberSocket = new SubscriberSocket();
            _subscriberSocket.Options.ReceiveHighWatermark = 1000;
            _subscriberSocket.Connect(_endpoint);
            if (null == topics)
            {
                _subscriberSocket.SubscribeToAnyTopic();
            }
            else
            {
                topics.ForEach(item => _subscriberSocket.Subscribe(item));
            }
            Task.Factory.StartNew(() =>
            {
                while (true)
                {
                    string messageTopicReceived = _subscriberSocket.ReceiveFrameString();
                    string messageReceived = _subscriberSocket.ReceiveFrameString();
                    Nofity(messageTopicReceived, messageReceived);
                }
            });
        }

        /// <summary>
        /// 關閉訂閱
        /// </summary>
        private void InnerStop()
        {
            _subscriberSocket.Close();
        }

        #endregion
    }
}

 

Subscriber窗口界面

窗體功能代碼

namespace NetMQDemoSubscriber
{
    public partial class SubscriberForm : Form
    {
        private ISubscriber subscriber;
        public SubscriberForm()
        {
            InitializeComponent();
        }

        private void SubscriberForm_Load(object sender, EventArgs e)
        {
            subscriber = new Subscriber("tcp://127.0.0.1:8888");
            subscriber.RegisterSbuscriberAll();
            subscriber.Nofity+= delegate(string s, string s1)
            {
                ListViewItem item = new ListViewItem(string.Format("topic:{0},Data:{1}", s, s1));
                listView1.Items.Add(item);
            };
        }
    }
}

 

運行後,Publiser開啓一個,Subscirber開啓三個,進行測試如圖

源碼下載

若是以爲文章好,記得關注一下公衆號喲!

相關文章
相關標籤/搜索