物聯網時代-跟着Thingsboard學IOT架構-HTTP設備協議及API相關限制

thingsboard官網: thingsboard.io/html

thingsboard GitHub: github.com/thingsboard…java

thingsboard提供的體驗地址: demo.thingsboard.io/git

BY Thingsboard teamgithub

如下內容是在原文基礎上演繹的譯文。除非另行註明,頁面上全部內容採用知識共享-署名(CC BY 2.5 AU)協議共享。web

原文地址: ThingsBoard API參考:HTTP設備APIspring


HTTP

協議介紹

HTTP是可用於IoT應用程序的通用網絡協議。您能夠在此處找到有關HTTP的更多信息。HTTP協議是基於TCP的,並使用請求 - 響應模型。固然它的缺點也極爲明顯,HTTP對於嵌入式設備來講過重了,也不靈活。apache

協議特色

http

  1. 支持客戶/服務器模式。
  2. 簡單快速: 客戶向服務器請求服務時,只需傳送請求方法和路徑。請求方法經常使用的有GET、PUT、POST。每種方法規定了客戶與服務器聯繫的類型不一樣。因爲HTTP協議簡單,使得HTTP服務器的程序規模小,所以通訊速度很快。
  3. 靈活: HTTP容許傳輸任意類型的數據對象。正在傳輸的類型由Content-Type加以標記。
  4. 無鏈接:無鏈接的含義是限制每次鏈接只處理一個請求。服務器處理完客戶的請求,並收到客戶的應答後,即斷開鏈接。採用這種方式能夠節省傳輸時間。
  5. 無狀態:HTTP協議是無狀態協議。無狀態是指協議對於事務處理沒有記憶能力。缺乏狀態意味着若是後續處理須要前面的信息,則它必須重傳,這樣可能致使每次鏈接傳送的數據量增大。另外一方面,在服務器不須要先前信息時它的應答就較快。

客戶端設置

Thingsboard的HTTP傳輸協議架構

由於Thingsboard最新release,是基於微服務架構,不利用單獨理解代碼。json

**Thingsboard CoAP設備傳輸協議源代碼:**github.com/thingsboard…api

本文基於上面源代碼後,剔除相關的安全驗證和處理以後搭建簡易的講解項目:緩存

github.com/sanshengshu…


Spring Boot框架

Thingsboard的HTTP設備傳輸協議是基於Spring Boot

Spring Boot 是 Spring 的子項目,正如其名字,提供 Spring 的引導( Boot )的功能。

經過 Spring Boot ,咱們開發者能夠快速配置 Spring 項目,引入各類 Spring MVC、Spring Transaction、Spring AOP、MyBatis 等等框架,而無需不斷重複編寫繁重的 Spring 配置,下降了 Spring 的使用成本。

猶記當年,Spring XML 爲主的時代,大晚上各類搜索 Spring 的配置,苦不堪言。如今有了 Spring Boot 以後,生活真美好。

Spring Boot 提供了各類 Starter 啓動器,提供標準化的默認配置。例如:

而且,Spring Boot 基本已經一統 Java 項目的開發,大量的開源項目都實現了其的 Starter 啓動器。例如:

項目解讀

項目結構

├── java
│   └── com
│       └── sanshengshui
│           └── http
│               ├── controller
│               │   └── DeviceApiController.java	// 設備傳輸API接口
│               ├── HttpApiServer.java	//項目啓動主類
│               └── quota	//API限制類包
│                   ├── AbstractQuotaService.java	//抽象限制服務類
│                   ├── Clock.java		//時鐘類
│                   ├── host
│                   │   ├── HostIntervalRegistryCleaner.java	//主機API清理器
│                   │   ├── HostIntervalRegistryLogger.java	 //主機API記錄器
│                   │   ├── HostRequestIntervalRegistry.java	//主機API請求註冊表
│                   │   ├── HostRequestLimitPolicy.java	 //主機API請求限制條件
│                   │   └── HostRequestsQuotaService.java	 //主機請求限制開關
│                   ├── inmemory
│                   │   ├── IntervalCount.java	 //間歇計數
│                   │   ├── IntervalRegistryCleaner.java	//時間間隔內註冊表清理器
│                   │   ├── IntervalRegistryLogger.java 	//時間間隔內註冊表記錄器
│                   │   └── KeyBasedIntervalRegistry.java	 //基礎API請求邏輯
│                   ├── QuotaService.java	//限制服務類
│                   └── RequestLimitPolicy.java //請求限制策略
└── resources
    └── application.yml

複製代碼

項目代碼

引入依賴

<dependencies>
        <dependency>
            <groupId>com.sanshengshui</groupId>
            <artifactId>IOT-Guide-TSL</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </dependency>
</dependencies>
複製代碼
  • Spring Boot提供的web框架基於Tomcat,能夠經過引入spring-boot-starter-web來配置依賴關係。
  • commons-lang3guava用於API請求限制服務。

參數配置

server:
 port: 8080


http:
 request_timeout: "${HTTP_REQUEST_TIMEOUT:60000}"


quota:
 host:
 limit: "${QUOTA_HOST_LIMIT:10}"
 intervalMs: "${QUOTA_HOST_INTERVAL_MS:60000}"
 ttlMs: "${QUOTA_HOST_TTL_MS:60000}"
 cleanPeriodMs: "${QUOTA_HOST_CLEAN_PERIOD_MS:300000}"
 enabled: "${QUOTA_HOST_ENABLED:true}"
 whitelist: "${QUOTA_HOST_WHITELIST:localhost,127.0.0.1}"
 blacklist: "${QUOTA_HOST_BLACKLIST:}"
 log:
 topSize: 10
 intervalMin: 2
複製代碼
  • server.port: 8080: 服務器啓動綁定的端口,缺省狀況下是:8080。
  • http.request_timeout : 請求超時時間,此處設定爲60000。
  • quota.host.limitquota.host.intervalMs: 分別爲API請求限額數和單位時間。此處爲了驗證方便,設定爲10次和60s,即60s內API請求限額數爲10次。
  • quota.host.cleanPeriodMsquota.host.ttlMs : 分別爲清理週期時間和TTL時間。
  • quota.host.enabledquota.host.whitelistquota.host.blacklist分別表示API請求開關、白名單及黑名單。
  • quota.host.log.topSizequota.host.log.intervalMin: 指的是高速緩存中的(近似)最大條目數和間隔時間。

API限制服務類

KeyBasedIntervalRegistry:基礎API請求邏輯

package com.sanshengshui.http.quota.inmemory;

import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/** * @author james mu * @date 2019/8/10 下午4:50 */
@Slf4j
public class KeyBasedIntervalRegistry {

    private final Map<String, IntervalCount> hostCounts = new ConcurrentHashMap<>();
    private final long intervalDurationMs;
    private final long ttlMs;
    private final Set<String> whiteList;
    private final Set<String> blackList;

    public KeyBasedIntervalRegistry(long intervalDurationMs, long ttlMs, String whiteList, String blackList, String name) {
        this.intervalDurationMs = intervalDurationMs;
        this.ttlMs = ttlMs;
        this.whiteList = Sets.newHashSet(StringUtils.split(whiteList, ','));
        this.blackList = Sets.newHashSet(StringUtils.split(blackList, ','));

    }

    private void validate(String name) {
        if (ttlMs < intervalDurationMs) {
            log.warn("TTL for {} IntervalRegistry [{}] smaller than interval duration [{}]", name, ttlMs, intervalDurationMs);
        }
        log.info("Start {} KeyBasedIntervalRegistry with whitelist {}", name, whiteList);
        log.info("Start {} KeyBasedIntervalRegistry with blacklist {}", name, blackList);
    }

    public long tick(String clientHostId) {
        IntervalCount intervalCount = hostCounts.computeIfAbsent(clientHostId, s -> new IntervalCount(intervalDurationMs));
        long currentCount = intervalCount.resetIfExpiredAndTick();
        if (whiteList.contains(clientHostId)) {
            return 0;
        } else if (blackList.contains(clientHostId)) {
            return Long.MAX_VALUE;
        }
        return currentCount;
    }

    public void clean() {
        hostCounts.entrySet().removeIf(entry -> entry.getValue().silenceDuration() > ttlMs);
    }

    public Map<String, Long> getContent() {
        return hostCounts.entrySet().stream()
                .collect(
                        Collectors.toMap(
                                Map.Entry:: getKey,
                                interval -> interval.getValue().getCount()
                        )
                );
    }
}

複製代碼
  • validate(string name): 要求ttlMs<intervalDurationMs,並打印出API請求的黑名單和白名單。

  • 第42行經過computeIfAbsent函數對map中不存在key時的處理,在這裏經過新建intervalCount(intervalDurationMs)的方式來處理。

  • 第43行經過intervalCount的resetIfExpiredAndTick()對時間間隔內進行計數。

  • 第44-48行經過判斷API請求客戶端地址是否在黑白名單中,若是在白名單,返回0,若是在黑名單中,返回Long.MAX_VALUE

  • clean()爲經過時間間隔內是否大於ttlMs來過濾集合中的元素。

  • getContent()爲遍歷hostCounts中的客戶端地址的IntervalCount。

IntervalCount: 間歇時間內計數

package com.sanshengshui.http.quota.inmemory;

import com.sanshengshui.http.quota.Clock;

import java.util.concurrent.atomic.LongAdder;

/** * @author james mu * @date 19-8-9 下午16:50 */
public class IntervalCount {

    private final LongAdder addr = new LongAdder();
    private final long intervalDurationMs;
    private volatile long startTime;
    private volatile long lastTickTime;

    public IntervalCount(long intervalDurationMs) {
        this.intervalDurationMs = intervalDurationMs;
        startTime = Clock.millis();
    }

    //計數或時間過時後重置時間
    public long resetIfExpiredAndTick(){
        if (isExpired()){
            reset();
        }
        tick();
        return addr.sum();
    }

   //計算已過期間
    public long silenceDuration() {
        return Clock.millis() - lastTickTime;
    }

    public long getCount() {
        return addr.sum();
    }

   //計數操做,累加一
    private void tick() {
        addr.add(1);
        lastTickTime = Clock.millis();
    }

   //重置計數時間
    private void reset() {
        addr.reset();
        lastTickTime = Clock.millis();
    }

//判斷間隔時間是否失效
    private boolean isExpired() {
        return (Clock.millis() - startTime) > intervalDurationMs;
    }

}

複製代碼

剩下的處理類,留給讀者去本身研究了!

  1. 主機API清理器: HostIntervalRegistryCleaner注入quota.host.cleanPeriodMs並繼承抽象類IntervalRegistryCleaner
  2. 主機API記錄器: HostIntervalRegistryLogger注入quota.host.log.topSizequota.host.log.intervalMin並繼承IntervalRegistryLogger
  3. 主機API請求註冊表: HostRequestIntervalRegistry注入quota.host.intervalMsquota.host.ttlMsquota.host.whitelistquota.host.blacklist並繼承KeyBasedIntervalRegistry
  4. 主機API請求限制條件: HostRequestLimitPolicy注入quota.host.limit並繼承RequestLimitPolicy
  5. 主機請求限制開關: HostRequestsQuotaService注入quota.host.enabled並繼承AbstractQuotaService

屬性API和遙測數據上傳API

@RestController
@RequestMapping("/api/v1")
@Slf4j
public class DeviceApiController {
    
    @Autowired(required = false)
    private HostRequestsQuotaService quotaService;//API限制服務類
    
    @RequestMapping(value = "/attributes",method = RequestMethod.POST)
    public DeferredResult<ResponseEntity> postDeviceAttributes( @RequestBody String json, HttpServletRequest request) {
        DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
        if (quotaExceeded(request, responseWriter)) {
            return responseWriter;
        }
        responseWriter.setResult(new ResponseEntity<>(HttpStatus.ACCEPTED));
        Set<AttributeKvEntry> attributeKvEntrySet = JsonConverter.convertToAttributes(new JsonParser().parse(json)).getAttributes();
        for (AttributeKvEntry attributeKvEntry : attributeKvEntrySet){
            System.out.println("屬性名="+attributeKvEntry.getKey()+" 屬性值="+attributeKvEntry.getValueAsString());
        }
        return responseWriter;
    }
    
    @RequestMapping(value = "/telemetry",method = RequestMethod.POST)
    public DeferredResult<ResponseEntity> postTelemetry(@RequestBody String json, HttpServletRequest request){
        DeferredResult<ResponseEntity> responseWriter = new DeferredResult<ResponseEntity>();
        if (quotaExceeded(request, responseWriter)) {
            return responseWriter;
        }
        responseWriter.setResult(new ResponseEntity(HttpStatus.ACCEPTED));
        Map<Long, List<KvEntry>> telemetryMaps = JsonConverter.convertToTelemetry(new JsonParser().parse(json)).getData();
        for (Map.Entry<Long,List<KvEntry>> entry : telemetryMaps.entrySet()) {
            System.out.println("key= " + entry.getKey());
            for (KvEntry kvEntry: entry.getValue()) {
                System.out.println("屬性名="+kvEntry.getKey()+ " 屬性值="+kvEntry.getValueAsString());
            }
        }
        return responseWriter;
    }
}
複製代碼

項目演示

遙測上傳API

要將遙測數據發佈到服務器節點,請將POST請求發送到如下URL:

http://localhost:8080/api/v1/telemetry
複製代碼

最簡單的支持數據格式是:

{"key1":"value1", "key2":"value2"}
複製代碼

要麼

[{"key1":"value1"}, {"key2":"value2"}]
複製代碼

請注意,在這種狀況下,服務器端時間戳將分配給上傳的數據!

若是您的設備可以獲取客戶端時間戳,您可使用如下格式:

{"ts":1451649600512, "values":{"key1":"value1", "key2":"value2"}}
複製代碼

在上面的示例中,咱們假設「1451649600512」是具備毫秒精度的unix時間戳。例如,值'1451649600512'對應於'Fri,2016年1月1日12:00:00.512 GMT'

例子:

curl -v -X POST -d "{"stringKey":"value1", "booleanKey":true, "doubleKey":42.0, "longKey":73}" http://localhost:8080/api/v1/telemetry --header "Content-Type:application/json"
複製代碼
C:\Users\james>curl -v -X POST -d "{"stringKey":"value1", "booleanKey":true, "doubleKey":42.0, "longKey":73}" http://localhost:8080/api/v1/telemetry --header "Content-Type:application/json"
Note: Unnecessary use of -X or --request, POST is already inferred.
*   Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8080 (#0)
> POST /api/v1/telemetry HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.55.1
> Accept: */*
> Content-Type:application/json
> Content-Length: 63
>
* upload completely sent off: 63 out of 63 bytes
< HTTP/1.1 202
< Content-Length: 0
< Date: Sun, 18 Aug 2019 16:16:07 GMT
<
* Connection #0 to host localhost left intact
複製代碼

結果:

key= 1566144967139
屬性名=stringKey 屬性值=value1
屬性名=booleanKey 屬性值=true
屬性名=doubleKey 屬性值=42.0
屬性名=longKey 屬性值=73
複製代碼

屬性API

屬性API容許設備

  • 將客戶端設備屬性上載到服務器。
  • 從服務器請求客戶端和共享設備屬性。

將屬性更新發布到服務器

要將客戶端設備屬性發布到ThingsBoard服務器節點,請將POST請求發送到如下URL:

http://localhost:8080/api/v1/attributes
複製代碼

例子:

curl -v -X POST -d "{"stringKey":"value1", "booleanKey":true, "doubleKey":42.0, "longKey":73}" http://localhost:8080/api/v1/attributes --header "Content-Type:application/json"
複製代碼
C:\Users\james>curl -v -X POST -d "{"stringKey":"value1", "booleanKey":true, "doubleKey":42.0, "longKey":73}" http://localhost:8080/api/v1/attributes --header "Content-Type:application/json"
Note: Unnecessary use of -X or --request, POST is already inferred.
*   Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8080 (#0)
> POST /api/v1/attributes HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.55.1
> Accept: */*
> Content-Type:application/json
> Content-Length: 63
>
* upload completely sent off: 63 out of 63 bytes
< HTTP/1.1 202
< Content-Length: 0
< Date: Sun, 18 Aug 2019 16:21:00 GMT
<
* Connection #0 to host localhost left intact
複製代碼

結果:

屬性名=stringKey 屬性值=value1
屬性名=booleanKey 屬性值=true
屬性名=doubleKey 屬性值=42.0
屬性名=longKey 屬性值=73
複製代碼

API限額服務

爲了演示方便,咱們設置60s內最多API請求測試爲10次,如今咱們使用遙測上傳API連續發起接口調用,咱們會發現以下的狀況出現:

屬性名=longKey 屬性值=73
屬性名=stringKey 屬性值=value1
屬性名=booleanKey 屬性值=true
屬性名=doubleKey 屬性值=42.0
屬性名=longKey 屬性值=73
2019-08-19 00:26:25.696  WARN 16332 --- [nio-8080-exec-1] c.s.http.controller.DeviceApiController  : REST Quota exceeded for [0:0:0:0:0:0:0:1] . Disconnect
2019-08-19 00:26:26.402  WARN 16332 --- [nio-8080-exec-2] c.s.http.controller.DeviceApiController  : REST Quota exceeded for [0:0:0:0:0:0:0:1] . Disconnect
複製代碼

這說明了咱們的API限額服務起了做用,固然你也能夠測試黑白名單等功能。

當在真實狀況下,一般的API限額會很大,我這裏提供了一個gatling自動化測試來提供接口測試。地址爲:github.com/sanshengshu…

關於gatling的其餘信息,你們能夠參考:

到此,物聯網時代,相信你們對IOT架構下的HTTP協議和API相關限制有所瞭解了,感謝你們的閱讀!

相關文章
相關標籤/搜索