從構建分佈式秒殺系統聊聊WebSocket推送通知

摘要:
前言 秒殺架構到後期,咱們採用了消息隊列的形式實現搶購邏輯,那麼以前拋出過這樣一個問題:消息隊列異步處理完每一個用戶請求後,如何通知給相應用戶秒殺成功? 場景映射 首先,咱們舉一個生活中比較常見的例子:咱們去銀行辦理業務,通常會選擇相關業務打印一個排號紙,而後就能夠坐在小板凳上玩着手機,等待被小喇叭報號。

前言

秒殺架構到後期,咱們採用了消息隊列的形式實現搶購邏輯,那麼以前拋出過這樣一個問題:消息隊列異步處理完每一個用戶請求後,如何通知給相應用戶秒殺成功?html

場景映射

timg

首先,咱們舉一個生活中比較常見的例子:咱們去銀行辦理業務,通常會選擇相關業務打印一個排號紙,而後就能夠坐在小板凳上玩着手機,等待被小喇叭報號。當小喇叭喊到你所持有的號碼,就能夠拿着排號紙去櫃檯辦理本身的業務。git

這裏,假設當咱們取排號紙的時候,銀行根據時間段內的排隊狀況,比較人性化的提示用戶:排隊人數較多,您是否繼續等待?否的話咱們能夠換個時間段再來辦理。github

由此咱們把生活場景映射到真實的秒殺業務邏輯中來:web

  • 咱們能夠把櫃檯比喻成商品下單處理邏輯單元
  • 拿到排號紙說明你進入相應商品處理隊列
  • 拿到排號紙的請求直接返回前臺,提示用戶搶購進行中
  • 排號紙進入隊列後,等待商品業務處理邏輯
  • 小喇叭叫到本身的排號至關於服務端通知用戶秒殺成功,這時候能夠進行支付邏輯
  • 那些拿不到票號的同窗,至關於隊列已滿直接返回秒殺失敗

解決方案

經過上面的場景,咱們很容易可以想到一種方案就是服務端通知,那麼如何作到服務端異步通知的呢?下面,主角開始登場了,就是咱們的Websocket。redis

1327868440

WebSocket是HTML5開始提供的一種瀏覽器與服務器間進行全雙工通信的網絡技術。依靠這種技術能夠實現客戶端和服務器端的長鏈接,雙向實時通訊。spring

bg2017051502

特色:數據庫

  • 異步、事件觸發
  • 能夠發送文本,圖片等流文件
  • 數據格式比較輕量,性能開銷小,通訊高效
  • 使用ws或者wss協議的客戶端socket,可以實現真正意義上的推送功能

缺點:後端

  • 部分瀏覽器不支持,瀏覽器支持的程度與方式有區別,須要各類兼容寫法。

集成案例

因爲咱們的秒殺架構項目案例中使用了SpringBoot,所以集成webSocket也是相對比較簡單的。瀏覽器

首先pom.xml引入如下依賴:緩存

<!-- webSocket 秒殺通知-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>複製代碼

WebSocketConfig 配置:

/**
 * WebSocket配置
 * 建立者  爪哇筆記
 * 建立時間    2018年5月29日
 */
@Configuration  
public class WebSocketConfig {  
    @Bean  
    public ServerEndpointExporter serverEndpointExporter() {  
        return new ServerEndpointExporter();  
    }  
} 複製代碼

WebSocketServer 配置:

@ServerEndpoint("/websocket/{userId}")
@Component
public class WebSocketServer {
    private final static Logger log = LoggerFactory.getLogger(WebSocketServer.class);
    //靜態變量,用來記錄當前在線鏈接數。應該把它設計成線程安全的。
    private static int onlineCount = 0;
    //concurrent包的線程安全Set,用來存放每一個客戶端對應的MyWebSocket對象。
    private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new 
CopyOnWriteArraySet<WebSocketServer>();

    //與某個客戶端的鏈接會話,須要經過它來給客戶端發送數據
    private Session session;

    //接收userId
    private String userId="";
    /**
     * 鏈接創建成功調用的方法*/
    @OnOpen
    public void onOpen(Session session,@PathParam("userId") String userId) {
        this.session = session;
        webSocketSet.add(this);     //加入set中
        addOnlineCount();           //在線數加1
        log.info("有新窗口開始監聽:"+userId+",當前在線人數爲" + getOnlineCount());
        this.userId=userId;
        try {
             sendMessage("鏈接成功");
        } catch (IOException e) {
            log.error("websocket IO異常");
        }
    }

    /**
     * 鏈接關閉調用的方法
     */
    @OnClose
    public void onClose() {
        webSocketSet.remove(this);  //從set中刪除
        subOnlineCount();           //在線數減1
        log.info("有一鏈接關閉!當前在線人數爲" + getOnlineCount());
    }

    /**
     * 收到客戶端消息後調用的方法
     * @param message 客戶端發送過來的消息*/
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("收到來自窗口"+userId+"的信息:"+message);
        //羣發消息
        for (WebSocketServer item : webSocketSet) {
            try {
                item.sendMessage(message);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("發生錯誤");
        error.printStackTrace();
    }
    /**
     * 實現服務器主動推送
     */
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }
    /**
     * 羣發自定義消息
     * */
    public static void sendInfo(String message,@PathParam("userId") String userId){
        log.info("推送消息到窗口"+userId+",推送內容:"+message);
        for (WebSocketServer item : webSocketSet) {
            try {
                //這裏能夠設定只推送給這個userId的,爲null則所有推送
                if(userId==null) {
                    item.sendMessage(message);
                }else if(item.userId.equals(userId)){
                    item.sendMessage(message);
                }
            } catch (IOException e) {
                continue;
            }
        }
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }
}複製代碼

KafkaConsumer 消費配置,通知用戶是否秒殺成功:

/**
 * 消費者 spring-kafka 2.0 + 依賴JDK8
 * @author 科幫網 By https://blog.52itstyle.com
 */
@Component
public class KafkaConsumer {
    @Autowired
    private ISeckillService seckillService;
    
    private static RedisUtil redisUtil = new RedisUtil();
    /**
     * 監聽seckill主題,有消息就讀取
     * @param message
     */
    @KafkaListener(topics = {"seckill"})
    public void receiveMessage(String message){
        //收到通道的消息以後執行秒殺操做
        String[] array = message.split(";"); 
        if(redisUtil.getValue(array[0])!=null){//control層已經判斷了,其實這裏不須要再判斷了
            Result result = seckillService.startSeckil(Long.parseLong(array[0]), Long.
parseLong(array[1]));
            if(result.equals(Result.ok())){
                WebSocketServer.sendInfo(array[0].toString(), "秒殺成功");//推送給前臺
            }else{
                WebSocketServer.sendInfo(array[0].toString(), "秒殺失敗");//推送給前臺
                redisUtil.cacheValue(array[0], "ok");//秒殺結束
            }
        }else{
            WebSocketServer.sendInfo(array[0].toString(), "秒殺失敗");//推送給前臺
        }
    }
}複製代碼

webSocket.js 前臺通知邏輯:

$(function(){
    socket.init();
});
var basePath = "ws://localhost:8080/seckill/";
socket = {
    webSocket : "",
    init : function() {
        //userId:自行追加
        if ('WebSocket' in window) {
            webSocket = new WebSocket(basePath+'websocket/1'); 
        } 
        else if ('MozWebSocket' in window) {
            webSocket = new MozWebSocket(basePath+"websocket/1");
        } 
        else {
            webSocket = new SockJS(basePath+"sockjs/websocket");
        }
        webSocket.onerror = function(event) {
            alert("websockt鏈接發生錯誤,請刷新頁面重試!")
        };
        webSocket.onopen = function(event) {
            
        };
        webSocket.onmessage = function(event) {
            var message = event.data;
            alert(message)//判斷秒殺是否成功、自行處理邏輯
        };
    }
}複製代碼

客戶端API

客戶端與服務器通訊

  • send() 向遠程服務器發送數據
  • close() 關閉該websocket連接

監聽函數 

  • onopen 當網絡鏈接創建時觸發該事件
  • onerror 當網絡發生錯誤時觸發該事件
  • onclose 當websocket被關閉時觸發該事件
  • onmessage 當websocket接收到服務器發來的消息的時觸發的事件,也是通訊中最重要的一個監聽事件。msg.data

readyState屬性

這個屬性能夠返回websocket所處的狀態。

  • CONNECTING(0) websocket正嘗試與服務器創建鏈接
  • OPEN(1) websocket與服務器已經創建鏈接
  • CLOSING(2) websocket正在關閉與服務器的鏈接
  • CLOSED(3) websocket已經關閉了與服務器的鏈接

開源方案

goeasy

GoEasy實時Web推送,支持後臺推送和前臺推送兩種:後臺推送能夠選擇Java SDK、 Restful API支持全部開發語言;前臺推送:JS推送。不管選擇哪一種方式推送代碼都十分簡單(10分鐘可搞定)。因爲它支持websocket 和polling兩種鏈接方式因此兼顧大多數主流瀏覽器,低版本的IE瀏覽器也是支持的。

地址:goeasy.io/

Pushlets

Pushlets 是經過長鏈接方式實現「推」消息的。推送模式分爲:Poll(輪詢)、Pull(拉)。

地址:www.pushlets.com/

Pushlet

Pushlet 是一個開源的 Comet 框架,Pushlet 使用了觀察者模型:客戶端發送請求,訂閱感興趣的事件;服務器端爲每一個客戶端分配一個會話 ID 做爲標記,事件源會把新產生的事件以多播的方式發送到訂閱者的事件隊列裏。

地址:github.com/wjw465150/P…

總結

其實前面有提過,儘管WebSocket有諸多優勢,可是,若是服務端維護不少長鏈接也是挺耗費資源的,服務器集羣以及覽器或者客戶端兼容性問題,也會帶來了一些不肯定性因素。大致瞭解了一下各大廠的作法,大多數都仍是基於輪詢的方式實現的,好比:騰訊PC端微信掃碼登陸、京東商城支付成功通知等等。

有些小夥伴可能會問了,輪詢豈不是會更耗費資源?其實在我看來,有些輪詢是不可能穿透到後端數據庫查詢服務的,好比秒殺,一個緩存標記位就能夠斷定是否秒殺成功。相對於WS的長鏈接以及其不肯定因素,在秒殺場景下,輪詢仍是相對比較合適的。

思考

最後,思考一個問題:100件商品,假若有一萬人進行搶購,該如何設置隊列長度?

秒殺案例:gitee.com/52itstyle/s…

參考

blog.52itstyle.com/archives/73…

www.xoriant.com/blog/mobili…

原文連接

相關文章
相關標籤/搜索