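The previous post covered the CoAP client and server implementation, with data sent to Kafka in the agreed format. Next comes the Mapping server, which maps physical device data onto abstract devices and gives the data its business meaning.
Build the iot-mapping module and pull in the shared Kafka module.
SourceListener listens for the raw data sent by the CoAP server and reads the product thing model and device data that web manage has cached in Redis. Because Redis is used in many places, a separate iot-redis module is also built for Redis operations.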
@Autowired
private BaseRedisUtil redisUtil;

@KafkaListener(topics = SOURCE_TOPIC)
public void iotListener(String msg) {
    System.out.println("-----------" + msg);
    KafkaSourceVO sourceVO = JSONObject.parseObject(msg, KafkaSourceVO.class);
    // Device info, keyed by IMEI
    RedisDeviceVO deviceVO = redisUtil.get(sourceVO.getImei());
    // Product info, keyed by product id
    RedisProductVO productVO = redisUtil.get(deviceVO.getProductId());
    // Dispatch on the data format declared by the product
    if (EDataFormat.BYTE.getFormat().equals(productVO.getFormat())) {
        analysisByte(sourceVO, productVO, deviceVO);
    } else if (EDataFormat.JSON.getFormat().equals(productVO.getFormat())) {
        analysisJson(sourceVO, productVO, deviceVO);
    }
}
public void analysisByte(KafkaSourceVO sourceVO, RedisProductVO productVO, RedisDeviceVO deviceVO) {
    char[] chars = sourceVO.getData().toCharArray();
    List<RedisPropertyVO> propertys = productVO.getPropertys();
    KafkaDownVO downVO = new KafkaDownVO();
    downVO.setDeviceId(deviceVO.getId());
    downVO.setCollTime(sourceVO.getCollTime());
    List<KafkaDownVO.PropertyData> propertyDatas = new ArrayList<>(propertys.size());
    propertys.forEach(property -> {
        // ofset has the form "begin-end"; both ends are inclusive
        String[] str = property.getOfset().split("-");
        int begin = Integer.parseInt(str[0]);
        int end = Integer.parseInt(str[1]);
        KafkaDownVO.PropertyData data = new KafkaDownVO.PropertyData();
        data.setPropertyId(property.getId());
        StringBuilder sb = new StringBuilder();
        for (int i = begin; i <= end; i++) {
            sb.append(chars[i]);
        }
        data.setData(sb.toString());
        propertyDatas.add(data);
    });
    downVO.setDataList(propertyDatas);
    System.out.println("byte---" + downVO);
}
public void analysisJson(KafkaSourceVO sourceVO, RedisProductVO productVO, RedisDeviceVO deviceVO) {
    // For JSON products, ofset holds the payload key of each property
    Map<String, Long> propertyMap = productVO.getPropertys().stream()
            .collect(Collectors.toMap(RedisPropertyVO::getOfset, RedisPropertyVO::getId));
    Map<String, String> dataMap = JSONObject.parseObject(sourceVO.getData(), HashMap.class);
    KafkaDownVO downVO = new KafkaDownVO();
    downVO.setDeviceId(deviceVO.getId());
    downVO.setCollTime(sourceVO.getCollTime());
    List<KafkaDownVO.PropertyData> propertyDatas = new ArrayList<>(dataMap.size());
    dataMap.forEach((key, val) -> {
        KafkaDownVO.PropertyData data = new KafkaDownVO.PropertyData();
        data.setPropertyId(propertyMap.get(key));
        data.setData(val);
        propertyDatas.add(data);
    });
    downVO.setDataList(propertyDatas);
    System.out.println("json---" + downVO);
}
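The byte branch above reads each property's `ofset` as an inclusive `begin-end` range and copies those characters out of the payload. As a minimal standalone sketch of that slicing step (the class `OffsetSlicer`, the method `slice`, and the bounds checks are hypothetical additions for illustration, not part of the project code), it could look like this:

```java
public class OffsetSlicer {

    /** Returns payload[begin..end], both ends inclusive, for an offset such as "0-3". */
    public static String slice(String payload, String offset) {
        String[] parts = offset.split("-");
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected begin-end, got: " + offset);
        }
        int begin = Integer.parseInt(parts[0].trim());
        int end = Integer.parseInt(parts[1].trim());
        // Assumption: a range outside the payload is an error rather than being truncated.
        if (begin < 0 || end < begin || end >= payload.length()) {
            throw new IllegalArgumentException(
                    "offset " + offset + " is out of range for payload length " + payload.length());
        }
        // substring's end index is exclusive, so add 1 to keep `end` inclusive.
        return payload.substring(begin, end + 1);
    }

    public static void main(String[] args) {
        String payload = "0123ABCD";
        System.out.println(slice(payload, "0-3")); // prints 0123
        System.out.println(slice(payload, "4-7")); // prints ABCD
    }
}
```

Rejecting a bad range up front gives a readable error, where the loop in `analysisByte()` would otherwise fail with an `ArrayIndexOutOfBoundsException` when a device sends a payload shorter than the thing model expects.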
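Start the project and check that the data is wrapped correctly.
Following the iot-pt architecture design, the mapped data now has to be written back to Kafka for the subscription service to consume.
Add the writer for the Mapping data to the iot-kafka module: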
@Component
public class KafkaMapping {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Serialize the mapped data to JSON and publish it to the downstream topic
    public void send(KafkaDownVO downVO) {
        String json = JSONObject.toJSONString(downVO);
        kafkaTemplate.send(DOWN_TOPIC, json);
    }
}
Modify analysisByte() and analysisJson() so that they send the result instead of printing it:
public void analysisByte(KafkaSourceVO sourceVO, RedisProductVO productVO, RedisDeviceVO deviceVO) {
    char[] chars = sourceVO.getData().toCharArray();
    List<RedisPropertyVO> propertys = productVO.getPropertys();
    KafkaDownVO downVO = new KafkaDownVO();
    downVO.setDeviceId(deviceVO.getId());
    downVO.setCollTime(sourceVO.getCollTime());
    List<KafkaDownVO.PropertyData> propertyDatas = new ArrayList<>(propertys.size());
    propertys.forEach(property -> {
        // ofset has the form "begin-end"; both ends are inclusive
        String[] str = property.getOfset().split("-");
        int begin = Integer.parseInt(str[0]);
        int end = Integer.parseInt(str[1]);
        KafkaDownVO.PropertyData data = new KafkaDownVO.PropertyData();
        data.setPropertyId(property.getId());
        StringBuilder sb = new StringBuilder();
        for (int i = begin; i <= end; i++) {
            sb.append(chars[i]);
        }
        data.setData(sb.toString());
        propertyDatas.add(data);
    });
    downVO.setDataList(propertyDatas);
    // kafkaMapping is the KafkaMapping bean from iot-kafka, injected into this class
    kafkaMapping.send(downVO);
}
public void analysisJson(KafkaSourceVO sourceVO, RedisProductVO productVO, RedisDeviceVO deviceVO) {
    // For JSON products, ofset holds the payload key of each property
    Map<String, Long> propertyMap = productVO.getPropertys().stream()
            .collect(Collectors.toMap(RedisPropertyVO::getOfset, RedisPropertyVO::getId));
    Map<String, String> dataMap = JSONObject.parseObject(sourceVO.getData(), HashMap.class);
    KafkaDownVO downVO = new KafkaDownVO();
    downVO.setDeviceId(deviceVO.getId());
    downVO.setCollTime(sourceVO.getCollTime());
    List<KafkaDownVO.PropertyData> propertyDatas = new ArrayList<>(dataMap.size());
    dataMap.forEach((key, val) -> {
        KafkaDownVO.PropertyData data = new KafkaDownVO.PropertyData();
        data.setPropertyId(propertyMap.get(key));
        data.setData(val);
        propertyDatas.add(data);
    });
    downVO.setDataList(propertyDatas);
    kafkaMapping.send(downVO);
}
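In the JSON branch, each payload key is looked up in a map built from the product's properties, so a key that the thing model does not define ends up with a `null` property id. One possible way to handle that, shown here as a hedged sketch with plain `java.util` maps (the class `JsonKeyMapper` and the choice to skip unknown keys are assumptions for illustration, not the project's behavior), is to drop such keys before sending:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class JsonKeyMapper {

    /** Maps payload keys to property ids, skipping keys the product model does not define. */
    public static Map<Long, String> mapToPropertyIds(Map<String, Long> propertyIdsByKey,
                                                     Map<String, String> payload) {
        Map<Long, String> result = new LinkedHashMap<>();
        payload.forEach((key, value) -> {
            Long propertyId = propertyIdsByKey.get(key);
            // Assumption: unknown keys are ignored rather than forwarded with a null id.
            if (propertyId != null) {
                result.put(propertyId, value);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> model = new LinkedHashMap<>();
        model.put("temperature", 1L);
        model.put("humidity", 2L);

        Map<String, String> payload = new LinkedHashMap<>();
        payload.put("temperature", "21.5");
        payload.put("unknown", "x");

        System.out.println(mapToPropertyIds(model, payload)); // prints {1=21.5}
    }
}
```

Whether to skip, log, or reject unknown keys depends on how strict the subscription side needs to be; the sketch only shows where that decision would sit.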
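Start the project again and verify that the Mapping data is written to Kafka successfully.
Next up is the subscription service, which the following post will cover. The full code details are on git.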