物聯網架構成長之路(32)-SpringBoot集成MQTT客戶端

1、前言
  這裏雖然是說MQTT客戶端。其實對於服務器來講,這裏的一個具備超級權限的MQTT客戶端,就能夠作不少事情。好比手機APP或者網頁或者第三方服務須要發送數據到設備,可是這些又不是設備,又不能讓他們連到MQTT。那麼就能夠經過HTTP請求業務服務器。而後由業務服務器利用這個MQTT客戶端進行發送數據。
  還有,以前好多人問我,怎麼保存這些物聯網數據,真的要像前面的博客那樣,要本身寫插件嗎?特別麻煩的啊。這裏給出的結論是不須要。保存數據,除了寫EMQ插件,還能夠在EMQ的規則引擎上進行配置Web消息轉發【EMQ 3.x 版本】,還有就是這種經過業務服務器訂閱根Topic來保存物聯網原始數據。
  這篇博客這討論如何把MQTT客戶端集成到業務服務器上(基於SpringBoot 2.0)。下一篇博客會講到數據保存到InfluxDB,而後如何經過Grafana進行可視化Dashboard看板模式展現。html

 

2、配置pom.xml,引入第三方庫java

 1         <!-- MQTT -->
 2         <dependency>
 3             <groupId>org.springframework.boot</groupId>
 4             <artifactId>spring-boot-starter-integration</artifactId>
 5         </dependency>
 6         <dependency>
 7             <groupId>org.springframework.integration</groupId>
 8             <artifactId>spring-integration-stream</artifactId>
 9         </dependency>
10         <dependency>
11             <groupId>org.springframework.integration</groupId>
12             <artifactId>spring-integration-mqtt</artifactId>
13         </dependency>

 

3、MQTT客戶端代碼(Java)spring

  MqttDemoApplication.javajson

 1 package com.wunaozai.mqtt;
 2 
 3 import org.springframework.boot.SpringApplication;
 4 import org.springframework.boot.autoconfigure.SpringBootApplication;
 5 
 6 import com.wunaozai.mqtt.tools.MqttPushClient;
 7 
 8 @SpringBootApplication
 9 public class MqttDemoApplication {
10 
11     public static void main(String[] args) {
12         SpringApplication.run(MqttDemoApplication.class, args);
13         
14         test();
15     }
16 
17     
18     private static void test(){
19         MqttPushClient.MQTT_HOST = "tcp://mqtt.com:1883";
20         MqttPushClient.MQTT_CLIENTID = "client";
21         MqttPushClient.MQTT_USERNAME = "username";
22         MqttPushClient.MQTT_PASSWORD = "password";
23         MqttPushClient client = MqttPushClient.getInstance();
24         client.subscribe("/#");
25     }
26 }

  MqttPushCallback.java服務器

 1 package com.wunaozai.mqtt.tools;
 2 
 3 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 4 import org.eclipse.paho.client.mqttv3.MqttCallback;
 5 import org.eclipse.paho.client.mqttv3.MqttMessage;
 6 import org.slf4j.Logger;
 7 import org.slf4j.LoggerFactory;
 8 
 9 /**
10  * MQTT 推送回調
11  * @author wunaozai
12  * @date 2018-08-22
13  */
14 public class MqttPushCallback implements MqttCallback {
15     
16     private static final Logger log = LoggerFactory.getLogger(MqttPushCallback.class);
17 
18     @Override
19     public void connectionLost(Throwable cause) {
20         log.info("斷開鏈接,建議重連" + this);
21         //斷開鏈接,建議重連
22     }
23 
24     @Override
25     public void deliveryComplete(IMqttDeliveryToken token) {
26         //log.info(token.isComplete() + "");
27     }
28 
29     @Override
30     public void messageArrived(String topic, MqttMessage message) throws Exception {
31         log.info("Topic: " + topic);
32         log.info("Message: " + new String(message.getPayload()));
33     }
34 
35 }

  MqttPushClient.javadom

  1 package com.wunaozai.mqtt.tools;
  2 
  3 import org.eclipse.paho.client.mqttv3.MqttClient;
  4 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  5 import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
  6 import org.eclipse.paho.client.mqttv3.MqttMessage;
  7 import org.eclipse.paho.client.mqttv3.MqttTopic;
  8 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  9 import org.slf4j.Logger;
 10 import org.slf4j.LoggerFactory;
 11 
 12 /**
 13  * 建立一個MQTT客戶端
 14  * @author wunaozai
 15  * @date 2018-08-22
 16  */
 17 public class MqttPushClient {
 18     
 19     private static final Logger log = LoggerFactory.getLogger(MqttPushClient.class);
 20     public static String MQTT_HOST = "";
 21     public static String MQTT_CLIENTID = "";
 22     public static String MQTT_USERNAME = "";
 23     public static String MQTT_PASSWORD = "";
 24     public static int MQTT_TIMEOUT = 10;
 25     public static int MQTT_KEEPALIVE = 10;
 26     
 27     private MqttClient client;
 28     private static volatile MqttPushClient mqttClient = null;
 29     public static MqttPushClient getInstance() {
 30         if(mqttClient == null) {
 31             synchronized (MqttPushClient.class) {
 32                 if(mqttClient == null) {
 33                     mqttClient = new MqttPushClient();
 34                 }
 35             }
 36         }
 37         return mqttClient;
 38     }
 39     
 40     private MqttPushClient() {
 41         log.info("Connect MQTT: " + this);
 42         connect();
 43     }
 44     
 45     private void connect() {
 46         try {
 47             client = new MqttClient(MQTT_HOST, MQTT_CLIENTID, new MemoryPersistence());
 48             MqttConnectOptions option = new MqttConnectOptions();
 49             option.setCleanSession(true);
 50             option.setUserName(MQTT_USERNAME);
 51             option.setPassword(MQTT_PASSWORD.toCharArray());
 52             option.setConnectionTimeout(MQTT_TIMEOUT);
 53             option.setKeepAliveInterval(MQTT_KEEPALIVE);
 54             option.setAutomaticReconnect(true);
 55             try {
 56                 client.setCallback(new MqttPushCallback());
 57                 client.connect(option);
 58             } catch (Exception e) {
 59                 e.printStackTrace();
 60             }
 61         } catch (Exception e) {
 62             e.printStackTrace();
 63         }
 64     }
 65     /**
 66      * 發佈主題,用於通知<br>
 67      * 默認qos爲1 非持久化
 68      * @param topic
 69      * @param data
 70      */
 71     public void publish(String topic, String data) {
 72         publish(topic, data, 1, false);
 73     }
 74     /**
 75      * 發佈
 76      * @param topic
 77      * @param data
 78      * @param qos
 79      * @param retained
 80      */
 81     public void publish(String topic, String data, int qos, boolean retained) {
 82         MqttMessage message = new MqttMessage();
 83         message.setQos(qos);
 84         message.setRetained(retained);
 85         message.setPayload(data.getBytes());
 86         MqttTopic mqttTopic = client.getTopic(topic);
 87         if(null == mqttTopic) {
 88             log.error("Topic Not Exist");
 89         }
 90         MqttDeliveryToken token;
 91         try {
 92             token = mqttTopic.publish(message);
 93             token.waitForCompletion();
 94         } catch (Exception e) {
 95             e.printStackTrace();
 96         }
 97     }
 98     /**
 99      * 訂閱某個主題 qos默認爲1
100      * @param topic
101      */
102     public void subscribe(String topic) {
103         subscribe(topic, 1);
104     }
105     /**
106      * 訂閱某個主題
107      * @param topic
108      * @param qos
109      */
110     public void subscribe(String topic, int qos) {
111         try {
112             client.subscribe(topic, qos);
113         } catch (Exception e) {
114             e.printStackTrace();
115         }
116     }
117 }

 

4、MQTT客戶端代碼(C#)
  爲了下下篇博客Grafana有數據能夠展現,我須要開發一個PC小工具【設備仿真】,用來模擬設備一直髮送數據。這裏就不對C#開發進行過多的說明了。經過nuget,引入第三方mqtt庫。這個工具是我如今開發平臺工具鏈的一個小工具。至於裏面的Payload協議,能夠不用管。讀者能夠根據本身的業務制定本身的通訊協議。
eclipse

  部分C#代碼(鏈接服務器與發送數據)async

  1 using MQTTClient.Model;
  2 using MQTTnet;
  3 using MQTTnet.Core;
  4 using MQTTnet.Core.Client;
  5 using Newtonsoft.Json;
  6 using System;
  7 using System.Collections.Generic;
  8 using System.Text;
  9 using System.Threading.Tasks;
 10 using System.Windows.Forms;
 11 
 12 namespace MQTTClient
 13 {
 14     public partial class MainPage : Form
 15     {
 16         public MainPage()
 17         {
 18             InitializeComponent();
 19             init();
 20         }
 21         private void init()
 22         {
 23             txtusername.Text = "";
 24             txtpassword.Text = "";
 25             txtclientid.Text = "";
 26             txttopic.Text = "iot/UUID/device/devicepub/update";
 27         }
 28 
 29         IMqttClient client = null;
 30         private async Task ConnectMqttServerAsync()
 31         {
 32             if(client == null)
 33             {
 34                 client = new MqttClientFactory().CreateMqttClient() as MqttClient;
 35                 client.ApplicationMessageReceived += mqttClientApplicationMessageReceived;
 36                 client.Connected += mqttClientConnected;
 37                 client.Disconnected += mqttClientDisconnected;
 38             }
 39             try
 40             {
 41                 await client.DisconnectAsync();
 42                 var option = getMQTTOption();
 43                 await client.ConnectAsync(option);
 44             }catch(Exception e)
 45             {
 46                 Invoke((new Action(() =>
 47                 {
 48                     lblStatus.Text = "鏈接服務器失敗: " + e.Message;
 49                 })));
 50             }
 51         }
 52         private void mqttClientDisconnected(object sender, EventArgs e)
 53         {
 54             Invoke((new Action(() =>
 55             {
 56                 lblStatus.Text = "鏈接服務器失敗: ERROR";
 57             })));
 58         }
 59         private void mqttClientConnected(object sender, EventArgs e)
 60         {
 61             Invoke((new Action(() =>
 62             {
 63                 lblStatus.Text = "鏈接服務器成功";
 64             })));
 65         }
 66         private void mqttClientApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
 67         {
 68             //本工具部收數據
 69             throw new NotImplementedException();
 70         }
 71 
 72         private void btnconnect_Click(object sender, EventArgs e)
 73         {
 74             Task.Run(async () => { await ConnectMqttServerAsync(); });
 75         }
 76         private void btndisconnect_Click(object sender, EventArgs e)
 77         {
 78             client.DisconnectAsync();
 79         }
 80         private void btnsendone_Click(object sender, EventArgs e)
 81         {
 82             sendPayload();
 83         }
 84         private void btnsendts_Click(object sender, EventArgs e)
 85         {
 86             timer1.Interval = Convert.ToInt32(txttime.Text);
 87             timer1.Enabled = true;
 88         }
 89         private void btnstopts_Click(object sender, EventArgs e)
 90         {
 91             timer1.Enabled = false;
 92         }
 93         private void timer1_Tick(object sender, EventArgs e)
 94         {
 95             sendPayload();
 96         }
 97         private int sendPayload()
 98         {
 99             if (client.IsConnected == false)
100             {
101                 return -1;
102             }
103             PayloadModel payload = getPayload();
104             string json = JsonConvert.SerializeObject(payload, Formatting.Indented);
105             txtview.Text = json;
106             string topic = txttopic.Text;
107             var msg = new MqttApplicationMessage(topic, Encoding.Default.GetBytes(json),
108                 MQTTnet.Core.Protocol.MqttQualityOfServiceLevel.AtMostOnce, false);
109             client.PublishAsync(msg);
110             lblSendStatus.Text = "發送: " + DateTime.Now.ToLongTimeString();
111             return 0;
112         }
113 
114         private MqttClientTcpOptions getMQTTOption()
115         {
116             MqttClientTcpOptions option = new MqttClientTcpOptions();
117             string hostname = txthostname.Text;
118             string[] host_port = hostname.Split(':');
119             int port = 1883;
120             if(host_port.Length >= 2)
121             {
122                 hostname = host_port[0];
123                 port = Convert.ToInt32(host_port[1]);
124             }
125             option.Server = hostname;
126             option.ClientId = txtclientid.Text;
127             option.UserName = txtusername.Text;
128             option.Password = txtpassword.Text;
129             option.Port = port;
130             option.CleanSession = true;
131             return option;
132         }
133 
134         private PayloadModel getPayload()
135         {
136             PayloadModel payload = new PayloadModel();
137             //
138             return payload;
139         }
140 
141         Random rand1 = new Random(System.DateTime.Now.Millisecond);
142         private int getRandomNum()
143         {
144             int data = rand1.Next(0, 100);
145             return data;
146         }
147 
148         int linenum = 0;
149         Random rand2 = new Random(System.DateTime.Now.Millisecond);
150         private int getLineNum()
151         {
152             int f = rand2.Next(0, 100);
153             int data = rand2.Next(0, 5);
154             if(f % 2 == 1)
155             {
156                 linenum += data;
157             }
158             else
159             {
160                 linenum -= data;
161             }
162             return linenum;
163         }
164 
165     }
166 }

 

本文地址: http://www.javashuo.com/article/p-zljbkuwe-cs.htmltcp

相關文章
相關標籤/搜索