前端時間,公司開發了一款主動服務的機器人的程序,講產生的消息經過服務端主動推送到客戶端(H五、IOS、Android),支持用戶的個性化開關設置,用戶可自由選擇接受的消息類型;同時支持用戶主動提問;在此記錄下整個部署以及實現的大體思路;前端
同時感謝個人Leader給予的幫助。java
Upgrade
和Connection
響應頭信息;完整配置以下:node
location / {
proxy_pass http://nodes;
# enable WebSockets
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
複製代碼
Socket配置類nginx
public class WebSocketConfig {
private Logger log = LoggerFactory.getLogger(WebSocketConfig.class);
@Value("${wss.server.host}")
private String host;
@Value("${wss.server.port}")
private Integer port;
@Value("${redis.passwd}")
private String redisPasswd;
@Value("${redis.address}")
private String redisAddress;
@Bean
public PubSubStore pubSubStore() {
return socketIOServer().getConfiguration().getStoreFactory().pubSubStore();
}
@Bean
public SocketIOServer socketIOServer() {
Config redissonConfig = new Config();
// 高版本需求 redis:// 前綴
redissonConfig.useSingleServer().setPassword("xxx").setAddress("redis://xxx:xx").setDatabase();
RedissonClient redisson = Redisson.create(redissonConfig);
RedissonStoreFactory redisStoreFactory = new RedissonStoreFactory(redisson);
Configuration config = new Configuration();
config.setHostname(host);
config.setPort(port);
config.setOrigin(origin);
config.setHttpCompression(false);
config.setWebsocketCompression(false);
config.setStoreFactory(redisStoreFactory);
// 注意若是開放跨域設置,須要設置爲null而不是"*"
config.setOrigin(null);
// 協議升級超時時間(毫秒),默認10000。HTTP握手升級爲ws協議超時時間
config.setUpgradeTimeout(10000);
// Ping消息間隔(毫秒),默認25000。客戶端向服務器發送一條心跳消息間隔
config.setPingInterval(25000);
// Ping消息超時時間(毫秒),默認60000,這個時間間隔內沒有接收到心跳消息就會發送超時事件
config.setPingTimeout(60000);
/** 異常監聽事件,必須覆寫所有方法 */
config.setExceptionListener(new ExceptionListener(){
@Override
public void onConnectException(Exception e, SocketIOClient client) {
ResponseMessage error = ResponseMessage.error(-1, "鏈接異常!");
client.sendEvent("exception", JSON.toJSON(new Response<String>(error, "鏈接異常!")));
}
@Override
public void onDisconnectException(Exception e, SocketIOClient client) {
ResponseMessage error = ResponseMessage.error(-1, "斷開異常!");
client.sendEvent("exception",JSON.toJSON(new Response<String>(error, "鏈接異常!")));
}
@Override
public void onEventException(Exception e, List<Object> data, SocketIOClient client) {
ResponseMessage error = ResponseMessage.error(-1, "服務器異常!");
client.sendEvent("exception",JSON.toJSON(new Response<String>(error, "鏈接異常!")));
}
@Override
public void onPingException(Exception e, SocketIOClient client) {
ResponseMessage error = ResponseMessage.error(-1, "PING 超時異常!");
client.sendEvent("exception",JSON.toJSON(new Response<String>(error, "PING 超時異常!")));
}
@Override
public boolean exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
return false;
}
});
// 相似於過濾器設置,此處不做處理
config.setAuthorizationListener(data -> {
// // 可使用以下代碼獲取用戶密碼信息
// String appId = data.getSingleUrlParam("appId");
// String source = data.getSingleUrlParam("source");
// log.info("token {}, client {}", appId, source);
return true;
});
return new SocketIOServer(config);
}
@Bean
public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
return new SpringAnnotationScanner(socketServer);
}
}
複製代碼
Socket啓動類redis
@Log4j2
@Component
@Order(value=1)
public class ServerRunner implements CommandLineRunner {
private final SocketIOServer server;
@Autowired
public ServerRunner(SocketIOServer server) {
this.server = server;
}
@Override
public void run(String... args) throws Exception {
server.start();
log.info("socket.io啓動成功!");
}
}
複製代碼
主動推送服務監聽做爲KafKa消費者,數據生產者講加工好的數據推到KafKa中,消費者監聽到消息廣播給客戶端;推送時在數據庫查詢用戶對應的個性化設置,僅推送客戶端選擇接受的消息;spring
因爲主動推送服務部署了多個節點,而多個節點分配在同一個KafKa消費組中,這樣會引發多個節點僅消費到所有消息的一部分的問題;這裏使用Redis
的發佈/訂閱
的機制解決了這個問題:當各個節點消費到消息以後,將消息發佈以後,其它節點訂閱該Topic
將消息發送給各自節點上鍊接的客戶端,在這裏各個節點便是發佈者,又是訂閱者;數據庫
從數據的產生,到消費跨域
Redisson爲了方便Redis中的發佈/訂閱
機制的使用,將其封裝成Topic,並提供了代碼級別的發佈/訂閱
操做,如此一來多個JVM進程鏈接到Redis(單機/集羣)後,即可以實如今一個JVM進程中發佈
的Topic
,在其餘已經訂閱
了該主題的JVM進程中就能及時收到消息。服務器
在Netty-SocketIO整合了Redisson
以後,內部也使用了發佈/訂閱
機制架構
public void sendMessageToAllClient(String eventType, String message, String desc) {
Collection<SocketIOClient> clients = server.getBroadcastOperations().getClients();
for(final SocketIOClient client : clients){
// Do Somthing
}
Packet packet = new Packet(PacketType.MESSAGE);
packet.setData(new BroadcastMessage(message, eventType, desc));
publishMessage(packet);
}
private void publishMessage(Packet packet) {
DispatchMessage dispatchMessage = new DispatchMessage("", packet, "");
pubSubStore.publish(PubSubType.DISPATCH, dispatchMessage);
BroadcastMessage broadcastMessage = dispatchMessage.getPacket().getData();
}
複製代碼
@PostConstruct
public void init() {
pubSubStore.subscribe(PubSubType.DISPATCH, dispatchMessage -> {
BroadcastMessage messageData = dispatchMessage.getPacket().getData();
Collection<SocketIOClient> clients = server.getBroadcastOperations().getClients();
for(final SocketIOClient client : clients){
// DO Somthing
}, DispatchMessage.class);
}
複製代碼