MQTTnet 3.0.5學習筆記 C#的ThreadStart 和 Thread

段時間在使用MQTTnet,都說這個東西比較好,但是翻了翻網上沒有例子給參考一下。html

今天算是找到了,給高手的帖子作個宣傳吧.git

原網址以下:https://blog.csdn.net/chenlu5201314/article/details/94740765github

因爲GitHub上介紹的東西比較少,以個人水平真是不知道怎麼用,先照葫蘆畫瓢,再看看怎麼回事吧:服務器

功能:多線程

把訂閱與發佈作成一個類,還帶有自動重連的功能異步

using System.Threading;    
using System.Threading.Tasks;
using MQTTnet;          
using MQTTnet.Client;      //客戶端須要用到
using MQTTnet.Client.Options;  //具體鏈接時須要用到的屬性,ID的名稱,要鏈接Server的名稱,接入時用到的帳號和密碼,掉線時是否從新清除原有名稱,還有許多...
using MQTTnet.Packets;    //這個沒用上
using MQTTnet.Protocol;    //這個也沒用上
using MQTTnet.Client.Receiving;    //接收
using MQTTnet.Client.Disconnecting;  //斷線
using MQTTnet.Client.Connecting;    //鏈接

新建一個類:先寫一下變量和一些字段async

class HOSMQTT   
{        
private static MqttClient mqttClient = null;        
private static IMqttClientOptions options = null;        
private static bool runState = false;        
private static bool running = false;        
 /// <summary>        
/// 服務器IP        
/// </summary>        
private static string ServerUrl = "182.61.51.85";        
/// <summary>        
/// 服務器端口        
/// </summary>        
private static int Port = 61613;        
/// <summary>        
/// 選項 - 開啓登陸 - 密碼        
/// </summary>        
private static string Password = "ruichi8888";        
/// <summary>        
/// 選項 - 開啓登陸 - 用戶名        
/// </summary>        
private static string UserId = "admin";        
/// <summary>        
/// 主題        
/// <para>China/Hunan/Yiyang/Nanxian</para>        
/// <para>Hotel/Room01/Tv</para>        
/// <para>Hospital/Dept01/Room001/Bed001</para>        
/// <para>Hospital/#</para>        
/// </summary>        
private static string Topic = "China/Hunan/Yiyang/Nanxian";        
/// <summary>        
/// 保留        
/// </summary>        
private static bool Retained = false;       
 /// <summary>       
 /// 服務質量        
/// <para>0 - 至多一次</para>        
/// <para>1 - 至少一次</para>        
/// <para>2 - 恰好一次</para>        
/// </summary>        
private static int QualityOfServiceLevel = 0;
}

先看一下Start方法ide

public static void Start()
        {
            try
            {
                runState = true;
                Thread thread = new Thread(Work);    //原帖中是這樣寫的 Thread thread = new Thread(new ThreadStart( Work));
                thread.IsBackground = true;
                thread.Start();
            }
            catch (Exception ex)
            {
                Console.WriteLine( "啓動客戶端出現問題:" + ex.ToString());
            }
        }

沒進入正題以前,先普及一下基本知識 函數

C#的ThreadStart 和 Thread  多線程,new Thread(t1);和new Thread(new ThreadStart(t1));沒有什麼區別.前者是.net的寫法,後者是C#的寫法

 

具體請看下面的鏈接post

http://www.javashuo.com/article/p-avsvybev-dt.html

進入總體,介紹鏈接方法 Work

private static void Work()
        {
            running = true;
            Console.WriteLine("Work >>Begin");
            try
            {
                var factory = new MqttFactory();        //聲明一個MQTT客戶端的標準步驟 的第一步
                mqttClient = factory.CreateMqttClient() as MqttClient;  //factory.CreateMqttClient()實際是一個接口類型(IMqttClient),這裏是把他的類型變了一下
                options = new MqttClientOptionsBuilder()    //實例化一個MqttClientOptionsBulider
.WithTcpServer(ServerUrl, Port) .WithCredentials(UserId, Password) .WithClientId(
"XMan") .Build();                   mqttClient.ConnectAsync(options);      //鏈接服務器
        
          //下面這些東西是什麼,爲何要這麼寫,直到剛纔我仍是不懂,不過在GitHub的網址我發現了出處. mqttClient.ConnectedHandler
= new MqttClientConnectedHandlerDelegate(new Func<MqttClientConnectedEventArgs, Task>(Connected)); mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(new Func<MqttClientDisconnectedEventArgs, Task>(Disconnected)); mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(new Action<MqttApplicationMessageReceivedEventArgs>(MqttApplicationMessageReceived)); while (runState) { Thread.Sleep(100); } } catch(Exception exp) { Console.WriteLine(exp); } Console.WriteLine("Work >>End"); running = false; runState = false; }

 

先來看看MqttClient 類裏面都有什麼東西

須要實現的接口,如何實現,說重點!

在GitHub上有個地方進去看看就知道了‘

 這個頁面的最下方寫着如何實現    https://github.com/chkr1011/MQTTnet/wiki/Upgrading-guide

private void Something()
{
    mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnAppMessage);
    mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnConnected);
    mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnDisconnected);
}

private async void OnAppMessage(MqttApplicationMessageReceivedEventArgs e)
{
}

private async void OnConnected(MqttClientConnectedEventArgs e)
{
}

private async void OnDisconnected(MqttClientDisconnectedEventArgs e)
{
}

在開始Connected方法以前有必要看一下關於同步和異步的知識,

現學現賣簡單說一下:

Task就是異步的調用,就在不影響主線程運行的另外一個線程,可是他能像線程池同樣更高效的利用現有的空閒線程

async必須用來修飾Task ,void,或者Task<TResult>, await是等待異步線程Task.Run()開始的後臺線程執行完畢。

記住要是Task 實現異步功能,必須用 async 修飾,且async 與await成對出現。

詳見下面大神寫的大做:http://www.javashuo.com/article/p-pjqezvdb-ke.html

下面是什麼意思?

mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(new Func<MqttClientConnectedEventArgs, Task>(Connected));

 

MqttClientConnectedHandlerDelegate 這個實例實現了mqttClient.ConnectedHandler接口

new Func<MqttClientConnectedEventArgs, Task>(Connected) ,

使用Func委託傳入MqttClientConnectedEventArgs類型的參數,返回的類型是Task,Task是一個類,這個類沒有返回值,若是有返回值就是Task<TResult>。

是委託就要帶一個方法取實現,這個方法就是Connected。

這句話的意思是,用MqttClientConnectedHandlerDelegate實現接口,同時使用委託取調用Connected的方法,而且給這個方法傳入一個MqttClientConnectedEventArgs參數,

這個委託的返回值是Task(就是不須要返回類型的異步調用),這也就定義了Connected的類型必須是async Task。

好了來看下 Connected,這個函數什麼意思

就是與服務器鏈接以後要幹什麼,訂閱一個Topic,或幾個Topic。鏈接以前已經鏈接了Connectasync(),若是斷線還會重連,後面會提到。

這個就鏈接以後須要作的事----訂閱!

        private static async Task Connected(MqttClientConnectedEventArgs e)
        {
            try
            {
                List<TopicFilter> listTopic = new List<TopicFilter>();
                if (listTopic.Count() <= 0)
                {
                    var topicFilterBulder = new TopicFilterBuilder().WithTopic(Topic).Build();
                    listTopic.Add(topicFilterBulder);
                    Console.WriteLine("Connected >>Subscribe " + Topic);
                }                await mqttClient.SubscribeAsync(listTopic.ToArray());
                Console.WriteLine("Connected >>Subscribe Success");
            }
            catch (Exception exp)
            {
                Console.WriteLine(exp.Message);
            }
        }

TopicFilter是一個Topic詳細信息的類

 

 掉線的發生時會執行這個函數

private static async Task Disconnected(MqttClientDisconnectedEventArgs e)
        {
            try
            {
                Console.WriteLine("Disconnected >>Disconnected Server");
                await Task.Delay(TimeSpan.FromSeconds(5));
                try
                {
                    await mqttClient.ConnectAsync(options);
                }
                catch (Exception exp)
                {
                    Console.WriteLine("Disconnected >>Exception " + exp.Message);
                }
            }
            catch (Exception exp)
            {
                Console.WriteLine(exp.Message);
            }
        }

越寫問題越多,這個爲何斷線的時候會執行這個方法,這不是事件,只是接口!

怎麼實現的?看了一下源碼,一時只看了大概,這些功能的綁定都是在ConnectAsync的時候就完成了!

 

下面接收到消息的時候

        /// <summary>
        /// 接收消息觸發事件
        /// </summary>
        /// <param name="e"></param>
        private static void MqttApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
        {
            try
            {
                string text = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
                string Topic = e.ApplicationMessage.Topic; string QoS = e.ApplicationMessage.QualityOfServiceLevel.ToString();
                string Retained = e.ApplicationMessage.Retain.ToString();
                Console.WriteLine("MessageReceived >>Topic:" + Topic + "; QoS: " + QoS + "; Retained: " + Retained + ";");
                Console.WriteLine("MessageReceived >>Msg: " + text);
            }
            catch (Exception exp)
            {
                Console.WriteLine(exp.Message);
            }
        }

 

 最後就是發佈:通常會選擇0,若是選擇其餘的狀況在訂閱端不在的時候,服務器可能會崩潰

/// <summary>       
        /// /// 發佈        
        /// <paramref name="QoS"/>        
        /// <para>0 - 最多一次</para>        
        /// <para>1 - 至少一次</para>        
        /// <para>2 - 僅一次</para>        
        /// </summary>       
        /// <param name="Topic">發佈主題</param>        
        /// <param name="Message">發佈內容</param>        
        /// <returns></returns>        
        public static void Publish( string Topic,string Message)
        {
            try
            {
                if (mqttClient == null)
                    return;
                if (mqttClient.IsConnected == false)
                    mqttClient.ConnectAsync(options);
                if (mqttClient.IsConnected == false)
                {
                    Console.WriteLine("Publish >>Connected Failed! ");
                    return;
                }
                Console.WriteLine("Publish >>Topic: " + Topic + "; QoS: " + QualityOfServiceLevel + "; Retained: " + Retained + ";");
                Console.WriteLine("Publish >>Message: " + Message);
                MqttApplicationMessageBuilder mamb = new MqttApplicationMessageBuilder()                 
                    .WithTopic(Topic)                 
                    .WithPayload(Message).WithRetainFlag(Retained);
                if (QualityOfServiceLevel == 0)
                {
                    mamb = mamb.WithAtMostOnceQoS();
                }
                else if (QualityOfServiceLevel == 1)
                {
                    mamb = mamb.WithAtLeastOnceQoS();
                }
                else if (QualityOfServiceLevel == 2)
                {
                    mamb = mamb.WithExactlyOnceQoS();
                }
                mqttClient.PublishAsync(mamb.Build());
            }
            catch (Exception exp)
            {
                Console.WriteLine("Publish >>" + exp.Message);
            }
        }

 

 

 

 紙上得來終覺淺,要改形成本身想要的些東西,還要花些功夫!不過這已經很好了!謝謝各位高手的貢獻

相關文章
相關標籤/搜索