https://blog.csdn.net/dengyaan/article/details/51752327git
最近由於工做須要,須要使用C# 語言編寫一個經過MQTT協議 ,上傳數據到雲端的工具。由於以前沒有用過MQTT,因此 使用的時候遇到不少問題.下面將會把我遇到的問題一一解釋。github
1.引用源碼庫地址
https://github.com/eclipse/paho.mqtt.m2mqtt
2.說明
https://m2mqtt.wordpress.com/m2mqtt_doc/
3.使用後遇到的問題
當網絡中斷後,MQTT 程序有時候不會自動重連。
解決方案 添加監控MQTT鏈接狀態網絡
1.添加全局靜態變量 uPLibrary.Networking.M2Mqtt.MQTTConfig.IsSocketRun;session
class MQTTConfig{
public static bool IsSocketRun = false;
}
1
2
3
2.修改MqttClient 類 的Connect 方法,在鏈接成功後把IsSocketRun = true.
MQTTConfig.IsSocketRun = true;eclipse
/// <summary>
/// Connect to broker
/// </summary>
/// <param name="clientId">Client identifier</param>
/// <param name="username">Username</param>
/// <param name="password">Password</param>
/// <param name="willRetain">Will retain flag</param>
/// <param name="willQosLevel">Will QOS level</param>
/// <param name="willFlag">Will flag</param>
/// <param name="willTopic">Will topic</param>
/// <param name="willMessage">Will message</param>
/// <param name="cleanSession">Clean sessione flag</param>
/// <param name="keepAlivePeriod">Keep alive period</param>
/// <returns>Return code of CONNACK message from broker</returns>
public byte Connect(string clientId,
string username,
string password,
bool willRetain,
byte willQosLevel,
bool willFlag,
string willTopic,
string willMessage,
bool cleanSession,
ushort keepAlivePeriod)
{
// create CONNECT message
MqttMsgConnect connect = new MqttMsgConnect(clientId,
username,
password,
willRetain,
willQosLevel,
willFlag,
willTopic,
willMessage,
cleanSession,
keepAlivePeriod,
(byte)this.ProtocolVersion);ide
try
{
// connect to the broker
this.channel.Connect();
}
catch (Exception ex)
{
throw new MqttConnectionException("Exception connecting to the broker", ex);
}wordpress
this.lastCommTime = 0;
this.isRunning = true;
MQTTConfig.IsSocketRun = true;
this.isConnectionClosing = false;
// start thread for receiving messages from broker
Fx.StartThread(this.ReceiveThread);工具
....
3.繼續修改 MqttClient .cs類中的Ping() 方法ui
/// <summary>
/// Execute ping to broker for keep alive
/// </summary>
/// <returns>PINGRESP message from broker</returns>
private MqttMsgPingResp Ping()
{
MqttMsgPingReq pingreq = new MqttMsgPingReq();
try
{
// broker must send PINGRESP within timeout equal to keep alive period
return (MqttMsgPingResp)this.SendReceive(pingreq, this.keepAlivePeriod);
}
catch (Exception e)
{
#if TRACE
MqttUtility.Trace.WriteLine(TraceLevel.Error, "Exception occurred: {0}", e.ToString());
#endif
MQTTConfig.IsSocketRun = false;
// client must close connection
this.OnConnectionClosing();
return null;
}
}
4.最後在咱們程序集入口初始化程序的時候 添加線程調用 。當MQTT中斷後就會自動重連 ,另外提醒方法異常時必定要異常處理哦。this
while (true)
{
LogWriter.DebugLog(string.Format("執行次數{0} IsSocketRun {1}", i, uPLibrary.Networking.M2Mqtt.MQTTConfig.IsSocketRun));
if (!uPLibrary.Networking.M2Mqtt.MQTTConfig.IsSocketRun)
{
程序執行到嗎。。。
}
System.Threading.Thread.Sleep(10000);
}
MQTT 訂閱
// create client instance
MqttClient client = new MqttClient(IPAddress.Parse(MQTT_BROKER_ADDRESS));
// register to message received
client.MqttMsgPublishReceived += client_MqttMsgPublishReceived;
string clientId = Guid.NewGuid().ToString();
client.Connect(clientId);
// subscribe to the topic "/home/temperature" with QoS 2
client.Subscribe(new string[] { "/home/temperature" }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
...
static void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
{
// handle message received
}
MQTT 發佈
…
// create client instance
MqttClient client = new MqttClient(IPAddress.Parse(MQTT_BROKER_ADDRESS));
string clientId = Guid.NewGuid().ToString();
client.Connect(clientId);
string strValue = Convert.ToString(value);
// publish a message on "/home/temperature" topic with QoS 2 client.Publish("/home/temperature", Encoding.UTF8.GetBytes(strValue), MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, false);