分佈式項目(三)CoAp client and server

上回說到Web manage的構建,完成的對產品,物模型中的屬於數據,設備數據,並把對應的數據緩存到redis中,接下來就開始coap客戶端和服務器的構建。html

coap

現階段PC網絡交互中較多的是使用tcp和http協議,但物聯網設備都要求較小的功耗、較小的帶寬,而且CPU、內存都是有限的,因此在這種需求下,http相對就不實用了,由於http相對臃腫,而CoAP是受限制的應用協議的代名詞,CoAp和http同樣都是應用層的協議,而且它是一種類http協議,相同的如請求響應模式,url方式,請求方法(作了縮減),響應碼(作了簡化),不一樣的是CoAp是基於UDP的,能夠雙向通訊(既是客戶端又是服務端),而且CoAp協議很是的小,最小的數據包僅爲4k。git

coap報文

  • Var:版本編號redis

  • T:報文類型,coap定義了4種報文類型spring

    1. CON:須要確認的消息,可實現可靠性傳輸
    2. NON:不要確認的消息,消息傳輸不可靠
    3. ACK:確認應到消息,與CON對象
    4. RST:復位,要求消息重傳
  • TKL:標識符長度,CoAp定義了兩種標識符,Message Id(必須)和Token(非必須)json

  • Code:響應碼,如4.04,5.00,和http中的40四、500功能相似緩存

  • Message Id:報文編號服務器

  • Token:標識符具體的內容網絡

  • Option:可選選項參數,一個或多個,可設置Uri-Host、Uri-Port、Uri-Path和Uri-Query,CoAp對Option定了3部分架構

    1. Delta:當前Option的編號,等於前面全部Option delta的總和
    2. Length:表示value部分具體的長度
    3. value:當前Option承載的具體內容
  • 1111 1111B:報文與所承載的數據分隔符併發

  • Payload:所承載的數據實體

請求方法

請求方法和http相似,coap定義了4種請求方式

  • get:獲取資源
  • post:建立資源
  • put:更新資源
  • delete:刪除資源

coap數據格式

和http相似,用戶定義所承載的數據的具體格式,如text/html,application/json

上面就是對coap作了一個簡單的介紹,對coap協議有個大概的瞭解,接下來就開始對client和server的編碼了,固然筆者這裏也不可能本身寫一個對coap的實現,筆者這裏使用的是californium-core。

californium-core client

構建iot-coap-client模塊,加入californium-core依賴

<dependency>
            <groupId>org.eclipse.californium</groupId>
            <artifactId>californium-core</artifactId>
            <version>2.0.0-M14</version>
        </dependency>

這裏咱們用定時任務來模擬物理網設備數據的定時發送

建立Scheduler類,因咱們定了byte和json兩種數據格式,因此這裏編寫sendByte()和sendJson(),iot-pt設定一類物理設備只能發送一種數據格式,但這裏爲了方便,筆者就使用具體發送數據格式的方法來模擬一種具體的設備,使用隨機數來模擬設備的數據變化。

private Random ra = new Random();
    @Scheduled(fixedRate = 2000)
    public void sendByte() throws URISyntaxException {
        //建立請求資源
        URI uri = new URI("coap://localhost:5683/iot-byte?201904151718");
        CoapClient client = new CoapClient(uri);

        StringBuilder sb = new StringBuilder();
        sb.append(ra.nextInt(999)%(999-100+1)+100);
        sb.append(new BigDecimal(ra.nextDouble()).setScale(2,
		BigDecimal.ROUND_HALF_UP));
        //請求資源
        CoapResponse response = client.post(sb.toString(),
		MediaTypeRegistry.TEXT_PLAIN);
        if(response !=null){
            System.out.println(response.getCode());  //請求狀態碼
            System.out.println(response.getOptions());  //選項參數
            System.out.println(response.getResponseText());  //內容文本信息
            System.out.println(Utils.prettyPrint(response));  //報文內容
        }
    }
@Scheduled(fixedRate = 4000)
    public void sendJson() throws URISyntaxException {
	//建立請求資源,201904151718 設備惟一編碼,模擬imie
        URI uri =  new URI("coap://localhost:5683/iot-json?2019041717"); 
        CoapClient client = new CoapClient(uri);
        //溫度
        int temperature = ra.nextInt(999)%(999-100+1)+100;
        //溼度
        String humidity = String.valueOf(new BigDecimal(ra.nextDouble())
		.setScale(2, 
		BigDecimal.ROUND_HALF_UP));
        Map map = new HashMap<String,String>();
        map.put("T",String.valueOf(temperature));
        map.put("H",humidity);
        String json = JSONObject.toJSONString(map);
        client.post(json,MediaTypeRegistry.APPLICATION_JSON);
    }

Copa發送數據的客戶端已經寫好了,就下了開始server的擼碼

californium-core Server

首先仍是構建iot-coap-server模塊,加入californium-core依賴

  1. 建立服務器啓動,ServerStart類
[@Component](https://my.oschina.net/u/3907912)
public class ServerStart {
    @Value("${coap.port}")
    private int port;
    @Autowired
    private IotByteHandler iotHandler;
    @Autowired
    private IotJsonHandler iotJsonHandler;
    public void start(){
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                CoapServer server = new CoapServer(port);
                server.add(iotHandler);
                server.add(iotJsonHandler);
                server.start();
            }
        });
        thread.start();
    }
}

由於我這裏使用的是spring boot 的核心組件,spring boot啓動完成後因爲沒有應用線程運行,因此項目jvm會自動退出,由於這裏使用Thread線程來啓動CoapServer,CoapServer會一直監聽消息接受,jvm守護進程就不會退出。

接下來編寫IotByteHandler和IotJsonHandler,這種Handler的實現方式和netty有點相似。

@Component
public class IotByteHandler extends CoapResource {

    public IotByteHandler(@Value("${coap.iot.byte}") String name) {
        super(name);
    }

    @Override
    public void handlePOST(CoapExchange exchange) {
        //狀態碼
        System.out.println("code---"+exchange.getRequestCode());
        //選項參數
        System.out.println("Options---"+exchange.getRequestOptions());
        //文本內容
        System.out.println("text"+exchange.getRequestText());

        System.out.println(exchange.getRequestOptions());

    }
}
@Component
public class IotJsonHandler extends CoapResource {
    public IotJsonHandler(@Value("${coap.iot.json}") String name) {
        super(name);
    }

    @Override
    public void handlePOST(CoapExchange exchange) {
        System.out.println("option---"+exchange.getRequestOptions());
        System.out.println("json---" + exchange.getRequestText());
    }
}

spring boot runner 啓動coapServer

@Component
public class CoapApplicationRunner implements ApplicationRunner {
    @Autowired
    private ServerStart serverStart;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        serverStart.start();
    }
}

接着啓動CoapServer和CoapClient,看數據是否符合咱們預約的格式發送過來了

ok,數據以及發到服務器了,安裝咱們的架構設備,CoApServer須要把數據整理併發送到kafka的,所以kafka不少地方都須要使用,因此在這裏獨立構建一個kafka模塊,iot-kafka。

KafkaSource負責把協議服務收到的消息發送給kafak

@Component
public class KafkaSource {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void send(KafkaSourceVO vo){
        kafkaTemplate.send(SOURCE_TOPIC,JSONObject.toJSONString(vo));
    }
}

修改CoApServer的IotByteHandler和IotJsonHandler,加入kafka寫消息

@Component
public class IotByteHandler extends CoapResource {

    @Autowired
    private KafkaSource kafkaSource;

    public IotByteHandler(@Value("${coap.iot.byte}") String name) {
        super(name);
    }

    @Override
    public void handlePOST(CoapExchange exchange) {
        //狀態碼
        System.out.println("code---"+exchange.getRequestCode());
        //選項參數
        System.out.println("Options---"+exchange.getRequestOptions());
        //文本內容
        System.out.println("text"+exchange.getRequestText());

        System.out.println(exchange.getRequestOptions());

        KafkaSourceVO vo = new KafkaSourceVO(exchange.getRequestOptions().
		getUriQuery().get(0),exchange.getRequestText(),new Date());
        kafkaSource.send(vo);
        exchange.respond(CoAP.ResponseCode.CONTENT,"ok");
    }
}
@Component
public class IotJsonHandler extends CoapResource {
    @Autowired
    private KafkaSource kafkaSource;

    public IotJsonHandler(@Value("${coap.iot.json}") String name) {
        super(name);
    }

    @Override
    public void handlePOST(CoapExchange exchange) {
        KafkaSourceVO vo = new KafkaSourceVO(exchange.getRequestOptions().
		getUriQuery().get(0),exchange.g
etRequestText(),new Date());
        kafkaSource.send(vo);
        exchange.respond(CoAP.ResponseCode.CONTENT,"ok");
    }
}
public class KafkaSourceVO {
    //設備惟一碼
    private String imei;
    //數據
    private String data;
    //這裏用服務接送到消息的時間模擬設備採集數據的時間
    private Date collTime;

再次啓動CoApServer和CoApClient,驗證是否把數據寫如kafka了。

結束語

接下來就是Mapping Server的實現了,請聽下回分解,具體的代碼細節在git

https://gitee.com/distant/iot-pt.git

相關文章
相關標籤/搜索