利用GitHub上的溫度傳感器的例子做爲講解,實現從雲端獲取設備終端狀態及使用Java模擬設備數據。其實和官網給的視頻同樣,只須要將終端設備的數據轉換爲支持MQTT協議傳輸的數據,雲端就能夠拿到數據了。java
1.雲端:建立文件,及開啓cloudCorenode
vim device.yaml //案例中的設備配置,直接使用請刪除全部註釋 #apiVersion,該屬性定義了咱們從k8s獲取改設備數據的url路徑 apiVersion: devices.kubeedge.io/v1alpha1 kind: Device metadata: name: temperature3 labels: description: 'temperature3' manufacturer: 'test' spec: deviceModelRef: name: temperature3-model #與設備模板名稱進行綁定 nodeSelector: nodeSelectorTerms: - matchExpressions: - key: '' operator: In values: - sunsheen-edge #部署該設備的節點 # status中的屬性爲咱們能夠定義的屬性,屬性名爲propertyName的屬性與初始指望值 status: twins: - propertyName: temperatureState desired: metadata: type: string value: 'on' - propertyName: temperature desired: metadata: type: string value: ''
vim devicemodel.yaml //設備模板文件,直接使用請刪除全部註釋。 #apiVersion與設備端保持一致 apiVersion: devices.kubeedge.io/v1alpha1 kind: DeviceModel metadata: name: temperature3-model namespace: default spec: #屬性與設備的保持一致,這裏能夠設備權限,這裏咱們只能修改溫度狀態,沒法控制實際溫度 properties: - name: temperatureState description: Temperature collected from the edge device type: string: accessMode: ReadWrite defaultValue: 'on' - name: temperature description: Temperature collected from the edge device type: string: accessMode: ReadOnly defaultValue: ''
vim deployment.yaml //使用deployment控制器(k8s內容), 建立POD,邊緣節點會自動去拉取鏡像(很慢,建議手動拉取,或配置私有鏡像倉庫) apiVersion: apps/v1 kind: Deployment metadata: name: temperature3-mapper labels: app: temperature spec: replicas: 1 selector: matchLabels: app: temperature3 template: metadata: labels: app: temperature3 spec: hostNetwork: true nodeSelector: name: "sunsheen-edge" containers: - name: temperature3 image: kubeedge-mapper:v2.2 #須要部署的鏡像 imagePullPolicy: IfNotPresent securityContext: privileged: true
2.邊緣端:開啓 mosquitto,啓動edgeCoredocker
mosquitto -d -p 1883 //邊緣端開啓mosquitto,用於傳輸消息
<dependency> <groupId>org.yaml</groupId> <artifactId>snakeyaml</artifactId> <version>1.13</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.58</version> </dependency> <!-- http請求 --> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.2</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient-cache</artifactId> <version>4.5.2</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpmime</artifactId> <version>4.5.2</version> </dependency> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.0</version> </dependency>
import java.util.concurrent.ScheduledExecutorService; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; /** * @author wanchen.chen * @ClassName KubeedageClient * @Despriction: MQTTP 鏈接類,用於推送/訂閱 消息 * @date 2020/4/15 9:20 * @Version 1.0 **/ public class KubeedageClient { private MqttMessage message; private MqttClient client; private MqttConnectOptions options; private MqttTopic clientTopic; private MqttTopic serverTopic; //定義主題,document爲雲端反饋的主題;update爲邊緣向雲端推送的主題。temperature3爲設備名稱,其餘都固定。 private static String clientTopicStr ="$hw/events/device/temperature3/twin/update/document"; private static String serverTopicStr ="$hw/events/device/temperature3/twin/update"; private static final String url ="tcp://0.0.0.0:1883"; //我這裏是要打包爲鏡像部署,全部須要配置邊緣節點的用戶及密碼 private static final String userName ="xxx"; private static final String password ="xxx"; private ScheduledExecutorService scheduler; public KubeedageClient(){ } /** * 初始化 */ public void start() { try { // host爲主機名,clientid即鏈接MQTT的客戶端ID,通常以惟一標識符表示,MemoryPersistence設置clientid的保存形式,默認爲之內存保存 client = new MqttClient(url, "KubeEdgeClient", new MemoryPersistence()); // MQTT的鏈接設置 options = new MqttConnectOptions(); // 設置是否清空session,這裏若是設置爲false表示服務器會保留客戶端的鏈接記錄,這裏設置爲true表示每次鏈接到服務器都以新的身份鏈接 options.setCleanSession(true); // 設置鏈接的用戶名 options.setUserName(userName); // 設置鏈接的密碼 options.setPassword(password.toCharArray()); // 設置超時時間 單位爲秒 options.setConnectionTimeout(10); // 設置會話心跳時間 單位爲秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並無重連的機制 options.setKeepAliveInterval(20); // 設置回調 client.setCallback(new PushCallback()); clientTopic = client.getTopic(clientTopicStr); serverTopic = client.getTopic(serverTopicStr); //setWill方法,若是項目中須要知道客戶端是否掉線能夠調用該方法。設置最終端口的通知消息 // options.setWill(clientTopoc, "close".getBytes(), 2, true); client.connect(options); } catch (Exception e) { e.printStackTrace(); } } /** * 訂閱主題消息 */ public void listerData(){ //訂閱消息 int[] Qos = {1}; String[] topic1 = {clientTopicStr}; try { client.subscribe(topic1, Qos); } catch (MqttException e) { e.printStackTrace(); } } /** * push 消息到主題 * @param topic * @param message * @throws MqttPersistenceException * @throws MqttException */ public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException, MqttException { MqttDeliveryToken token = topic.publish(message); token.waitForCompletion(); // System.out.println("message is published completely! " // + token.isComplete()); } /** * 發送消息 * @param deviceInfo */ public void putData(String deviceInfo){ message = new MqttMessage(); message.setQos(2); message.setRetained(true); message.setPayload(deviceInfo.getBytes()); try { publish(serverTopic,message); } catch (MqttException e) { e.printStackTrace(); } } }
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; /** * @author wanchen.chen * @ClassName PushCallback 發佈消息的回調類 * @Despriction: 必須實現MqttCallback的接口並實現對應的相關接口方法CallBack 類將實現 MqttCallBack。 * 每一個客戶機標識都須要一個回調實例。在此示例中,構造函數傳遞客戶機標識以另存爲實例數據。 * 在回調中,將它用來標識已經啓動了該回調的哪一個實例。 * 必須在回調類中實現三個方法: * @date 2020/4/15 9:17 * @Version 1.0 **/ public class PushCallback implements MqttCallback { public void connectionLost(Throwable cause) { // 鏈接丟失後,通常在這裏面進行重連 System.out.println("鏈接斷開,能夠作重連"); } public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); } public void messageArrived(String topic, MqttMessage message) throws Exception { // subscribe後獲得的消息會執行到這裏面,消息只能被消費一次 System.out.println("接收消息主題 : " + topic); System.out.println("接收消息Qos : " + message.getQos()); System.out.println("接收的消息爲:"+str); } }
.yaml文件,用於json數據結構apache
event_id: 0 timestamp: 0 twin: temperature: actual: value: 0 metadata: type: Updated temperatureState: actual: value: height metadata: type: Updated
經過Java代碼將其轉換爲JSON,將數據put進JSON中就能夠發送了:json
import org.yaml.snakeyaml.Yaml; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.InputStream; import java.net.URL; import java.util.Map; /** * @author wanchen.chen * @ClassName AnalysisYAML * @Despriction: 解析YAML文件的內容 * @date 2020/4/28 9:32 * @Version 1.0 **/ public class AnalysisYAML { /** * 傳參解析 * @param urlStr * @return */ public Map<String,Object> getYamlData(String urlStr){ URL url = AnalysisYAML.class.getClassLoader().getResource(urlStr); return analysisData(url); } /** * 默認解析 * @return */ public Map<String,Object> getYamlData(){ URL url = AnalysisYAML.class.getClassLoader().getResource("attribute.yaml"); return analysisData(url); } /** * 獲取URL 解析內容 * @param url * @return */ public Map<String,Object> analysisData(URL url){ InputStream input = null; try { input = new FileInputStream(url.getFile()); } catch (FileNotFoundException e) { e.printStackTrace(); } Yaml yaml = new Yaml(); Map<String,Object> map = (Map<String,Object>)yaml.load(input); return map; } }
經過DockerFile將jar文件打包爲鏡像:vim
FROM java:latest RUN mkdir -p /usr RUN mkdir -p /usr/local COPY . /usr/local/ WORKDIR /usr/local EXPOSE 8892 ENTRYPOINT ["java","-jar","xxx.jar"] //在DockerFile 文件目錄下建立鏡像 docker build -t kubeedge-mapper:v2.0 .