1、前言
這裏雖然是說MQTT客戶端。其實對於服務器來講,這裏的一個具備超級權限的MQTT客戶端,就能夠作不少事情。好比手機APP或者網頁或者第三方服務須要發送數據到設備,可是這些又不是設備,又不能讓他們連到MQTT。那麼就能夠經過HTTP請求業務服務器。而後由業務服務器利用這個MQTT客戶端進行發送數據。
還有,以前好多人問我,怎麼保存這些物聯網數據,真的要像前面的博客那樣,要本身寫插件嗎?特別麻煩的啊。這裏給出的結論是不須要。保存數據,除了寫EMQ插件,還能夠在EMQ的規則引擎上進行配置Web消息轉發【EMQ 3.x 版本】,還有就是這種經過業務服務器訂閱根Topic來保存物聯網原始數據。
這篇博客這討論如何把MQTT客戶端集成到業務服務器上(基於SpringBoot 2.0)。下一篇博客會講到數據保存到InfluxDB,而後如何經過Grafana進行可視化Dashboard看板模式展現。html
2、配置pom.xml,引入第三方庫java
1 <!-- MQTT --> 2 <dependency> 3 <groupId>org.springframework.boot</groupId> 4 <artifactId>spring-boot-starter-integration</artifactId> 5 </dependency> 6 <dependency> 7 <groupId>org.springframework.integration</groupId> 8 <artifactId>spring-integration-stream</artifactId> 9 </dependency> 10 <dependency> 11 <groupId>org.springframework.integration</groupId> 12 <artifactId>spring-integration-mqtt</artifactId> 13 </dependency>
3、MQTT客戶端代碼(Java)spring
MqttDemoApplication.javajson
1 package com.wunaozai.mqtt; 2 3 import org.springframework.boot.SpringApplication; 4 import org.springframework.boot.autoconfigure.SpringBootApplication; 5 6 import com.wunaozai.mqtt.tools.MqttPushClient; 7 8 @SpringBootApplication 9 public class MqttDemoApplication { 10 11 public static void main(String[] args) { 12 SpringApplication.run(MqttDemoApplication.class, args); 13 14 test(); 15 } 16 17 18 private static void test(){ 19 MqttPushClient.MQTT_HOST = "tcp://mqtt.com:1883"; 20 MqttPushClient.MQTT_CLIENTID = "client"; 21 MqttPushClient.MQTT_USERNAME = "username"; 22 MqttPushClient.MQTT_PASSWORD = "password"; 23 MqttPushClient client = MqttPushClient.getInstance(); 24 client.subscribe("/#"); 25 } 26 }
MqttPushCallback.java服務器
1 package com.wunaozai.mqtt.tools; 2 3 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 4 import org.eclipse.paho.client.mqttv3.MqttCallback; 5 import org.eclipse.paho.client.mqttv3.MqttMessage; 6 import org.slf4j.Logger; 7 import org.slf4j.LoggerFactory; 8 9 /** 10 * MQTT 推送回調 11 * @author wunaozai 12 * @date 2018-08-22 13 */ 14 public class MqttPushCallback implements MqttCallback { 15 16 private static final Logger log = LoggerFactory.getLogger(MqttPushCallback.class); 17 18 @Override 19 public void connectionLost(Throwable cause) { 20 log.info("斷開鏈接,建議重連" + this); 21 //斷開鏈接,建議重連 22 } 23 24 @Override 25 public void deliveryComplete(IMqttDeliveryToken token) { 26 //log.info(token.isComplete() + ""); 27 } 28 29 @Override 30 public void messageArrived(String topic, MqttMessage message) throws Exception { 31 log.info("Topic: " + topic); 32 log.info("Message: " + new String(message.getPayload())); 33 } 34 35 }
MqttPushClient.javadom
1 package com.wunaozai.mqtt.tools; 2 3 import org.eclipse.paho.client.mqttv3.MqttClient; 4 import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 5 import org.eclipse.paho.client.mqttv3.MqttDeliveryToken; 6 import org.eclipse.paho.client.mqttv3.MqttMessage; 7 import org.eclipse.paho.client.mqttv3.MqttTopic; 8 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; 9 import org.slf4j.Logger; 10 import org.slf4j.LoggerFactory; 11 12 /** 13 * 建立一個MQTT客戶端 14 * @author wunaozai 15 * @date 2018-08-22 16 */ 17 public class MqttPushClient { 18 19 private static final Logger log = LoggerFactory.getLogger(MqttPushClient.class); 20 public static String MQTT_HOST = ""; 21 public static String MQTT_CLIENTID = ""; 22 public static String MQTT_USERNAME = ""; 23 public static String MQTT_PASSWORD = ""; 24 public static int MQTT_TIMEOUT = 10; 25 public static int MQTT_KEEPALIVE = 10; 26 27 private MqttClient client; 28 private static volatile MqttPushClient mqttClient = null; 29 public static MqttPushClient getInstance() { 30 if(mqttClient == null) { 31 synchronized (MqttPushClient.class) { 32 if(mqttClient == null) { 33 mqttClient = new MqttPushClient(); 34 } 35 } 36 } 37 return mqttClient; 38 } 39 40 private MqttPushClient() { 41 log.info("Connect MQTT: " + this); 42 connect(); 43 } 44 45 private void connect() { 46 try { 47 client = new MqttClient(MQTT_HOST, MQTT_CLIENTID, new MemoryPersistence()); 48 MqttConnectOptions option = new MqttConnectOptions(); 49 option.setCleanSession(true); 50 option.setUserName(MQTT_USERNAME); 51 option.setPassword(MQTT_PASSWORD.toCharArray()); 52 option.setConnectionTimeout(MQTT_TIMEOUT); 53 option.setKeepAliveInterval(MQTT_KEEPALIVE); 54 option.setAutomaticReconnect(true); 55 try { 56 client.setCallback(new MqttPushCallback()); 57 client.connect(option); 58 } catch (Exception e) { 59 e.printStackTrace(); 60 } 61 } catch (Exception e) { 62 e.printStackTrace(); 63 } 64 } 65 /** 66 * 發佈主題,用於通知<br> 67 * 默認qos爲1 非持久化 68 * @param topic 69 * @param data 70 */ 71 public void publish(String topic, String data) { 72 publish(topic, data, 1, false); 73 } 74 /** 75 * 發佈 76 * @param topic 77 * @param data 78 * @param qos 79 * @param retained 80 */ 81 public void publish(String topic, String data, int qos, boolean retained) { 82 MqttMessage message = new MqttMessage(); 83 message.setQos(qos); 84 message.setRetained(retained); 85 message.setPayload(data.getBytes()); 86 MqttTopic mqttTopic = client.getTopic(topic); 87 if(null == mqttTopic) { 88 log.error("Topic Not Exist"); 89 } 90 MqttDeliveryToken token; 91 try { 92 token = mqttTopic.publish(message); 93 token.waitForCompletion(); 94 } catch (Exception e) { 95 e.printStackTrace(); 96 } 97 } 98 /** 99 * 訂閱某個主題 qos默認爲1 100 * @param topic 101 */ 102 public void subscribe(String topic) { 103 subscribe(topic, 1); 104 } 105 /** 106 * 訂閱某個主題 107 * @param topic 108 * @param qos 109 */ 110 public void subscribe(String topic, int qos) { 111 try { 112 client.subscribe(topic, qos); 113 } catch (Exception e) { 114 e.printStackTrace(); 115 } 116 } 117 }
4、MQTT客戶端代碼(C#)
爲了下下篇博客Grafana有數據能夠展現,我須要開發一個PC小工具【設備仿真】,用來模擬設備一直髮送數據。這裏就不對C#開發進行過多的說明了。經過nuget,引入第三方mqtt庫。這個工具是我如今開發平臺工具鏈的一個小工具。至於裏面的Payload協議,能夠不用管。讀者能夠根據本身的業務制定本身的通訊協議。
eclipse
部分C#代碼(鏈接服務器與發送數據)async
1 using MQTTClient.Model; 2 using MQTTnet; 3 using MQTTnet.Core; 4 using MQTTnet.Core.Client; 5 using Newtonsoft.Json; 6 using System; 7 using System.Collections.Generic; 8 using System.Text; 9 using System.Threading.Tasks; 10 using System.Windows.Forms; 11 12 namespace MQTTClient 13 { 14 public partial class MainPage : Form 15 { 16 public MainPage() 17 { 18 InitializeComponent(); 19 init(); 20 } 21 private void init() 22 { 23 txtusername.Text = ""; 24 txtpassword.Text = ""; 25 txtclientid.Text = ""; 26 txttopic.Text = "iot/UUID/device/devicepub/update"; 27 } 28 29 IMqttClient client = null; 30 private async Task ConnectMqttServerAsync() 31 { 32 if(client == null) 33 { 34 client = new MqttClientFactory().CreateMqttClient() as MqttClient; 35 client.ApplicationMessageReceived += mqttClientApplicationMessageReceived; 36 client.Connected += mqttClientConnected; 37 client.Disconnected += mqttClientDisconnected; 38 } 39 try 40 { 41 await client.DisconnectAsync(); 42 var option = getMQTTOption(); 43 await client.ConnectAsync(option); 44 }catch(Exception e) 45 { 46 Invoke((new Action(() => 47 { 48 lblStatus.Text = "鏈接服務器失敗: " + e.Message; 49 }))); 50 } 51 } 52 private void mqttClientDisconnected(object sender, EventArgs e) 53 { 54 Invoke((new Action(() => 55 { 56 lblStatus.Text = "鏈接服務器失敗: ERROR"; 57 }))); 58 } 59 private void mqttClientConnected(object sender, EventArgs e) 60 { 61 Invoke((new Action(() => 62 { 63 lblStatus.Text = "鏈接服務器成功"; 64 }))); 65 } 66 private void mqttClientApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) 67 { 68 //本工具部收數據 69 throw new NotImplementedException(); 70 } 71 72 private void btnconnect_Click(object sender, EventArgs e) 73 { 74 Task.Run(async () => { await ConnectMqttServerAsync(); }); 75 } 76 private void btndisconnect_Click(object sender, EventArgs e) 77 { 78 client.DisconnectAsync(); 79 } 80 private void btnsendone_Click(object sender, EventArgs e) 81 { 82 sendPayload(); 83 } 84 private void btnsendts_Click(object sender, EventArgs e) 85 { 86 timer1.Interval = Convert.ToInt32(txttime.Text); 87 timer1.Enabled = true; 88 } 89 private void btnstopts_Click(object sender, EventArgs e) 90 { 91 timer1.Enabled = false; 92 } 93 private void timer1_Tick(object sender, EventArgs e) 94 { 95 sendPayload(); 96 } 97 private int sendPayload() 98 { 99 if (client.IsConnected == false) 100 { 101 return -1; 102 } 103 PayloadModel payload = getPayload(); 104 string json = JsonConvert.SerializeObject(payload, Formatting.Indented); 105 txtview.Text = json; 106 string topic = txttopic.Text; 107 var msg = new MqttApplicationMessage(topic, Encoding.Default.GetBytes(json), 108 MQTTnet.Core.Protocol.MqttQualityOfServiceLevel.AtMostOnce, false); 109 client.PublishAsync(msg); 110 lblSendStatus.Text = "發送: " + DateTime.Now.ToLongTimeString(); 111 return 0; 112 } 113 114 private MqttClientTcpOptions getMQTTOption() 115 { 116 MqttClientTcpOptions option = new MqttClientTcpOptions(); 117 string hostname = txthostname.Text; 118 string[] host_port = hostname.Split(':'); 119 int port = 1883; 120 if(host_port.Length >= 2) 121 { 122 hostname = host_port[0]; 123 port = Convert.ToInt32(host_port[1]); 124 } 125 option.Server = hostname; 126 option.ClientId = txtclientid.Text; 127 option.UserName = txtusername.Text; 128 option.Password = txtpassword.Text; 129 option.Port = port; 130 option.CleanSession = true; 131 return option; 132 } 133 134 private PayloadModel getPayload() 135 { 136 PayloadModel payload = new PayloadModel(); 137 //略 138 return payload; 139 } 140 141 Random rand1 = new Random(System.DateTime.Now.Millisecond); 142 private int getRandomNum() 143 { 144 int data = rand1.Next(0, 100); 145 return data; 146 } 147 148 int linenum = 0; 149 Random rand2 = new Random(System.DateTime.Now.Millisecond); 150 private int getLineNum() 151 { 152 int f = rand2.Next(0, 100); 153 int data = rand2.Next(0, 5); 154 if(f % 2 == 1) 155 { 156 linenum += data; 157 } 158 else 159 { 160 linenum -= data; 161 } 162 return linenum; 163 } 164 165 } 166 }