如下是個人場景描述前端
- 資源:4臺服務器。其中只有一臺服務器具有ssl認證域名,一臺redis+mysql服務器,兩臺應用服務器(集羣)
- 應用發佈限制條件:因爲場景須要,應用場所須要ssl認證的域名才能發佈。所以ssl認證的域名服務器用來當api網關,負責https請求與wss(安全認證的ws)鏈接。俗稱https卸載,用戶請求https域名服務器(eg:https://oiscircle.com/xxx),但真實訪問到的是http+ip地址的形式。只要網關配置高,能handle多個應用
- 需求:用戶登陸應用,須要與服務器創建wss鏈接,不一樣角色之間能夠單發消息,也能夠羣發消息
- 集羣中的應用服務類型:每一個集羣實例都負責http無狀態請求服務與ws長鏈接服務
在個人實現裏,每一個應用服務器都負責http and ws請求,其實也能夠將ws請求創建的聊天模型單獨成立爲一個模塊。從分佈式的角度來看,這兩種實現類型差很少,但從實現方便性來講,一個應用服務http+ws請求的方式更爲方便。下文會有解釋java
- Eureka 服務發現與註冊
- Redis Session共享
- Redis 消息訂閱
- Spring Boot
- Zuul 網關
- Spring Cloud Gateway 網關
- Spring WebSocket 處理長鏈接
- Ribbon 負載均衡
- Netty 多協議NIO網絡通訊框架
- Consistent Hash 一致性哈希算法
相信能走到這一步的人都瞭解過我上面列舉的技術棧了,若是尚未,能夠先去網上找找入門教程瞭解一下。下面的內容都與上述技術相關,題主默認你們都瞭解過了...
這裏是描述一致性Hash算法最易懂的文章傳送門mysql
下面我將描述session特性,以及根據這些特性列舉出n個解決分佈式架構中處理ws請求的集羣方案
WebSocketSession與HttpSession
在Spring所集成的WebSocket裏面,每一個ws鏈接都有一個對應的session:WebSocketSession
,在Spring WebSocket中,咱們創建ws鏈接以後能夠經過相似這樣的方式進行與客戶端的通訊:webprotected void handleTextMessage(WebSocketSession session, TextMessage message) { System.out.println("服務器接收到的消息: "+ message ); //send message to client session.sendMessage(new TextMessage("message")); }那麼問題來了:ws的session沒法序列化到redis,所以在集羣中,咱們沒法將全部
WebSocketSession
都緩存到redis進行session共享。每臺服務器都有各自的session。於此相反的是HttpSession
,redis能夠支持httpsession共享,可是目前沒有websocket session共享的方案,所以走redis websocket session共享這條路是行不通的。
有的人可能會想:我可不能夠將sessin關鍵信息緩存到redis,集羣中的服務器從redis拿取session關鍵信息而後從新構建websocket session...我只想說這種方法若是有人能試出來,請告訴我一聲...redis
以上即是websocket session與http session共享的區別,總的來講就是http session共享已經有解決方案了,並且很簡單,只要引入相關依賴:spring-session-data-redis
和spring-boot-starter-redis
,你們能夠從網上找個demo玩一下就知道怎麼作了。而websocket session共享的方案因爲websocket底層實現的方式,咱們沒法作到真正的websocket session共享。算法
剛開始的時候,我嘗試着用netty實現了websocket服務端的搭建。在netty裏面,並無websocket session這樣的概念,與其相似的是channel
,每個客戶端鏈接都表明一個channel。前端的ws請求經過netty監聽的端口,走websocket協議進行ws握手鍊接以後,經過一些列的handler(責鏈模式)進行消息處理。與websocket session相似地,服務端在鏈接創建後有一個channel,咱們能夠經過channel進行與客戶端的通訊spring
/** * TODO 根據服務器傳進來的id,分配到不一樣的group */ private static final ChannelGroup GROUP = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { //retain增長引用計數,防止接下來的調用引用失效 System.out.println("服務器接收到來自 " + ctx.channel().id() + " 的消息: " + msg.text()); //將消息發送給group裏面的全部channel,也就是發送消息給客戶端 GROUP.writeAndFlush(msg.retain()); }
那麼,服務端用netty仍是用spring websocket?如下我將從幾個方面列舉這兩種實現方式的優缺點sql
玩過netty的人都知道netty是的線程模型是nio模型,併發量很是高,spring5以前的網絡線程模型是servlet實現的,而servlet不是nio模型,因此在spring5以後,spring的底層網絡實現採用了netty。若是咱們單獨使用netty來開發websocket服務端,速度快是絕對的,可是可能會遇到下列問題:
1.與系統的其餘應用集成不方便,在rpc調用的時候,沒法享受springcloud裏feign服務調用的便利性
2.業務邏輯可能要重複實現
3.使用netty可能須要重複造輪子
4.怎麼鏈接上服務註冊中心,也是一件麻煩的事情
5.restful服務與ws服務須要分開實現,若是在netty上實現restful服務,有多麻煩可想而知,用spring一站式restful開發相信不少人都習慣了。api
spring websocket已經被springboot很好地集成了,因此在springboot上開發ws服務很是方便,作法很是簡單
第一步:添加依賴緩存
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
第二步:添加配置類
@Configuration public class WebSocketConfig implements WebSocketConfigurer { @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(myHandler(), "/") .setAllowedOrigins("*"); } @Bean public WebSocketHandler myHandler() { return new MessageHandler(); } }
第三步:實現消息監聽類
@Component @SuppressWarnings("unchecked") public class MessageHandler extends TextWebSocketHandler { private List<WebSocketSession> clients = new ArrayList<>(); @Override public void afterConnectionEstablished(WebSocketSession session) { clients.add(session); System.out.println("uri :" + session.getUri()); System.out.println("鏈接創建: " + session.getId()); System.out.println("current seesion: " + clients.size()); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { clients.remove(session); System.out.println("斷開鏈接: " + session.getId()); } @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) { String payload = message.getPayload(); Map<String, String> map = JSONObject.parseObject(payload, HashMap.class); System.out.println("接受到的數據" + map); clients.forEach(s -> { try { System.out.println("發送消息給: " + session.getId()); s.sendMessage(new TextMessage("服務器返回收到的信息," + payload)); } catch (Exception e) { e.printStackTrace(); } }); } }
從這個demo中,使用spring websocket實現ws服務的便利性你們可想而知了。爲了能更好地向spring cloud你們族看齊,我最終採用了spring websocket實現ws服務。
所以個人應用服務架構是這樣子的:一個應用既負責restful服務,也負責ws服務。沒有將ws服務模塊拆分是由於拆分出去要使用feign來進行服務調用。第一本人比較懶惰,第二拆分與不拆分相差在多了一層服務間的io調用,因此就沒有這麼作了。
要實現websocket集羣,咱們必不可免地得從zuul轉型到spring cloud gateway。緣由以下:
zuul1.0版本不支持websocket轉發,zuul 2.0開始支持websocket,zuul2.0幾個月前開源了,可是2.0版本沒有被spring boot集成,並且文檔不健全。所以轉型是必須的,同時轉型也很容易實現。
在gateway中,爲了實現ssl認證和動態路由負載均衡,yml文件中如下的某些配置是必須的,在這裏提早避免你們採坑 server: port: 443 ssl: enabled: true key-store: classpath:xxx.jks key-store-password: xxxx key-store-type: JKS key-alias: alias spring: application: name: api-gateway cloud: gateway: httpclient: ssl: handshake-timeout-millis: 10000 close-notify-flush-timeout-millis: 3000 close-notify-read-timeout-millis: 0 useInsecureTrustManager: true discovery: locator: enabled: true lower-case-service-id: true routes: - id: dc uri: lb://dc predicates: - Path=/dc/** - id: wecheck uri: lb://wecheck predicates: - Path=/wecheck/**
若是要愉快地玩https卸載,咱們還須要配置一個filter,不然請求網關時會出現錯誤not an SSL/TLS record
@Component public class HttpsToHttpFilter implements GlobalFilter, Ordered { private static final int HTTPS_TO_HTTP_FILTER_ORDER = 10099; @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { URI originalUri = exchange.getRequest().getURI(); ServerHttpRequest request = exchange.getRequest(); ServerHttpRequest.Builder mutate = request.mutate(); String forwardedUri = request.getURI().toString(); if (forwardedUri != null && forwardedUri.startsWith("https")) { try { URI mutatedUri = new URI("http", originalUri.getUserInfo(), originalUri.getHost(), originalUri.getPort(), originalUri.getPath(), originalUri.getQuery(), originalUri.getFragment()); mutate.uri(mutatedUri); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } ServerHttpRequest build = mutate.build(); ServerWebExchange webExchange = exchange.mutate().request(build).build(); return chain.filter(webExchange); } @Override public int getOrder() { return HTTPS_TO_HTTP_FILTER_ORDER; }}
這樣子咱們就可使用gateway來卸載https請求了,到目前爲止,咱們的基本框架已經搭建完畢,網關既能夠轉發https請求,也能夠轉發wss請求。接下來就是用戶多對多之間session互通的通信解決方案了。接下來,我將根據方案的優雅性,從最不優雅的方案開始講起。
這是最簡單的websocket集羣通信解決方案。場景以下:
教師A想要羣發消息給他的學生們
session廣播實現很簡單,可是有一個致命缺陷:計算力浪費現象,當服務器沒有消息接收者session的時候,至關於浪費了一次循環遍歷的計算力,該方案在併發需求不高的狀況下能夠優先考慮,實現很容易。
spring cloud中獲取服務集羣中每臺服務器信息的方法以下
@Resource private EurekaClient eurekaClient; Application app = eurekaClient.getApplication("service-name"); //instanceInfo包括了一臺服務器ip,port等消息 InstanceInfo instanceInfo = app.getInstances().get(0); System.out.println("ip address: " + instanceInfo.getIPAddr());
服務器須要維護關係映射表,將用戶的id與session作映射,session創建時在映射表中添加映射關係,session斷開後要刪除映射表內關聯關係
(本文的要點)
這種方法是本人認爲最優雅的實現方案,理解這種方案須要必定的時間,若是你耐心看下去,相信你必定會有所收穫。再強調一次,不瞭解一致性哈希算法的同窗請先看這裏,現先假設哈希環是順時針查找的。
首先,想要將一致性哈希算法的思想應用到咱們的websocket集羣,咱們須要解決如下新問題:
在集羣中,總會出現服務UP/DOWN的問題。
針對節點DOWN的問題分析以下:
一個服務器DOWN的時候,其擁有的websocket session會自動關閉鏈接,而且前端會收到通知。此時會影響到哈希環的映射錯誤。咱們只須要當監聽到服務器DOWN的時候,刪除哈希環上面對應的實際結點和虛結點,避免讓網關轉發到狀態是DOWN的服務器上。
實現方法:在eureka治理中心監聽集羣服務DOWN事件,並及時更新哈希環。
針對節點UP的問題分析以下:
現假設集羣中有服務CacheB
上線了,該服務器的ip地址恰好被映射到key1和cacheA
之間。那麼key1對應的用戶每次要發消息時都跑去CacheB
發送消息,結果明顯是發送不了消息,由於CacheB
沒有key1對應的session。
此時咱們有兩種解決方案。
方案A簡單,動做大:
eureka監聽到節點UP事件以後,根據現有集羣信息,更新哈希環。而且斷開全部session鏈接,讓客戶端從新鏈接,此時客戶端會鏈接到更新後的哈希環節點,以此避免消息沒法送達的狀況。
方案B複雜,動做小:
咱們先看看沒有虛擬節點的狀況,假設CacheC
和CacheA
之間上線了服務器CacheB
。全部映射在CacheC
到CacheB
的用戶發消息時都會去CacheB
裏面找session發消息。也就是說CacheB
一但上線,便會影響到CacheC
到CacheB
之間的用戶發送消息。因此咱們只須要將CacheA
斷開CacheC
到CacheB
的用戶所對應的session,讓客戶端重連。
接下來是有虛擬節點的狀況,假設淺色的節點是虛擬節點。咱們用長括號來表明某段區域映射的結果屬於某個
Cache
。首先是C節點未上線的狀況。圖你們應該都懂吧,全部B的虛擬節點都會指向真實的B節點,因此全部B節點逆時針那一部分都會映射到B(由於咱們規定哈希環順時針查找)。
接下來是C節點上線的狀況,能夠看到某些區域被C佔領了。
由以上狀況咱們能夠知道:節點上線,會有許多對應虛擬節點也同時上線,所以咱們須要將多段範圍key對應的session斷開鏈接(上圖紅色的部分)。具體算法有點複雜,實現的方式因人而異,你們能夠嘗試一下本身實現算法。
哈希環應該放在哪裏?
- gateway本地建立並維護哈希環。當ws請求進來的時候,本地獲取哈希環並獲取映射服務器信息,轉發ws請求。這種方法看上去不錯,但其實是不太可取的,回想一下上面服務器DOWN的時候只能經過eureka監聽,那麼eureka監聽到DOWN事件以後,須要經過io來通知gateway刪除對應節點嗎?顯然太麻煩了,將eureka的職責分散到gateway,不建議這麼作。
- eureka建立,並放到redis共享讀寫。這個方案可行,當eureka監聽到服務DOWN的時候,修改哈希環並推送到redis上。爲了請求響應時間儘可能地短,咱們不可讓gateway每次轉發ws請求的時候都去redis取一次哈希環。哈希環修改的機率的確很低,gateway只須要應用redis的消息訂閱模式,訂閱哈希環修改事件即可以解決此問題。
至此咱們的spring websocket集羣已經搭建的差很少了,最重要的地方仍是一致性哈希算法。如今有最後一個技術瓶頸,網關如何根據ws請求轉發到指定的集羣服務器上?答案在負載均衡。spring cloud gateway或zuul都默認集成了ribbon做爲負載均衡,咱們只須要根據創建ws請求時客戶端發來的user id,重寫ribbon負載均衡算法,根據user id進行hash,並在哈希環上尋找ip,並將ws請求轉發到該ip便完事了。流程以下圖所示:
接下來用戶溝通的時候,只須要根據id進行hash,在哈希環上獲取對應ip,即可以知道與該用戶創建ws鏈接時的session存在哪臺服務器上了!
題主在實際操做的時候發現了ribbon兩個不完善的地方......
AbstractLoadBalancerRule
重寫負載均衡策略以後,多個不一樣應用的請求變得混亂。假如eureka上有兩個service A和B,重寫負載均衡策略以後,請求A或B的服務,最終只會映射到其中一個服務上。很是奇怪!可能spring cloud gateway官網須要給出一個正確的重寫負載均衡策略的demo。default
!
難道這樣子咱們就沒有辦法了嗎?其實還有一個可行而且暫時可替代的辦法!
以下圖所示,客戶端發送一個普通的http請求(包含id參數)給網關,網關根據id進行hash,在哈希環中尋找ip地址,將ip地址返回給客戶端,客戶端再根據該ip地址進行ws請求。
因爲ribbon未完善key的處理,咱們暫時沒法在ribbon上實現一致性哈希算法。只能間接地經過客戶端發起兩次請求(一次http,一次ws)的方式來實現一致性哈希。但願不久以後ribbon能更新這個缺陷!讓咱們的websocket集羣實現得更優雅一點。
以上即是我這幾天探索的結果。期間遇到了許多問題,並逐一解決難題,列出兩個websocket集羣解決方案。第一個是session廣播,第二個是一致性哈希。這兩種方案針對不一樣場景各有優缺點,本文並未用到ActiveMQ,Karfa等消息隊列實現消息推送,只是想經過本身的想法,不依靠消息隊列來簡單地實現多用戶之間的長鏈接通信。但願能爲你們提供一條不一樣於尋常的思路。