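Picking up from the previous post: in the Mapping server we already got the data organized, so now we use PostgreSQL to store the historical data.
Create the iot-pgsql module. For performance, database writes here no longer go through MyBatis; we switch to Spring JDBC batch writes instead.
Add the Spring JDBC dependency: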
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
        <version>2.1.4.RELEASE</version>
    </dependency>
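Because the data comes from a subscribed Kafka topic, the Kafka dependency is needed as well. Kafka has already been isolated into its own module, so adding that module is enough: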
    <dependency>
        <groupId>cn.le</groupId>
        <artifactId>iot-kafka</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </dependency>
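    import com.alibaba.fastjson.JSONObject;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    import org.springframework.util.CollectionUtils;

    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;

    // DeviceDataPO, KafkaDownVO and DOWN_TOPIC are defined elsewhere in the project.
    @Component
    public class PgListener {

        @Autowired
        private PgService pgService;

        // Rows waiting to be written; sized slightly above the flush threshold.
        private static List<DeviceDataPO> INSERT_LIST = new ArrayList<>(1100);

        @KafkaListener(topics = DOWN_TOPIC)
        public void pgListener(String msg) {
            KafkaDownVO downVO = JSONObject.parseObject(msg, KafkaDownVO.class);
            downVO.getDataList().forEach(propertyData -> {
                DeviceDataPO po = new DeviceDataPO();
                po.setDeviceId(downVO.getDeviceId());
                po.setPropertyId(propertyData.getPropertyId());
                po.setData(propertyData.getData());
                // The original read po.getCollTime() from the PO that was just created.
                // Assumption: the collection time is carried on KafkaDownVO.
                po.setCollTime(downVO.getCollTime());
                po.setCreateAt(new Date());
                INSERT_LIST.add(po);
            });
            // Write to the database in one batch once more than 1000 rows are buffered.
            if (INSERT_LIST.size() > 1000) {
                jdbcBachInsert(INSERT_LIST);
            }
        }

        public void jdbcBachInsert(List<DeviceDataPO> downVOList) {
            if (CollectionUtils.isEmpty(downVOList)) {
                return;
            }
            pgService.jdbcBachInsert(downVOList);
            // Start a fresh buffer only after the batch was written successfully.
            INSERT_LIST = new ArrayList<>(1100);
        }
    }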
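    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jdbc.core.BatchPreparedStatementSetter;
    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;

    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.sql.Timestamp;
    import java.util.List;

    // DeviceDataPO and PG_INSERT_SQL are defined elsewhere in the project.
    @Service
    public class PgService {

        @Autowired
        private JdbcTemplate jdbcTemplate;

        // Writes the whole list as one JDBC batch inside a single transaction.
        @Transactional(rollbackFor = Exception.class)
        public void jdbcBachInsert(List<DeviceDataPO> list) {
            jdbcTemplate.batchUpdate(PG_INSERT_SQL, new BatchPreparedStatementSetter() {
                @Override
                public void setValues(PreparedStatement ps, int i) throws SQLException {
                    // Bind the i-th row to the five placeholders of PG_INSERT_SQL.
                    DeviceDataPO po = list.get(i);
                    ps.setLong(1, po.getDeviceId());
                    ps.setLong(2, po.getPropertyId());
                    ps.setString(3, po.getData());
                    ps.setTimestamp(4, new Timestamp(po.getCollTime().getTime()));
                    ps.setTimestamp(5, new Timestamp(po.getCreateAt().getTime()));
                }

                @Override
                public int getBatchSize() {
                    return list.size();
                }
            });
        }
    }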
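The buffering idea used by the listener (collect rows, then hand them over in one batch once a threshold is reached) can be looked at separately from Spring and Kafka. The following is only a minimal sketch under stated assumptions: `BatchBuffer` is a hypothetical helper that does not exist in the project, it flushes when the buffer reaches `batchSize` (the listener above flushes once the size exceeds 1000), and its methods are `synchronized` on the assumption that more than one thread might call it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not part of the original project): collects items and
 * passes them to a sink in batches once a size threshold is reached.
 */
class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink;
    private List<T> pending;

    BatchBuffer(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
        this.pending = new ArrayList<>(batchSize);
    }

    /** Adds one item and flushes as soon as batchSize items are buffered. */
    synchronized void add(T item) {
        pending.add(item);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Passes all buffered items to the sink; does nothing when empty. */
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        // If the sink throws, the buffer is left untouched so the rows
        // are offered again on the next flush.
        sink.accept(pending);
        pending = new ArrayList<>(batchSize);
    }

    /** Number of items currently waiting to be flushed. */
    synchronized int size() {
        return pending.size();
    }
}
```

In the listener, the sink could be something like `pgService::jdbcBachInsert`, and a periodic task could call `flush()` so that rows below the threshold do not wait indefinitely for the next message; both are suggestions rather than what the original project does.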
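OK, that's it. Start the project and check whether rows have been written to the database.
The data is now stored in the database, so next we can start monitoring the device data.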