分佈式項目(五)iot-pgsql

書接上回,在Mapping server中,咱們已經把數據都整理好了,如今利用postgresql存儲歷史數據。git

iot-pgsql

構建iot-pgsql模塊,這裏咱們寫數據庫爲了性能考慮不在使用mybatis,換成spring jdbc批處理寫數據庫。spring

引入spring jdbc依賴sql

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
            <version>2.1.4.RELEASE</version>
        </dependency>

由於數據來此訂閱的kafka數據,因此還須要引入kafka依賴,這裏已經kakfa隔離成了一個獨立的模塊,因此加入kakfa模塊就好了。數據庫

<dependency>
            <groupId>cn.le</groupId>
            <artifactId>iot-kafka</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

kakfa監聽

[@Component](https://my.oschina.net/u/3907912)
public class PgListener {

    @Autowired
    private PgService pgService;
    private static List<DeviceDataPO> INSERT_LIST = new ArrayList<>(1100);

    @KafkaListener(topics = DOWN_TOPIC)
    public void pgListener(String msg){
        KafkaDownVO downVO = JSONObject.parseObject(msg,KafkaDownVO.class);

        downVO.getDataList().forEach(propertyData -> {
            DeviceDataPO po = new DeviceDataPO();
            po.setDeviceId(downVO.getDeviceId());
            po.setPropertyId(propertyData.getPropertyId());
            po.setData(propertyData.getData());
            po.setCollTime(po.getCollTime());
            po.setCreateAt(new Date());
            INSERT_LIST.add(po);
        });

        //批量寫入數據庫
        if (INSERT_LIST.size() > 1000){
            jdbcBachInsert(INSERT_LIST);
        }
    }

    public void jdbcBachInsert(List<DeviceDataPO> downVOList){
        if (CollectionUtils.isEmpty(downVOList)){
            return;
        }
        pgService.jdbcBachInsert(downVOList);
        INSERT_LIST = new ArrayList<>(1100);
    }
}

jdbc批量寫入

[@Service](https://my.oschina.net/service)
public class PgService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Transactional(rollbackFor = Exception.class)
    public void jdbcBachInsert(List<DeviceDataPO> list){
        jdbcTemplate.batchUpdate(PG_INSERT_SQL, new BatchPreparedStatementSetter() {
            @Override
            public void setValues(PreparedStatement ps, int i) throws SQLException {
                DeviceDataPO po = list.get(i);
                ps.setLong(1,po.getDeviceId());
                ps.setLong(2,po.getPropertyId());
                ps.setString(3,po.getData());
                ps.setTimestamp(4,new Timestamp(po.getCollTime().getTime()));
                ps.setTimestamp(5,new Timestamp(po.getCreateAt().getTime()));
            }

            @Override
            public int getBatchSize() {
                return list.size();
            }
        });
    }
}

OK,完成,啓動項目,看一下數據庫是否有寫入的數據 mybatis

結束語

數據已經存儲到數據庫中了,下面就開始對設備數據的監控了。app

https://gitee.com/distant/iot-pt.gitide

相關文章
相關標籤/搜索