段時間在使用MQTTnet,都說這個東西比較好,但是翻了翻網上沒有例子給參考一下。html
今天算是找到了,給高手的帖子作個宣傳吧.git
原網址以下:https://blog.csdn.net/chenlu5201314/article/details/94740765github
因爲GitHub上介紹的東西比較少,以個人水平真是不知道怎麼用,先照葫蘆畫瓢,再看看怎麼回事吧:服務器
功能:多線程
把訂閱與發佈作成一個類,還帶有自動重連的功能異步
using System.Threading; using System.Threading.Tasks; using MQTTnet; using MQTTnet.Client; //客戶端須要用到 using MQTTnet.Client.Options; //具體鏈接時須要用到的屬性,ID的名稱,要鏈接Server的名稱,接入時用到的帳號和密碼,掉線時是否從新清除原有名稱,還有許多... using MQTTnet.Packets; //這個沒用上 using MQTTnet.Protocol; //這個也沒用上 using MQTTnet.Client.Receiving; //接收 using MQTTnet.Client.Disconnecting; //斷線 using MQTTnet.Client.Connecting; //鏈接
新建一個類:先寫一下變量和一些字段async
class HOSMQTT { private static MqttClient mqttClient = null; private static IMqttClientOptions options = null; private static bool runState = false; private static bool running = false; /// <summary> /// 服務器IP /// </summary> private static string ServerUrl = "182.61.51.85"; /// <summary> /// 服務器端口 /// </summary> private static int Port = 61613; /// <summary> /// 選項 - 開啓登陸 - 密碼 /// </summary> private static string Password = "ruichi8888"; /// <summary> /// 選項 - 開啓登陸 - 用戶名 /// </summary> private static string UserId = "admin"; /// <summary> /// 主題 /// <para>China/Hunan/Yiyang/Nanxian</para> /// <para>Hotel/Room01/Tv</para> /// <para>Hospital/Dept01/Room001/Bed001</para> /// <para>Hospital/#</para> /// </summary> private static string Topic = "China/Hunan/Yiyang/Nanxian"; /// <summary> /// 保留 /// </summary> private static bool Retained = false; /// <summary> /// 服務質量 /// <para>0 - 至多一次</para> /// <para>1 - 至少一次</para> /// <para>2 - 恰好一次</para> /// </summary> private static int QualityOfServiceLevel = 0; }
先看一下Start方法ide
public static void Start() { try { runState = true; Thread thread = new Thread(Work); //原帖中是這樣寫的 Thread thread = new Thread(new ThreadStart( Work)); thread.IsBackground = true; thread.Start(); } catch (Exception ex) { Console.WriteLine( "啓動客戶端出現問題:" + ex.ToString()); } }
沒進入正題以前,先普及一下基本知識 函數
具體請看下面的鏈接post
http://www.javashuo.com/article/p-avsvybev-dt.html
進入總體,介紹鏈接方法 Work
private static void Work() { running = true; Console.WriteLine("Work >>Begin"); try { var factory = new MqttFactory(); //聲明一個MQTT客戶端的標準步驟 的第一步 mqttClient = factory.CreateMqttClient() as MqttClient; //factory.CreateMqttClient()實際是一個接口類型(IMqttClient),這裏是把他的類型變了一下 options = new MqttClientOptionsBuilder() //實例化一個MqttClientOptionsBulider
.WithTcpServer(ServerUrl, Port) .WithCredentials(UserId, Password) .WithClientId("XMan") .Build(); mqttClient.ConnectAsync(options); //鏈接服務器
//下面這些東西是什麼,爲何要這麼寫,直到剛纔我仍是不懂,不過在GitHub的網址我發現了出處. mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(new Func<MqttClientConnectedEventArgs, Task>(Connected)); mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(new Func<MqttClientDisconnectedEventArgs, Task>(Disconnected)); mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(new Action<MqttApplicationMessageReceivedEventArgs>(MqttApplicationMessageReceived)); while (runState) { Thread.Sleep(100); } } catch(Exception exp) { Console.WriteLine(exp); } Console.WriteLine("Work >>End"); running = false; runState = false; }
先來看看MqttClient 類裏面都有什麼東西
須要實現的接口,如何實現,說重點!
在GitHub上有個地方進去看看就知道了‘
這個頁面的最下方寫着如何實現 https://github.com/chkr1011/MQTTnet/wiki/Upgrading-guide
private void Something() { mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(OnAppMessage); mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnConnected); mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnDisconnected); } private async void OnAppMessage(MqttApplicationMessageReceivedEventArgs e) { } private async void OnConnected(MqttClientConnectedEventArgs e) { } private async void OnDisconnected(MqttClientDisconnectedEventArgs e) { }
在開始Connected方法以前有必要看一下關於同步和異步的知識,
現學現賣簡單說一下:
Task就是異步的調用,就在不影響主線程運行的另外一個線程,可是他能像線程池同樣更高效的利用現有的空閒線程
async必須用來修飾Task ,void,或者Task<TResult>, await是等待異步線程Task.Run()開始的後臺線程執行完畢。
記住要是Task 實現異步功能,必須用 async 修飾,且async 與await成對出現。
詳見下面大神寫的大做:http://www.javashuo.com/article/p-pjqezvdb-ke.html
下面是什麼意思?
mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(new Func<MqttClientConnectedEventArgs, Task>(Connected));
MqttClientConnectedHandlerDelegate 這個實例實現了mqttClient.ConnectedHandler接口
new Func<MqttClientConnectedEventArgs, Task>(Connected) ,
使用Func委託傳入MqttClientConnectedEventArgs類型的參數,返回的類型是Task,Task是一個類,這個類沒有返回值,若是有返回值就是Task<TResult>。
是委託就要帶一個方法取實現,這個方法就是Connected。
這句話的意思是,用MqttClientConnectedHandlerDelegate實現接口,同時使用委託取調用Connected的方法,而且給這個方法傳入一個MqttClientConnectedEventArgs參數,
這個委託的返回值是Task(就是不須要返回類型的異步調用),這也就定義了Connected的類型必須是async Task。
好了來看下 Connected,這個函數什麼意思
就是與服務器鏈接以後要幹什麼,訂閱一個Topic,或幾個Topic。鏈接以前已經鏈接了Connectasync(),若是斷線還會重連,後面會提到。
這個就鏈接以後須要作的事----訂閱!
private static async Task Connected(MqttClientConnectedEventArgs e) { try { List<TopicFilter> listTopic = new List<TopicFilter>(); if (listTopic.Count() <= 0) { var topicFilterBulder = new TopicFilterBuilder().WithTopic(Topic).Build(); listTopic.Add(topicFilterBulder); Console.WriteLine("Connected >>Subscribe " + Topic); } await mqttClient.SubscribeAsync(listTopic.ToArray()); Console.WriteLine("Connected >>Subscribe Success"); } catch (Exception exp) { Console.WriteLine(exp.Message); } }
TopicFilter是一個Topic詳細信息的類
掉線的發生時會執行這個函數
private static async Task Disconnected(MqttClientDisconnectedEventArgs e) { try { Console.WriteLine("Disconnected >>Disconnected Server"); await Task.Delay(TimeSpan.FromSeconds(5)); try { await mqttClient.ConnectAsync(options); } catch (Exception exp) { Console.WriteLine("Disconnected >>Exception " + exp.Message); } } catch (Exception exp) { Console.WriteLine(exp.Message); } }
越寫問題越多,這個爲何斷線的時候會執行這個方法,這不是事件,只是接口!
怎麼實現的?看了一下源碼,一時只看了大概,這些功能的綁定都是在ConnectAsync的時候就完成了!
下面接收到消息的時候
/// <summary> /// 接收消息觸發事件 /// </summary> /// <param name="e"></param> private static void MqttApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e) { try { string text = Encoding.UTF8.GetString(e.ApplicationMessage.Payload); string Topic = e.ApplicationMessage.Topic; string QoS = e.ApplicationMessage.QualityOfServiceLevel.ToString(); string Retained = e.ApplicationMessage.Retain.ToString(); Console.WriteLine("MessageReceived >>Topic:" + Topic + "; QoS: " + QoS + "; Retained: " + Retained + ";"); Console.WriteLine("MessageReceived >>Msg: " + text); } catch (Exception exp) { Console.WriteLine(exp.Message); } }
最後就是發佈:通常會選擇0,若是選擇其餘的狀況在訂閱端不在的時候,服務器可能會崩潰
/// <summary> /// /// 發佈 /// <paramref name="QoS"/> /// <para>0 - 最多一次</para> /// <para>1 - 至少一次</para> /// <para>2 - 僅一次</para> /// </summary> /// <param name="Topic">發佈主題</param> /// <param name="Message">發佈內容</param> /// <returns></returns> public static void Publish( string Topic,string Message) { try { if (mqttClient == null) return; if (mqttClient.IsConnected == false) mqttClient.ConnectAsync(options); if (mqttClient.IsConnected == false) { Console.WriteLine("Publish >>Connected Failed! "); return; } Console.WriteLine("Publish >>Topic: " + Topic + "; QoS: " + QualityOfServiceLevel + "; Retained: " + Retained + ";"); Console.WriteLine("Publish >>Message: " + Message); MqttApplicationMessageBuilder mamb = new MqttApplicationMessageBuilder() .WithTopic(Topic) .WithPayload(Message).WithRetainFlag(Retained); if (QualityOfServiceLevel == 0) { mamb = mamb.WithAtMostOnceQoS(); } else if (QualityOfServiceLevel == 1) { mamb = mamb.WithAtLeastOnceQoS(); } else if (QualityOfServiceLevel == 2) { mamb = mamb.WithExactlyOnceQoS(); } mqttClient.PublishAsync(mamb.Build()); } catch (Exception exp) { Console.WriteLine("Publish >>" + exp.Message); } }
紙上得來終覺淺,要改形成本身想要的些東西,還要花些功夫!不過這已經很好了!謝謝各位高手的貢獻