基於Netty-SocketIO的主動推送服務

背景

前端時間,公司開發了一款主動服務的機器人的程序,講產生的消息經過服務端主動推送到客戶端(H五、IOS、Android),支持用戶的個性化開關設置,用戶可自由選擇接受的消息類型;同時支持用戶主動提問;在此記錄下整個部署以及實現的大體思路;前端

同時感謝個人Leader給予的幫助。java

部署

Nginx配置

  • 爲了保持長鏈接有效,配置HTTP版本1.1;
  • 配置UpgradeConnection響應頭信息;

完整配置以下:node

location / {
    proxy_pass http://nodes;

    # enable WebSockets
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
}
複製代碼

Socket配置

Socket配置類nginx

public class WebSocketConfig {

    private Logger log = LoggerFactory.getLogger(WebSocketConfig.class);

    @Value("${wss.server.host}")
    private String host;

    @Value("${wss.server.port}")
    private Integer port;

    @Value("${redis.passwd}")
    private String redisPasswd;

    @Value("${redis.address}")
    private String redisAddress;

    @Bean
    public PubSubStore pubSubStore() {
        return socketIOServer().getConfiguration().getStoreFactory().pubSubStore();
    }

    @Bean
    public SocketIOServer socketIOServer() {


        Config redissonConfig = new Config();
      	// 高版本需求 redis:// 前綴
      redissonConfig.useSingleServer().setPassword("xxx").setAddress("redis://xxx:xx").setDatabase();

        RedissonClient redisson = Redisson.create(redissonConfig);
        RedissonStoreFactory redisStoreFactory = new RedissonStoreFactory(redisson);


        Configuration config = new Configuration();
        config.setHostname(host);
        config.setPort(port);
        config.setOrigin(origin);
        config.setHttpCompression(false);
        config.setWebsocketCompression(false);

        config.setStoreFactory(redisStoreFactory);

        // 注意若是開放跨域設置,須要設置爲null而不是"*"
        config.setOrigin(null);
        // 協議升級超時時間(毫秒),默認10000。HTTP握手升級爲ws協議超時時間
        config.setUpgradeTimeout(10000);
        // Ping消息間隔(毫秒),默認25000。客戶端向服務器發送一條心跳消息間隔
        config.setPingInterval(25000);
        // Ping消息超時時間(毫秒),默認60000,這個時間間隔內沒有接收到心跳消息就會發送超時事件
        config.setPingTimeout(60000);

        /** 異常監聽事件,必須覆寫所有方法 */
        config.setExceptionListener(new ExceptionListener(){
            @Override
            public void onConnectException(Exception e, SocketIOClient client) {
                ResponseMessage error = ResponseMessage.error(-1, "鏈接異常!");
                client.sendEvent("exception", JSON.toJSON(new Response<String>(error, "鏈接異常!")));
            }
            @Override
            public void onDisconnectException(Exception e, SocketIOClient client) {
                ResponseMessage error = ResponseMessage.error(-1, "斷開異常!");
                client.sendEvent("exception",JSON.toJSON(new Response<String>(error, "鏈接異常!")));
            }
            @Override
            public void onEventException(Exception e, List<Object> data, SocketIOClient client) {
                ResponseMessage error = ResponseMessage.error(-1, "服務器異常!");
                client.sendEvent("exception",JSON.toJSON(new Response<String>(error, "鏈接異常!")));
            }
            @Override
            public void onPingException(Exception e, SocketIOClient client) {
                ResponseMessage error = ResponseMessage.error(-1, "PING 超時異常!");
                client.sendEvent("exception",JSON.toJSON(new Response<String>(error, "PING 超時異常!")));
            }
            @Override
            public boolean exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
                return false;
            }
        });
      // 相似於過濾器設置,此處不做處理
       config.setAuthorizationListener(data -> {
// // 可使用以下代碼獲取用戶密碼信息
// String appId = data.getSingleUrlParam("appId");
// String source = data.getSingleUrlParam("source");
// log.info("token {}, client {}", appId, source);
            return true;
        });

        return new SocketIOServer(config);
    }

    @Bean
    public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
        return new SpringAnnotationScanner(socketServer);
    }
}
複製代碼

Socket啓動類redis

@Log4j2
@Component
@Order(value=1)
public class ServerRunner implements CommandLineRunner {

    private final SocketIOServer server;


    @Autowired
    public ServerRunner(SocketIOServer server) {
        this.server = server;
    }

    @Override
    public void run(String... args) throws Exception {
        server.start();
        log.info("socket.io啓動成功!");
    }
}
複製代碼

最終架構

實現過程

主動推送服務監聽做爲KafKa消費者,數據生產者講加工好的數據推到KafKa中,消費者監聽到消息廣播給客戶端;推送時在數據庫查詢用戶對應的個性化設置,僅推送客戶端選擇接受的消息;spring

因爲主動推送服務部署了多個節點,而多個節點分配在同一個KafKa消費組中,這樣會引發多個節點僅消費到所有消息的一部分的問題;這裏使用Redis發佈/訂閱的機制解決了這個問題:當各個節點消費到消息以後,將消息發佈以後,其它節點訂閱該Topic將消息發送給各自節點上鍊接的客戶端,在這裏各個節點便是發佈者,又是訂閱者;數據庫

從數據的產生,到消費跨域

使用Redisson的Topic實現分佈式發佈/訂閱

Redisson爲了方便Redis中的發佈/訂閱機制的使用,將其封裝成Topic,並提供了代碼級別的發佈/訂閱操做,如此一來多個JVM進程鏈接到Redis(單機/集羣)後,即可以實如今一個JVM進程中發佈Topic,在其餘已經訂閱了該主題的JVM進程中就能及時收到消息。服務器

在Netty-SocketIO整合了Redisson以後,內部也使用了發佈/訂閱機制架構

消息的發佈
public void sendMessageToAllClient(String eventType, String message, String desc) {
    Collection<SocketIOClient> clients = server.getBroadcastOperations().getClients();
    for(final SocketIOClient client : clients){
      // Do Somthing
    }

    Packet packet = new Packet(PacketType.MESSAGE);
    packet.setData(new BroadcastMessage(message, eventType, desc));
    publishMessage(packet);
}

private void publishMessage(Packet packet) {
    DispatchMessage dispatchMessage = new DispatchMessage("", packet, "");
    pubSubStore.publish(PubSubType.DISPATCH, dispatchMessage);
    BroadcastMessage broadcastMessage = dispatchMessage.getPacket().getData();

}
複製代碼
消息的訂閱
@PostConstruct
public void init() {
  pubSubStore.subscribe(PubSubType.DISPATCH, dispatchMessage -> {
      BroadcastMessage messageData = dispatchMessage.getPacket().getData();
    
      Collection<SocketIOClient> clients = server.getBroadcastOperations().getClients();

      for(final SocketIOClient client : clients){
        // DO Somthing
      }, DispatchMessage.class);
}
複製代碼
相關文章
相關標籤/搜索