分佈式項目(四)Mapping Server 數據映射

上回說道CoAp client和server的實現,數據也按照既定格式發送到了kafka中,接下來就是Mapping server的實現,物理設備數據映射到抽象設備上,並賦予數據業務含義。git

iot-mapping

構建iot-mapping模塊,引入kafka公共模塊web

SourceListener

SourceListener監聽Coap server 發送的原始數據,並從redis中取出web manage緩存的產品物模型和設備數據,因爲redis公用的比較多,因此這也構建一個iot-redis模塊,用與redis操做。redis

邏輯說明

  • 獲取kafka中的數據轉成KafkaSourceVO
  • 根據imei獲取redis中的RedisDeviceVO
  • 根據productId獲取RedisProductVO
  • 根據RedisProductVO中的format,肯定是byte處理仍是json處理
@Autowired
    private BaseRedisUtil redisUtil;
    @KafkaListener(topics = SOURCE_TOPIC)
    public void iotListener(String msg){
        System.out.println("-----------"+msg);
        KafkaSourceVO sourceVO = JSONObject.parseObject(msg,
		KafkaSourceVO.class);
        //設備信息
        RedisDeviceVO deviceVO = redisUtil.get(sourceVO.getImei());
        //產品信息
        RedisProductVO productVO = redisUtil.get(deviceVO.getProductId());
        if (EDataFormat.BYTE.getFormat().equals(productVO.getFormat())){
            analysisByte(sourceVO,productVO,deviceVO);
        }else if (EDataFormat.JSON.getFormat().equals(
		productVO.getFormat())){
            analysisJson(sourceVO,productVO,deviceVO);
        }
    }

byte 解析

  • 獲取KafkaSourceVO中的原始數據data,並轉成char[]
  • 獲取產品物模型中的屬性模型RedisPropertyVO
  • 獲取屬性模型,按照屬性模型中定義的ofset,在原始數據char[]中獲取對應的值
  • 把解析出來的數據封裝爲KafkaDownVO
public void analysisByte(KafkaSourceVO sourceVO,
  RedisProductVO productVO,RedisDeviceVO deviceVO){
        char[] chars = sourceVO.getData().toCharArray();
        List<RedisPropertyVO> propertys = productVO.getPropertys();

        KafkaDownVO downVO = new KafkaDownVO();
        downVO.setDeviceId(deviceVO.getId());
        downVO.setCollTime(sourceVO.getCollTime());

        List<KafkaDownVO.PropertyData> propertyDatas = 
		new ArrayList<>(propertys.size());

        propertys.forEach(property->{
            String[] str = property.getOfset().split("-");
            int begin = Integer.valueOf(str[0]);
            int end = Integer.valueOf(str[1]);

            KafkaDownVO.PropertyData data = new KafkaDownVO.PropertyData();
            data.setPropertyId(property.getId());
            StringBuilder sb = new StringBuilder();
            for (int i = begin;i <= end; i++){
                sb.append(chars[i]);
            }
            data.setData(sb.toString());
            propertyDatas.add(data);
        });
        downVO.setDataList(propertyDatas);
        System.out.println("byte---"+downVO);
    }

json 解析

  • json定義中,物理設備發送的key與屬性模型中的identifier一一對應,而在封裝RedisPropertyVO的時候,爲了與byte保持統一把identifier賦值給了ofset,因此這裏獲取屬性模型,並轉成Map<ofset,id> 格式。
  • KafkaSourceVO中的原始數據也序列化成map<key,value>
  • 便利屬性模型對應的map(propertyMap),並從原始數據map(dataMap)取出對應的數據
  • 爲了數據的統一格式,這裏也把數據封裝成KafkaDownVO
public void analysisJson(KafkaSourceVO sourceVO,
RedisProductVO productVO,RedisDeviceVO deviceVO){
        Map<String,Long> propertyMap = productVO.getPropertys().
		stream().collect(Collectors.toMap(RedisPropertyVO ::
		getOfset,RedisPropertyVO::getId));
        Map<String,String> dataMap = JSONObject.parseObject(
		sourceVO.getData(), HashMap.class);

        KafkaDownVO downVO = new KafkaDownVO();
        downVO.setDeviceId(deviceVO.getId());
        downVO.setCollTime(sourceVO.getCollTime());

        List<KafkaDownVO.PropertyData> propertyDatas = 
		new ArrayList<>(dataMap.size());

        dataMap.forEach((key,val)->{
            KafkaDownVO.PropertyData data = new KafkaDownVO.PropertyData();
            data.setPropertyId(propertyMap.get(key));
            data.setData(val);
            propertyDatas.add(data);
        });
        downVO.setDataList(propertyDatas);
        System.out.println("json---"+downVO);
    }

啓動項目,檢驗一下數據是否封裝正確json

按iot-pt架構設計,如今須要把映射好的數據,再次寫入kakfa中,供訂閱費服務使用緩存

kafka 寫入

在iot-kafka模塊中添加對Mapping 數據的寫入架構

[@Component](https://my.oschina.net/u/3907912)
public class KafkaMapping {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void send(KafkaDownVO downVO){
        String json = JSONObject.toJSONString(downVO);
        kafkaTemplate.send(DOWN_TOPIC,json);
    }

}

修改analysisByte()和analysisJson()app

public void analysisByte(KafkaSourceVO sourceVO,
  RedisProductVO productVO,RedisDeviceVO deviceVO){
        char[] chars = sourceVO.getData().toCharArray();
        List<RedisPropertyVO> propertys = productVO.getPropertys();

        KafkaDownVO downVO = new KafkaDownVO();
        downVO.setDeviceId(deviceVO.getId());
        downVO.setCollTime(sourceVO.getCollTime());

        List<KafkaDownVO.PropertyData> propertyDatas = 
		new ArrayList<>(propertys.size());

        propertys.forEach(property->{
            String[] str = property.getOfset().split("-");
            int begin = Integer.valueOf(str[0]);
            int end = Integer.valueOf(str[1]);

            KafkaDownVO.PropertyData data = new KafkaDownVO.PropertyData();
            data.setPropertyId(property.getId());
            StringBuilder sb = new StringBuilder();
            for (int i = begin;i <= end; i++){
                sb.append(chars[i]);
            }
            data.setData(sb.toString());
            propertyDatas.add(data);
        });
        downVO.setDataList(propertyDatas);
       kafkaMapping.send(downVO);
    }
public void analysisJson(KafkaSourceVO sourceVO,
RedisProductVO productVO,RedisDeviceVO deviceVO){
        Map<String,Long> propertyMap = productVO.getPropertys().
		stream().collect(Collectors.toMap(RedisPropertyVO ::
		getOfset,RedisPropertyVO::getId));
        Map<String,String> dataMap = JSONObject.parseObject(
		sourceVO.getData(), HashMap.class);

        KafkaDownVO downVO = new KafkaDownVO();
        downVO.setDeviceId(deviceVO.getId());
        downVO.setCollTime(sourceVO.getCollTime());

        List<KafkaDownVO.PropertyData> propertyDatas = 
		new ArrayList<>(dataMap.size());

        dataMap.forEach((key,val)->{
            KafkaDownVO.PropertyData data = new KafkaDownVO.PropertyData();
            data.setPropertyId(propertyMap.get(key));
            data.setData(val);
            propertyDatas.add(data);
        });
        downVO.setDataList(propertyDatas);
        kafkaMapping.send(downVO);
    }

再次啓動項目,教研Mapping數據是否成寫入kakfaide

結束語

接下來就是訂閱服務的實現了,請聽下回分解,具體的代碼細節在gitui

https://gitee.com/distant/iot-pt.git.net

相關文章
相關標籤/搜索