分佈式項目(六)iot-device-data 設備數據監控

書接上回,當設備發送數據後,咱們須要在管理頁面看到設備發送的數據是什麼,因此如今咱們就來完成設備數據監控模塊git

iot-device-data

建立iot-device-data模塊,由於此模塊也是屬於訂閱板塊的服務,因此它也是消費kakfa中的Mapping數據,引入對應的redis,kakfa模塊。web

邏輯圖

web socket

數據是實時與頁面交互,須要用到web socketredis

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
            <version>2.1.4.RELEASE</version>
        </dependency>

spring boot 對web socket的使用有兩種方式,ServerEndpointExporter和TextWebSocketHandler方式,而ServerEndpointExporter,由於webSocket bean不是由spring 容器管理,因此會有注入沒法使用的問題,雖然能夠解決,但有點不爽;而TextWebSocketHandler,則徹底由spring實現,因此不會有注入的問題,所以筆者採用TextWebSocketHandler的方式。spring

web socket handler

[@Component](https://my.oschina.net/u/3907912)
public class DeviceDateHandler extends TextWebSocketHandler {

    @Autowired
    private DeviceDateService deviceDateService;

    /**
     * 打開會話
     * [@param](https://my.oschina.net/u/2303379) session
     */

    [@Override](https://my.oschina.net/u/1162528)
    public void afterConnectionEstablished(WebSocketSession session) 
	throws Exception {
        deviceDateService.onOpen(session);
    }

    /**
     * 關閉會話
     * [@param](https://my.oschina.net/u/2303379) session
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, 
	CloseStatus status) throws Exception {
        deviceDateService.onClose(session);
    }

    /**
     * 異常處理
     * @param session
     * @param
     */
    @Override
    public void handleTransportError(WebSocketSession session, 
	Throwable exception) throws Exception {
        deviceDateService.onError(session,exception);
    }
}

web socket config

@Configuration
@EnableWebSocket //開啓web socket
public class WebSocketConfig implements WebSocketConfigurer {

    @Autowired
    private DeviceDateHandler deviceDateHandler;

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry 
	registry) {
        registry.addHandler(deviceDateHandler, "/ws/device").
		setAllowedOrigins("*"); //容許跨域
    }
}

DeviceDateService web socket創建登陸

web socket創建登陸時,須要把設備的IMEI和id號一塊兒發送過來,咱們在onOpen()方法中對數據進行校驗,只有經過校驗的連接,才能完成登陸。sql

@Service
public class DeviceDateService {

    @Autowired
    private BaseRedisUtil baseRedisUtil;

    private static final String IEMI = "imei";
    private static final String DID = "did";

    /**
     * 打開會話
     * @param session
     */
    public void onOpen(WebSocketSession session) {
        Map<String,String> map = getMap(session);
        RedisDeviceVO vo = baseRedisUtil.get(map.get(IEMI));
        if (vo == null){
            throw new RuntimeException("設備未註冊");
        }
        if (vo.getId().equals(map.get(DID))){
            throw new RuntimeException("設備號錯誤");
        }
        WebSocketUtil.put(map.get(DID),session);
    }

    /**8
     * 關閉會話
     * @param session
     */
    public void onClose(WebSocketSession session) throws IOException {
        Map<String,String> map = getMap(session);
        WebSocketUtil.remove(map.get(DID),session);
        session.close();
    }

    /**
     * 異常處理
     * @param session
     * @param error
     */
    public void onError(WebSocketSession session, Throwable error) 
	throws IOException {
        if (error instanceof RuntimeException){
            session.sendMessage(new TextMessage(error.getMessage()));
        }else {
            session.sendMessage(new TextMessage("系統異常"));
        }
        onClose(session);
    }

    private Map<String,String> getMap(WebSocketSession session){
        String query = session.getUri().getQuery();
        if (StringUtils.isEmpty(query)){
            throw new RuntimeException("參數錯誤");
        }
        String[] param = query.split("&");
        if (param.length != 2){
            throw new RuntimeException("參數錯誤");
        }
        Map<String,String> map = new HashMap<>();
        map.put(IEMI,param[0]);
        map.put(DID,param[1]);
        return map;
    }
}

web socket 連接緩存

對於已經登陸成功的連接,須要緩存session,而一個設備可能同時有好幾個web socket同時監控,因此這裏按照一對多的形式保存session。session是不一樣的線程同時在建立和銷燬,屬於併發操做,因此這裏使用併發集合進行處理。跨域

public class WebSocketUtil {

    private static final Map<String, CopyOnWriteArrayList<WebSocketSession>>
	SESSION_MAP = new HashMap<>();


    public static void put(String did,WebSocketSession session){
        CopyOnWriteArrayList<WebSocketSession> list = SESSION_MAP.get(did);
        if (list == null){
            list = new CopyOnWriteArrayList();
            SESSION_MAP.put(did,list);
        }
        list.add(session);
    }

    public static void remove(String did,WebSocketSession session){
        CopyOnWriteArrayList<WebSocketSession> list = SESSION_MAP.get(did);
        if (CollectionUtils.isEmpty(list)){
            return;
        }

        int index = -1;
        Iterator<WebSocketSession> it = list.iterator();
        while (it.hasNext()){
            index++;
            WebSocketSession se = it.next();
            if (se == session){
                list.remove(index);
                return;
            }
        }
        if (CollectionUtils.isEmpty(list)){
            SESSION_MAP.remove(did);
        }
    }

    public static boolean isEmpty(){
        return SESSION_MAP.size() > 0 ? false : true;
    }

    public static CopyOnWriteArrayList<WebSocketSession>
	findSocketConnect(String did){
        return SESSION_MAP.get(did);
    }
}

kafka 監聽

訂閱kakfa的下行數據(mapping 數據),判斷是否有設備連接,若是有就把數據發送給對應的session。緩存

@Component
public class DeviceListener {

    @KafkaListener(topics = DOWN_TOPIC)
    public void listener(String msg){
        System.out.println(msg);
        if (WebSocketUtil.isEmpty()){
            return;
        }
        KafkaDownVO vo = JSONObject.parseObject(msg,KafkaDownVO.class);

        List<WebSocketSession> socket = WebSocketUtil.findSocketConnect(
		String.valueOf(vo.getDeviceId()));
        if (CollectionUtils.isEmpty(socket)){
            return;
        }
        TextMessage textMessage = new TextMessage(msg);
        socket.forEach(session -> {
            try {
                session.sendMessage(textMessage);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
    }
}

application.properties

#訪問端口
server.port=8082
#項目路徑
server.servlet.context-path=/device-data
#項目名稱
spring.application.name=device-data

spring.kafka.consumer.group-id=websocket-device-data

啓動項目,建立web socket連接,驗證數據是否推送到了頁面。websocket

筆者這裏使用的是web socket在線測試工具,http://www.blue-zero.com/WebSocket/session

結束語

iot-pt,咱們已經實現了整個數據的上行,從設備的注入,設備發送數據,Mapping設備數據,pgsql持久化數據,web socket推送實時數據,已經有一個完整的流程了,因此接下來就要使用spring cloud搭建分佈式了。併發

https://gitee.com/distant/iot-pt.git

相關文章
相關標籤/搜索