Kubeedge-mapper 實現

應用場景:

利用GitHub上的溫度傳感器的例子做爲講解,實現從雲端獲取設備終端狀態及使用Java模擬設備數據。其實和官網給的視頻同樣,只須要將終端設備的數據轉換爲支持MQTT協議傳輸的數據,雲端就能夠拿到數據了。java

須要使用的文件及技術

1.雲端:建立文件,及開啓cloudCorenode

vim device.yaml    //案例中的設備配置,直接使用請刪除全部註釋
#apiVersion,該屬性定義了咱們從k8s獲取改設備數據的url路徑
apiVersion: devices.kubeedge.io/v1alpha1  
kind: Device
metadata:
  name: temperature3
  labels:
    description: 'temperature3'
    manufacturer: 'test'
spec:
  deviceModelRef:
    name: temperature3-model    #與設備模板名稱進行綁定
  nodeSelector:
    nodeSelectorTerms:
      - matchExpressions:
          - key: ''
            operator: In
            values:
              - sunsheen-edge   #部署該設備的節點
# status中的屬性爲咱們能夠定義的屬性,屬性名爲propertyName的屬性與初始指望值
status:    
  twins:
    - propertyName: temperatureState
      desired:
        metadata:
          type: string
        value: 'on'
    - propertyName: temperature
      desired:
        metadata:
          type: string
        value: ''
vim devicemodel.yaml   //設備模板文件,直接使用請刪除全部註釋。
#apiVersion與設備端保持一致
apiVersion: devices.kubeedge.io/v1alpha1
kind: DeviceModel
metadata:
  name: temperature3-model
  namespace: default
spec:
#屬性與設備的保持一致,這裏能夠設備權限,這裏咱們只能修改溫度狀態,沒法控制實際溫度
  properties:
    - name: temperatureState
      description: Temperature collected from the edge device
      type:
        string:
          accessMode: ReadWrite
          defaultValue: 'on'
    - name: temperature
      description: Temperature collected from the edge device
      type:
        string:
          accessMode: ReadOnly
          defaultValue: ''
vim deployment.yaml  //使用deployment控制器(k8s內容), 建立POD,邊緣節點會自動去拉取鏡像(很慢,建議手動拉取,或配置私有鏡像倉庫)

apiVersion: apps/v1
kind: Deployment
metadata:
  name: temperature3-mapper
  labels:
    app: temperature
spec:
  replicas: 1
  selector:
    matchLabels:
      app: temperature3
  template:
    metadata:
      labels:
        app: temperature3
    spec:
      hostNetwork: true
      nodeSelector:
        name: "sunsheen-edge"
      containers:
      - name: temperature3
        image: kubeedge-mapper:v2.2    #須要部署的鏡像
        imagePullPolicy: IfNotPresent
        securityContext:
          privileged: true

2.邊緣端:開啓 mosquitto,啓動edgeCoredocker

mosquitto -d -p 1883  //邊緣端開啓mosquitto,用於傳輸消息

JAVA代碼模擬設備推送接收與推送消息:

<dependency>
            <groupId>org.yaml</groupId>
            <artifactId>snakeyaml</artifactId>
            <version>1.13</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>
        <!--   http請求     -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient-cache</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpmime</artifactId>
            <version>4.5.2</version>
        </dependency>

        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>
import java.util.concurrent.ScheduledExecutorService;

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * @author wanchen.chen
 * @ClassName KubeedageClient
 * @Despriction: MQTTP 鏈接類,用於推送/訂閱 消息
 * @date 2020/4/15 9:20
 * @Version 1.0
 **/
public class KubeedageClient {

    private MqttMessage message;
    private MqttClient client;
    private MqttConnectOptions options;
    private MqttTopic clientTopic;
    private MqttTopic serverTopic;
    //定義主題,document爲雲端反饋的主題;update爲邊緣向雲端推送的主題。temperature3爲設備名稱,其餘都固定。
    private static String clientTopicStr ="$hw/events/device/temperature3/twin/update/document";
    private static String serverTopicStr ="$hw/events/device/temperature3/twin/update";
    private static final String url ="tcp://0.0.0.0:1883";
    //我這裏是要打包爲鏡像部署,全部須要配置邊緣節點的用戶及密碼
    private static final String userName ="xxx";
    private static final String password ="xxx";

    private ScheduledExecutorService scheduler;

    public KubeedageClient(){
    }

    /**
     * 初始化
     */
    public void start() {
        try {
            // host爲主機名,clientid即鏈接MQTT的客戶端ID,通常以惟一標識符表示,MemoryPersistence設置clientid的保存形式,默認爲之內存保存
            client = new MqttClient(url, "KubeEdgeClient", new MemoryPersistence());
            // MQTT的鏈接設置
            options = new MqttConnectOptions();
            // 設置是否清空session,這裏若是設置爲false表示服務器會保留客戶端的鏈接記錄,這裏設置爲true表示每次鏈接到服務器都以新的身份鏈接
            options.setCleanSession(true);
            // 設置鏈接的用戶名
            options.setUserName(userName);
            // 設置鏈接的密碼
            options.setPassword(password.toCharArray());
            // 設置超時時間 單位爲秒
            options.setConnectionTimeout(10);
            // 設置會話心跳時間 單位爲秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並無重連的機制
            options.setKeepAliveInterval(20);
            // 設置回調
            client.setCallback(new PushCallback());
            clientTopic = client.getTopic(clientTopicStr);
            serverTopic = client.getTopic(serverTopicStr);
            //setWill方法,若是項目中須要知道客戶端是否掉線能夠調用該方法。設置最終端口的通知消息
//            options.setWill(clientTopoc, "close".getBytes(), 2, true);
            client.connect(options);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 訂閱主題消息
     */
    public void listerData(){
        //訂閱消息
        int[] Qos  = {1};
        String[] topic1 = {clientTopicStr};
        try {
            client.subscribe(topic1, Qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * push 消息到主題
     * @param topic
     * @param message
     * @throws MqttPersistenceException
     * @throws MqttException
     */
    public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,
            MqttException {
        MqttDeliveryToken token = topic.publish(message);
        token.waitForCompletion();
//        System.out.println("message is published completely! "
//                + token.isComplete());
    }

    /**
     * 發送消息
     * @param deviceInfo
     */
    public void putData(String deviceInfo){
        message = new MqttMessage();
        message.setQos(2);
        message.setRetained(true);
        message.setPayload(deviceInfo.getBytes());
        try {
            publish(serverTopic,message);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

}
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * @author wanchen.chen
 * @ClassName PushCallback 發佈消息的回調類
 * @Despriction: 必須實現MqttCallback的接口並實現對應的相關接口方法CallBack 類將實現 MqttCallBack。
 *  每一個客戶機標識都須要一個回調實例。在此示例中,構造函數傳遞客戶機標識以另存爲實例數據。
 *  在回調中,將它用來標識已經啓動了該回調的哪一個實例。
 *  必須在回調類中實現三個方法:
 * @date 2020/4/15 9:17
 * @Version 1.0
 **/
public class PushCallback implements MqttCallback {

    public void connectionLost(Throwable cause) {
        // 鏈接丟失後,通常在這裏面進行重連
        System.out.println("鏈接斷開,能夠作重連");
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }

    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe後獲得的消息會執行到這裏面,消息只能被消費一次
        System.out.println("接收消息主題 : " + topic);
        System.out.println("接收消息Qos : " + message.getQos());
        System.out.println("接收的消息爲:"+str);
    }

}

發送的消息結構:

.yaml文件,用於json數據結構apache

event_id: 0
timestamp: 0
twin:
  temperature:
    actual:
      value: 0
    metadata:
      type: Updated
  temperatureState:
    actual:
      value: height
    metadata:
      type: Updated

經過Java代碼將其轉換爲JSON,將數據put進JSON中就能夠發送了:json

import org.yaml.snakeyaml.Yaml;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.net.URL;
import java.util.Map;

/**
 * @author wanchen.chen
 * @ClassName AnalysisYAML
 * @Despriction: 解析YAML文件的內容
 * @date 2020/4/28 9:32
 * @Version 1.0
 **/
public class AnalysisYAML {

    /**
     * 傳參解析
     * @param urlStr
     * @return
     */
    public  Map<String,Object> getYamlData(String urlStr){
        URL url = AnalysisYAML.class.getClassLoader().getResource(urlStr);
        return analysisData(url);
    }

    /**
     * 默認解析
     * @return
     */
    public  Map<String,Object> getYamlData(){
        URL url = AnalysisYAML.class.getClassLoader().getResource("attribute.yaml");
        return analysisData(url);
    }

    /**
     * 獲取URL 解析內容
     * @param url
     * @return
     */
    public Map<String,Object> analysisData(URL url){
        InputStream input = null;
        try {
            input = new FileInputStream(url.getFile());
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
        Yaml yaml = new Yaml();
        Map<String,Object> map = (Map<String,Object>)yaml.load(input);
        return map;
    }
}

鏡像打包:

經過DockerFile將jar文件打包爲鏡像:vim

FROM java:latest
RUN mkdir -p /usr
RUN mkdir -p /usr/local
COPY . /usr/local/
WORKDIR /usr/local
EXPOSE 8892
ENTRYPOINT ["java","-jar","xxx.jar"]

//在DockerFile 文件目錄下建立鏡像
docker build -t kubeedge-mapper:v2.0 .
相關文章
相關標籤/搜索