在使用阿里雲官方 IoT Java Device SDK 連接雲端測試的時候,發現日誌總是會打印一些莫名其妙的 Topic 消息訂閱和發佈,可是用戶並沒有操作這些 Topic。這是由於 SDK 底層默認做了很多系統 Topic 的訂閱和發佈設置,且無法關閉,導致很多測試不能滿足預期。如果不希望有這些系統 Topic 的默認訂閱和發佈,建議使用開源 MQTT Client 進行 Topic 消息的訂閱和發佈。
一、創建產品和設備
參考:阿里雲物聯網平臺 Quick Start 的「創建產品和設備」部分。
二、pom.xml
<dependencies>
    <!-- Eclipse Paho MQTT v3 client: provides the org.eclipse.paho.client.mqttv3 classes. -->
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.1.0</version>
    </dependency>
    <!-- Google Guava. NOTE(review): presumably only needed for the ThreadFactoryBuilder
         referenced in the demo's commented-out scheduler code; confirm before removing. -->
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>23.0</version>
    </dependency>
</dependencies>
三、工具類 AliyunIoTSignUtil
import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;

/**
 * Helpers that compute the keyed-MAC signature a device presents as its MQTT password.
 *
 * <p>All methods are static and keep no state between calls.
 */
public class AliyunIoTSignUtil {

    /** Upper-case hex alphabet; a lookup table avoids locale-sensitive case conversion. */
    private static final char[] HEX_DIGITS = "0123456789ABCDEF".toCharArray();

    /**
     * Signs a set of parameters with the device secret.
     *
     * <p>The parameter names are sorted in natural {@link String} order, any entry whose
     * name equals {@code "sign"} (ignoring case) is skipped, and the remaining
     * {@code name + value} pairs are concatenated without separators. The resulting
     * string is then passed to {@link #encryptHMAC(String, String, String)}.
     *
     * @param params       parameters to sign; must not be {@code null}
     * @param deviceSecret secret used as the MAC key
     * @param signMethod   JCA MAC algorithm name, for example {@code "hmacsha1"}
     * @return the MAC as an upper-case hexadecimal string
     * @throws RuntimeException wrapping any failure raised while computing the MAC
     */
    public static String sign(Map<String, String> params, String deviceSecret, String signMethod) {
        // Sort the parameter names so the canonical string is independent of map order.
        String[] sortedKeys = params.keySet().toArray(new String[0]);
        Arrays.sort(sortedKeys);

        // Build the canonical string: name1value1name2value2...
        StringBuilder canonicalizedQueryString = new StringBuilder();
        for (String key : sortedKeys) {
            if ("sign".equalsIgnoreCase(key)) {
                continue;
            }
            canonicalizedQueryString.append(key).append(params.get(key));
        }

        try {
            return encryptHMAC(signMethod, canonicalizedQueryString.toString(), deviceSecret);
        } catch (Exception e) {
            // Preserve the cause; callers of sign() only ever see unchecked exceptions.
            throw new RuntimeException(e);
        }
    }

    /**
     * Computes a keyed MAC over {@code content}.
     *
     * <p>Both the key and the content are encoded as UTF-8 before use.
     *
     * @param signMethod JCA MAC algorithm name, for example {@code "hmacsha1"}
     * @param content    text to authenticate
     * @param key        secret key text
     * @return the MAC as an upper-case hexadecimal string
     * @throws Exception if the algorithm is unknown or the key is rejected
     */
    public static String encryptHMAC(String signMethod, String content, String key) throws Exception {
        SecretKey secretKey = new SecretKeySpec(key.getBytes(StandardCharsets.UTF_8), signMethod);
        Mac mac = Mac.getInstance(secretKey.getAlgorithm());
        mac.init(secretKey);
        byte[] data = mac.doFinal(content.getBytes(StandardCharsets.UTF_8));
        return bytesToHexString(data);
    }

    /**
     * Encodes bytes as an upper-case hexadecimal string, two characters per byte.
     *
     * @param bArray bytes to encode; must not be {@code null}
     * @return the hexadecimal representation, empty for an empty array
     */
    public static final String bytesToHexString(byte[] bArray) {
        char[] hex = new char[bArray.length * 2];
        for (int i = 0; i < bArray.length; i++) {
            int unsigned = bArray[i] & 0xFF;
            hex[2 * i] = HEX_DIGITS[unsigned >>> 4];
            hex[2 * i + 1] = HEX_DIGITS[unsigned & 0x0F];
        }
        return new String(hex);
    }
}
四、main 方法
import com.alibaba.taro.AliyunIoTSignUtil; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import java.util.HashMap; import java.util.Map; public class IoTDemoPubSubDemo { public static String productKey = "********"; public static String deviceName = "OpenMQTTDevice"; public static String deviceSecret = "********"; public static String regionId = "cn-shanghai"; // 物模型-屬性上報topic private static String pubTopic = "/sys/" + productKey + "/" + deviceName + "/thing/event/property/post"; // 自定義topic,在產品Topic列表位置定義 private static String subTopic = "/"+productKey + "/" + deviceName+"/user/newdatademo"; private static MqttClient mqttClient; public static void main(String [] args){ initAliyunIoTClient(); // ScheduledExecutorService scheduledThreadPool = new ScheduledThreadPoolExecutor(1, // new ThreadFactoryBuilder().setNameFormat("thread-runner-%d").build()); // // scheduledThreadPool.scheduleAtFixedRate(()->postDeviceProperties(), 10,10, TimeUnit.SECONDS); // 彙報屬性 postDeviceProperties(); try { mqttClient.subscribe(subTopic); // 訂閱Topic } catch (MqttException e) { System.out.println("error:" + e.getMessage()); e.printStackTrace(); } // 設置訂閱監聽 mqttClient.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable throwable) { System.out.println("connection Lost"); } @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { System.out.println("Sub message"); System.out.println("Topic : " + s); System.out.println(new String(mqttMessage.getPayload())); //打印輸出消息payLoad } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { } }); } /** * 初始化 Client 對象 */ private static void initAliyunIoTClient() { try { // 構造鏈接須要的參數 String clientId = "java" + System.currentTimeMillis(); Map<String, String> params = new HashMap<>(16); params.put("productKey", productKey); params.put("deviceName", deviceName); params.put("clientId", clientId); String timestamp = 
String.valueOf(System.currentTimeMillis()); params.put("timestamp", timestamp); // cn-shanghai String targetServer = "tcp://" + productKey + ".iot-as-mqtt."+regionId+".aliyuncs.com:1883"; String mqttclientId = clientId + "|securemode=3,signmethod=hmacsha1,timestamp=" + timestamp + "|"; String mqttUsername = deviceName + "&" + productKey; String mqttPassword = AliyunIoTSignUtil.sign(params, deviceSecret, "hmacsha1"); connectMqtt(targetServer, mqttclientId, mqttUsername, mqttPassword); } catch (Exception e) { System.out.println("initAliyunIoTClient error " + e.getMessage()); } } public static void connectMqtt(String url, String clientId, String mqttUsername, String mqttPassword) throws Exception { MemoryPersistence persistence = new MemoryPersistence(); mqttClient = new MqttClient(url, clientId, persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); // MQTT 3.1.1 connOpts.setMqttVersion(4); connOpts.setAutomaticReconnect(false); connOpts.setCleanSession(true); connOpts.setUserName(mqttUsername); connOpts.setPassword(mqttPassword.toCharArray()); connOpts.setKeepAliveInterval(60); mqttClient.connect(connOpts); } /** * 彙報屬性 */ private static void postDeviceProperties() { try { //上報數據 //高級版 物模型-屬性上報payload System.out.println("上報屬性值"); String payloadJson = "{\"params\":{\"Status\":0,\"Data\":\"15\"}}"; MqttMessage message = new MqttMessage(payloadJson.getBytes("utf-8")); message.setQos(1); mqttClient.publish(pubTopic, message); } catch (Exception e) { System.out.println(e.getMessage()); } } }
五、運行測試狀況
工具
基於開源MQTT自主接入阿里雲IoT平臺(Java)
MQTT-TCP 連接通訊
原文鏈接
本文爲雲棲社區原創內容,未經允許不得轉載。