書接上回,當設備發送數據後,咱們須要在管理頁面看到設備發送的數據是什麼,因此如今咱們就來完成設備數據監控模塊git
建立iot-device-data模塊,由於此模塊也是屬於訂閱板塊的服務,因此它也是消費kakfa中的Mapping數據,引入對應的redis,kakfa模塊。web
數據是實時與頁面交互,須要用到web socketredis
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> <version>2.1.4.RELEASE</version> </dependency>
spring boot 對web socket的使用有兩種方式,ServerEndpointExporter和TextWebSocketHandler方式,而ServerEndpointExporter,由於webSocket bean不是由spring 容器管理,因此會有注入沒法使用的問題,雖然能夠解決,但有點不爽;而TextWebSocketHandler,則徹底由spring實現,因此不會有注入的問題,所以筆者採用TextWebSocketHandler的方式。spring
[@Component](https://my.oschina.net/u/3907912) public class DeviceDateHandler extends TextWebSocketHandler { @Autowired private DeviceDateService deviceDateService; /** * 打開會話 * [@param](https://my.oschina.net/u/2303379) session */ [@Override](https://my.oschina.net/u/1162528) public void afterConnectionEstablished(WebSocketSession session) throws Exception { deviceDateService.onOpen(session); } /** * 關閉會話 * [@param](https://my.oschina.net/u/2303379) session */ @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { deviceDateService.onClose(session); } /** * 異常處理 * @param session * @param */ @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { deviceDateService.onError(session,exception); } }
@Configuration @EnableWebSocket //開啓web socket public class WebSocketConfig implements WebSocketConfigurer { @Autowired private DeviceDateHandler deviceDateHandler; @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(deviceDateHandler, "/ws/device"). setAllowedOrigins("*"); //容許跨域 } }
web socket創建登陸時,須要把設備的IMEI和id號一塊兒發送過來,咱們在onOpen()方法中對數據進行校驗,只有經過校驗的連接,才能完成登陸。sql
@Service public class DeviceDateService { @Autowired private BaseRedisUtil baseRedisUtil; private static final String IEMI = "imei"; private static final String DID = "did"; /** * 打開會話 * @param session */ public void onOpen(WebSocketSession session) { Map<String,String> map = getMap(session); RedisDeviceVO vo = baseRedisUtil.get(map.get(IEMI)); if (vo == null){ throw new RuntimeException("設備未註冊"); } if (vo.getId().equals(map.get(DID))){ throw new RuntimeException("設備號錯誤"); } WebSocketUtil.put(map.get(DID),session); } /**8 * 關閉會話 * @param session */ public void onClose(WebSocketSession session) throws IOException { Map<String,String> map = getMap(session); WebSocketUtil.remove(map.get(DID),session); session.close(); } /** * 異常處理 * @param session * @param error */ public void onError(WebSocketSession session, Throwable error) throws IOException { if (error instanceof RuntimeException){ session.sendMessage(new TextMessage(error.getMessage())); }else { session.sendMessage(new TextMessage("系統異常")); } onClose(session); } private Map<String,String> getMap(WebSocketSession session){ String query = session.getUri().getQuery(); if (StringUtils.isEmpty(query)){ throw new RuntimeException("參數錯誤"); } String[] param = query.split("&"); if (param.length != 2){ throw new RuntimeException("參數錯誤"); } Map<String,String> map = new HashMap<>(); map.put(IEMI,param[0]); map.put(DID,param[1]); return map; } }
對於已經登陸成功的連接,須要緩存session,而一個設備可能同時有好幾個web socket同時監控,因此這裏按照一對多的形式保存session。session是不一樣的線程同時在建立和銷燬,屬於併發操做,因此這裏使用併發集合進行處理。跨域
public class WebSocketUtil { private static final Map<String, CopyOnWriteArrayList<WebSocketSession>> SESSION_MAP = new HashMap<>(); public static void put(String did,WebSocketSession session){ CopyOnWriteArrayList<WebSocketSession> list = SESSION_MAP.get(did); if (list == null){ list = new CopyOnWriteArrayList(); SESSION_MAP.put(did,list); } list.add(session); } public static void remove(String did,WebSocketSession session){ CopyOnWriteArrayList<WebSocketSession> list = SESSION_MAP.get(did); if (CollectionUtils.isEmpty(list)){ return; } int index = -1; Iterator<WebSocketSession> it = list.iterator(); while (it.hasNext()){ index++; WebSocketSession se = it.next(); if (se == session){ list.remove(index); return; } } if (CollectionUtils.isEmpty(list)){ SESSION_MAP.remove(did); } } public static boolean isEmpty(){ return SESSION_MAP.size() > 0 ? false : true; } public static CopyOnWriteArrayList<WebSocketSession> findSocketConnect(String did){ return SESSION_MAP.get(did); } }
訂閱kakfa的下行數據(mapping 數據),判斷是否有設備連接,若是有就把數據發送給對應的session。緩存
@Component public class DeviceListener { @KafkaListener(topics = DOWN_TOPIC) public void listener(String msg){ System.out.println(msg); if (WebSocketUtil.isEmpty()){ return; } KafkaDownVO vo = JSONObject.parseObject(msg,KafkaDownVO.class); List<WebSocketSession> socket = WebSocketUtil.findSocketConnect( String.valueOf(vo.getDeviceId())); if (CollectionUtils.isEmpty(socket)){ return; } TextMessage textMessage = new TextMessage(msg); socket.forEach(session -> { try { session.sendMessage(textMessage); } catch (IOException e) { e.printStackTrace(); } }); } }
#訪問端口 server.port=8082 #項目路徑 server.servlet.context-path=/device-data #項目名稱 spring.application.name=device-data spring.kafka.consumer.group-id=websocket-device-data
啓動項目,建立web socket連接,驗證數據是否推送到了頁面。websocket
筆者這裏使用的是web socket在線測試工具,http://www.blue-zero.com/WebSocket/session
iot-pt,咱們已經實現了整個數據的上行,從設備的注入,設備發送數據,Mapping設備數據,pgsql持久化數據,web socket推送實時數據,已經有一個完整的流程了,因此接下來就要使用spring cloud搭建分佈式了。併發