1. In a real project, as data volume and concurrency grow, a single WebSocket server is no longer enough, so we need a WebSocket cluster. But clustering WebSocket raises a problem. The first idea that comes to mind is to put the WebSocket Session into a cache server such as Redis and fetch it from Redis whenever it is needed. However, a WebSocket Session is special: it cannot be serialized, because it is stateful; it is also time-limited, and as soon as the connection drops, the Session becomes invalid.
2. Three ways to solve WebSocket clustering
2.1. Use a suitable algorithm to route all related users (i.e. users likely to chat with each other) to the same WebSocket server. That way no chat partner can miss a message. The limitation is that a user can only chat with users already associated with them, not with arbitrary unassociated users.
2.2. Use Redis's pub/sub messaging to implement the WebSocket cluster. The rough idea is shown in the diagram below.
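The pattern behind that diagram can be illustrated in plain Java with an in-memory stand-in for the Redis channel (all class and method names here are hypothetical; a real deployment would use Redis SUBSCRIBE/PUBLISH, and the `StringBuilder` values stand in for real `javax.websocket.Session` objects):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/**
 * In-memory stand-in for a Redis pub/sub channel. Every WebSocket node
 * subscribes to the same channel; a message published by any node reaches
 * all nodes, and each node delivers it only to the sessions it holds.
 */
class FakeChannel {
    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<String> handler) {
        subscribers.add(handler);
    }

    void publish(String message) {
        for (Consumer<String> s : subscribers) {
            s.accept(message); // fan out to every node
        }
    }
}

class WebSocketNode {
    // userId -> buffer standing in for javax.websocket.Session
    final Map<String, StringBuilder> localSessions = new ConcurrentHashMap<>();

    WebSocketNode(FakeChannel channel) {
        // each node listens on the shared channel and delivers only locally held sessions
        channel.subscribe(message -> {
            String[] parts = message.split(":", 2); // "receiverId:text"
            StringBuilder session = localSessions.get(parts[0]);
            if (session != null) {
                session.append(parts[1]); // stands in for session.getBasicRemote().sendText(...)
            }
        });
    }
}

public class RedisPubSubSketch {
    public static void main(String[] args) {
        FakeChannel channel = new FakeChannel();
        WebSocketNode nodeA = new WebSocketNode(channel);
        WebSocketNode nodeB = new WebSocketNode(channel);
        nodeB.localSessions.put("user2", new StringBuilder());
        channel.publish("user2:hello"); // every node sees it; only nodeB holds user2
        System.out.println(nodeB.localSessions.get("user2")); // hello
    }
}
```

Only the node that actually holds the receiver's session delivers the message; the other nodes simply ignore it.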
2.3. Use a message broker such as Kafka to implement the WebSocket cluster. This is the approach I currently use. The idea is much the same as with Redis pub/sub, but since we use Redis purely as a cache and prefer to keep business logic out of it, Kafka was the choice.
2.3.1. Install Kafka. (Installation guides are easy to find online.)
2.3.2. The rough idea of the Kafka-based cluster is shown in the diagram below. (If several consumers share one groupId, only one of them receives each message, so for every WebSocket node in the cluster to receive the message, each node needs a distinct groupId. I use the server's IP address as the groupId.)
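The per-server group id described above can be derived with a small utility; this is a standalone sketch of the same logic that appears later in the consumer configuration (`GroupIdUtil` is an illustrative name, not from the original project):

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.UUID;

public class GroupIdUtil {

    /**
     * Use the server's IP address as the Kafka group.id so that every
     * WebSocket node sits in its own consumer group and therefore receives
     * every message. Falls back to a random UUID when the address cannot
     * be resolved.
     */
    public static String groupIdForThisServer() {
        try {
            InetAddress address = InetAddress.getLocalHost();
            String host = address.getHostAddress();
            if (host != null && !host.trim().isEmpty()) {
                return host;
            }
        } catch (UnknownHostException e) {
            // fall through to the UUID fallback below
        }
        return UUID.randomUUID().toString().replace("-", "");
    }

    public static void main(String[] args) {
        System.out.println("group.id = " + groupIdForThisServer());
    }
}
```

Because each node gets its own group, Kafka delivers every chat message to every node, and each node then checks whether it holds the receiver's session.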
2.3.3. Add the Kafka dependency to the project's pom file. (Note: the version of the Kafka dependency must match the Kafka version installed on the server.)
```xml
<!-- Kafka dependency -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.0.RELEASE</version>
</dependency>
```
2.3.4. Define the Kafka producer bean
```java
package com.yxl.configuration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author: yxl
 * @Description: Kafka producer (message sender)
 * @DATE: Created in 2018/11/14
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    public Map<String, Object> producerConfigs() {
        Map<String, Object> properties = new HashMap<>();
        properties.put("bootstrap.servers", "kafka-cluster-ip1:9092,kafka-cluster-ip2:9092");
        // "acks" decides when a send request counts as complete (i.e. whether it
        // succeeded). "all" blocks until the message is fully acknowledged: the
        // slowest setting, but the most reliable.
        properties.put("acks", "all");
        // Number of automatic retries on failure. We use 0; enabling retries can
        // produce duplicate messages.
        properties.put("retries", 0);
        // The producer buffers unsent messages per partition; the buffer size is
        // controlled by batch.size.
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
}
```
2.3.5. Define the Kafka consumer bean and the consumer listener
```java
package com.yxl.configuration;

import com.yxl.myListener.MyKafkaListener;
import org.apache.commons.lang3.StringUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * @Author: yxl
 * @Description: Kafka consumer
 * @DATE: Created in 2018/11/14
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> properties = new HashMap<>();
        properties.put("bootstrap.servers", "kafka-cluster-ip1:9092,kafka-cluster-ip2:9092");
        properties.put("group.id", getIPAddress()); // use the server IP as the groupId
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }

    public String getIPAddress() {
        try {
            InetAddress address = InetAddress.getLocalHost();
            if (address != null && StringUtils.isNotBlank(address.getHostAddress())) {
                return address.getHostAddress();
            }
        } catch (UnknownHostException e) {
            return UUID.randomUUID().toString().replace("-", "");
        }
        return UUID.randomUUID().toString().replace("-", "");
    }

    /**
     * Custom listener
     */
    @Bean
    public MyKafkaListener listener() {
        return new MyKafkaListener();
    }
}
```
2.3.6. The consumer listener
```java
package com.yxl.myListener;

import com.yxl.websocket.ChatWebsocket;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.log4j.Logger;
import org.springframework.kafka.annotation.KafkaListener;

/**
 * @Author: yxl
 * @Description:
 * @DATE: Created in 2018/11/14
 */
public class MyKafkaListener {

    private final Logger logger = Logger.getLogger(MyKafkaListener.class);

    /**
     * Listener invoked when a chat message is sent.
     */
    @KafkaListener(topics = {"chatMessage"})
    public void listen(ConsumerRecord<?, ?> record) {
        logger.info("chatMessage chat-message listener: " + record.value().toString());
        ChatWebsocket chatWebsocket = new ChatWebsocket();
        chatWebsocket.kafkaReceiveMsg(record.value().toString());
    }

    /**
     * Listener invoked when a connection is closed.
     * (Declared public, not private, so the listener container can invoke it.)
     */
    @KafkaListener(topics = {"closeWebsocket"})
    public void closeListener(ConsumerRecord<?, ?> record) {
        logger.info("closeWebsocket close-connection listener: " + record.value().toString());
        ChatWebsocket chatWebsocket = new ChatWebsocket();
        chatWebsocket.kafkaCloseWebsocket(record.value().toString());
    }
}
```
2.3.7. The WebSocket cluster Java code
```java
package com.kk.server.chat.websocket;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * WebSocket cluster
 * Created by yxl on 2018-11-17.
 */
@ServerEndpoint("/chat/{userId}")
@Component
public class ChatWebsocket {

    private Logger logger = Logger.getLogger(ChatWebsocket.class);

    private static ApplicationContext applicationContext;

    private KafkaTemplate kafkaTemplate;

    // Static counter of current connections; should be made thread-safe.
    private static int onlineCount = 0;

    // Thread-safe map from the concurrent package, holding each client's
    // Session keyed by user id, so the server can message a single client.
    private static Map<String, Session> drWebSocketSet = new ConcurrentHashMap<>();

    /**
     * Called when a connection is established.
     *
     * @param userId the user identifier
     */
    @OnOpen
    public void onOpen(@PathParam("userId") String userId, Session session) {
        if (kafkaTemplate == null) {
            // fetch the KafkaTemplate bean instance
            kafkaTemplate = applicationContext.getBean(KafkaTemplate.class);
        }
        drWebSocketSet.put(userId, session);
    }

    /**
     * Called when a message arrives from a client.
     *
     * @param message the message sent by the client
     * @param session the client session
     */
    @OnMessage
    public void onMessage(String message, Session session) throws IOException {
        if ("ping".equals(message)) {
            session.getBasicRemote().sendText("pong"); // heartbeat
        } else {
            sendMessage(message, session); // fan the message out via Kafka
        }
    }

    /**
     * Send a message.
     */
    public void sendMessage(String message, Session session) throws IOException {
        if (StringUtils.isNotBlank(message)) {
            JSONObject jsonObject = JSONObject.parseObject(message);
            String sender_id = jsonObject.getString("sender_id");     // sender id
            String receiver_id = jsonObject.getString("receiver_id"); // receiver id
            // TODO possible optimisation: first check whether receiver_id has a
            // session on this server; if so, send on it directly and skip Kafka
            // entirely, saving resources.
            kafkaTemplate.send("chatMessage", message);
        }
    }

    /**
     * Called when a connection is closed.
     */
    @OnClose
    public void onClose(Session session) {
        Map<String, String> pathParameters = session.getPathParameters();
        String userId = pathParameters.get("userId"); // read userId from the session
        Map<String, String> map = new HashMap<>();
        map.put("userId", userId);
        kafkaTemplate.send("closeWebsocket", JSON.toJSONString(map));
    }

    /**
     * Remove a client from the given map of connected clients.
     *
     * @param map the map of currently connected clients
     */
    private void close(Map<String, Session> map, String username) {
        if (StringUtils.isNotBlank(username)) {
            logger.info("closing websocket connection for client: " + username);
            if (map.get(username) != null) {
                map.remove(username);
            }
        }
    }

    /**
     * Kafka listener callback: deliver a chat message to a locally held session.
     *
     * @author yxl
     */
    public void kafkaReceiveMsg(String message) {
        JSONObject jsonObject = JSONObject.parseObject(message);
        String receiver_id = jsonObject.getString("receiver_id"); // receiver id
        Session receiver = drWebSocketSet.get(receiver_id);
        if (receiver != null) {
            try {
                receiver.getBasicRemote().sendText(message); // deliver the message
            } catch (IOException e) {
                logger.error("failed to deliver message to " + receiver_id, e);
            }
        }
    }

    /**
     * Kafka listener callback: a websocket connection was closed on some node.
     */
    public void kafkaCloseWebsocket(String closeMessage) {
        JSONObject jsonObject = JSONObject.parseObject(closeMessage);
        String userId = jsonObject.getString("userId");
        drWebSocketSet.remove(userId);
    }

    /**
     * Called when an error occurs.
     */
    @OnError
    public void onError(Session session, Throwable error) {
        logger.info("websocket error, closing connection");
        error.printStackTrace();
        logger.info("websocket error: " + error.getMessage());
    }
}
```
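The TODO in `sendMessage` above (deliver directly when the receiver's session lives on the current node, and only publish to Kafka otherwise) can be sketched as follows. This is an illustrative stand-alone class, not part of the original project: `publishToKafka` stands in for `kafkaTemplate.send("chatMessage", message)`, and the `StringBuilder` values stand in for real sessions in `drWebSocketSet`:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/**
 * Sketch of the "local first" optimisation: if the receiver is connected to
 * this node, send on its session directly; otherwise publish to Kafka so the
 * node that does hold the session can deliver it.
 */
class MessageRouter {
    // mirrors drWebSocketSet; values stand in for javax.websocket.Session
    final Map<String, StringBuilder> localSessions = new ConcurrentHashMap<>();
    private final Consumer<String> publishToKafka; // stands in for kafkaTemplate.send

    MessageRouter(Consumer<String> publishToKafka) {
        this.publishToKafka = publishToKafka;
    }

    /** Returns true if delivered locally, false if handed off to Kafka. */
    boolean route(String receiverId, String message) {
        StringBuilder session = localSessions.get(receiverId);
        if (session != null) {
            session.append(message); // session.getBasicRemote().sendText(message) in real code
            return true;
        }
        publishToKafka.accept(message); // let the owning node deliver it
        return false;
    }
}

public class RoutingSketch {
    public static void main(String[] args) {
        StringBuilder kafkaTopic = new StringBuilder(); // stands in for the chatMessage topic
        MessageRouter router = new MessageRouter(kafkaTopic::append);
        router.localSessions.put("user1", new StringBuilder());
        System.out.println(router.route("user1", "hi")); // true: delivered locally
        System.out.println(router.route("user2", "yo")); // false: published to Kafka
    }
}
```

Skipping the broker for locally held sessions saves a round trip through Kafka for every conversation whose two participants happen to land on the same node.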
Beans cannot be injected directly into a WebSocket endpoint; for a workaround, see my other post: http://www.javashuo.com/article/p-rhjkeoys-eb.html