上回說到Web manage的構建,完成的對產品,物模型中的屬於數據,設備數據,並把對應的數據緩存到redis中,接下來就開始coap客戶端和服務器的構建。html
現階段PC網絡交互中較多的是使用tcp和http協議,但物聯網設備都要求較小的功耗、較小的帶寬,而且CPU、內存都是有限的,因此在這種需求下,http相對就不實用了,由於http相對臃腫,而CoAP是受限制的應用協議的代名詞,CoAp和http同樣都是應用層的協議,而且它是一種類http協議,相同的如請求響應模式,url方式,請求方法(作了縮減),響應碼(作了簡化),不一樣的是CoAp是基於UDP的,能夠雙向通訊(既是客戶端又是服務端),而且CoAp協議很是的小,最小的數據包僅爲4k。git
Var:版本編號redis
T:報文類型,coap定義了4種報文類型spring
TKL:標識符長度,CoAp定義了兩種標識符,Message Id(必須)和Token(非必須)json
Code:響應碼,如4.04,5.00,和http中的40四、500功能相似緩存
Message Id:報文編號服務器
Token:標識符具體的內容網絡
Option:可選選項參數,一個或多個,可設置Uri-Host、Uri-Port、Uri-Path和Uri-Query,CoAp對Option定了3部分架構
1111 1111B:報文與所承載的數據分隔符併發
Payload:所承載的數據實體
請求方法和http相似,coap定義了4種請求方式
和http相似,用戶定義所承載的數據的具體格式,如text/html,application/json
上面就是對coap作了一個簡單的介紹,對coap協議有個大概的瞭解,接下來就開始對client和server的編碼了,固然筆者這裏也不可能本身寫一個對coap的實現,筆者這裏使用的是californium-core。
構建iot-coap-client模塊,加入californium-core依賴
<dependency> <groupId>org.eclipse.californium</groupId> <artifactId>californium-core</artifactId> <version>2.0.0-M14</version> </dependency>
這裏咱們用定時任務來模擬物理網設備數據的定時發送
建立Scheduler類,因咱們定了byte和json兩種數據格式,因此這裏編寫sendByte()和sendJson(),iot-pt設定一類物理設備只能發送一種數據格式,但這裏爲了方便,筆者就使用具體發送數據格式的方法來模擬一種具體的設備,使用隨機數來模擬設備的數據變化。
private Random ra = new Random(); @Scheduled(fixedRate = 2000) public void sendByte() throws URISyntaxException { //建立請求資源 URI uri = new URI("coap://localhost:5683/iot-byte?201904151718"); CoapClient client = new CoapClient(uri); StringBuilder sb = new StringBuilder(); sb.append(ra.nextInt(999)%(999-100+1)+100); sb.append(new BigDecimal(ra.nextDouble()).setScale(2, BigDecimal.ROUND_HALF_UP)); //請求資源 CoapResponse response = client.post(sb.toString(), MediaTypeRegistry.TEXT_PLAIN); if(response !=null){ System.out.println(response.getCode()); //請求狀態碼 System.out.println(response.getOptions()); //選項參數 System.out.println(response.getResponseText()); //內容文本信息 System.out.println(Utils.prettyPrint(response)); //報文內容 } }
@Scheduled(fixedRate = 4000) public void sendJson() throws URISyntaxException { //建立請求資源,201904151718 設備惟一編碼,模擬imie URI uri = new URI("coap://localhost:5683/iot-json?2019041717"); CoapClient client = new CoapClient(uri); //溫度 int temperature = ra.nextInt(999)%(999-100+1)+100; //溼度 String humidity = String.valueOf(new BigDecimal(ra.nextDouble()) .setScale(2, BigDecimal.ROUND_HALF_UP)); Map map = new HashMap<String,String>(); map.put("T",String.valueOf(temperature)); map.put("H",humidity); String json = JSONObject.toJSONString(map); client.post(json,MediaTypeRegistry.APPLICATION_JSON); }
Copa發送數據的客戶端已經寫好了,就下了開始server的擼碼
首先仍是構建iot-coap-server模塊,加入californium-core依賴
[@Component](https://my.oschina.net/u/3907912) public class ServerStart { @Value("${coap.port}") private int port; @Autowired private IotByteHandler iotHandler; @Autowired private IotJsonHandler iotJsonHandler; public void start(){ Thread thread = new Thread(new Runnable() { @Override public void run() { CoapServer server = new CoapServer(port); server.add(iotHandler); server.add(iotJsonHandler); server.start(); } }); thread.start(); } }
由於我這裏使用的是spring boot 的核心組件,spring boot啓動完成後因爲沒有應用線程運行,因此項目jvm會自動退出,由於這裏使用Thread線程來啓動CoapServer,CoapServer會一直監聽消息接受,jvm守護進程就不會退出。
接下來編寫IotByteHandler和IotJsonHandler,這種Handler的實現方式和netty有點相似。
@Component public class IotByteHandler extends CoapResource { public IotByteHandler(@Value("${coap.iot.byte}") String name) { super(name); } @Override public void handlePOST(CoapExchange exchange) { //狀態碼 System.out.println("code---"+exchange.getRequestCode()); //選項參數 System.out.println("Options---"+exchange.getRequestOptions()); //文本內容 System.out.println("text"+exchange.getRequestText()); System.out.println(exchange.getRequestOptions()); } }
@Component public class IotJsonHandler extends CoapResource { public IotJsonHandler(@Value("${coap.iot.json}") String name) { super(name); } @Override public void handlePOST(CoapExchange exchange) { System.out.println("option---"+exchange.getRequestOptions()); System.out.println("json---" + exchange.getRequestText()); } }
spring boot runner 啓動coapServer
@Component public class CoapApplicationRunner implements ApplicationRunner { @Autowired private ServerStart serverStart; @Override public void run(ApplicationArguments args) throws Exception { serverStart.start(); } }
接着啓動CoapServer和CoapClient,看數據是否符合咱們預約的格式發送過來了
ok,數據以及發到服務器了,安裝咱們的架構設備,CoApServer須要把數據整理併發送到kafka的,所以kafka不少地方都須要使用,因此在這裏獨立構建一個kafka模塊,iot-kafka。
KafkaSource負責把協議服務收到的消息發送給kafak
@Component public class KafkaSource { @Autowired private KafkaTemplate kafkaTemplate; public void send(KafkaSourceVO vo){ kafkaTemplate.send(SOURCE_TOPIC,JSONObject.toJSONString(vo)); } }
修改CoApServer的IotByteHandler和IotJsonHandler,加入kafka寫消息
@Component public class IotByteHandler extends CoapResource { @Autowired private KafkaSource kafkaSource; public IotByteHandler(@Value("${coap.iot.byte}") String name) { super(name); } @Override public void handlePOST(CoapExchange exchange) { //狀態碼 System.out.println("code---"+exchange.getRequestCode()); //選項參數 System.out.println("Options---"+exchange.getRequestOptions()); //文本內容 System.out.println("text"+exchange.getRequestText()); System.out.println(exchange.getRequestOptions()); KafkaSourceVO vo = new KafkaSourceVO(exchange.getRequestOptions(). getUriQuery().get(0),exchange.getRequestText(),new Date()); kafkaSource.send(vo); exchange.respond(CoAP.ResponseCode.CONTENT,"ok"); } }
@Component public class IotJsonHandler extends CoapResource { @Autowired private KafkaSource kafkaSource; public IotJsonHandler(@Value("${coap.iot.json}") String name) { super(name); } @Override public void handlePOST(CoapExchange exchange) { KafkaSourceVO vo = new KafkaSourceVO(exchange.getRequestOptions(). getUriQuery().get(0),exchange.g etRequestText(),new Date()); kafkaSource.send(vo); exchange.respond(CoAP.ResponseCode.CONTENT,"ok"); } }
public class KafkaSourceVO { //設備惟一碼 private String imei; //數據 private String data; //這裏用服務接送到消息的時間模擬設備採集數據的時間 private Date collTime;
再次啓動CoApServer和CoApClient,驗證是否把數據寫如kafka了。
接下來就是Mapping Server的實現了,請聽下回分解,具體的代碼細節在git