使用Go基於WebSocket構建千萬級視頻直播彈幕系統

(1)業務複雜度介紹

開門見山,假設一個直播間同時500W人在線,那麼1秒鐘1000條彈幕,那麼彈幕系統的推送頻率就是:500W * 1000條/秒=50億條/秒,想一想B站2019跨年晚會那次彈幕系統得是多麼的NB,何況一個大型網站不可能只有一個直播間!git

使用Go基於WebSocket構建千萬級視頻直播彈幕系統

使用Go作WebSocket開發無非就是三種狀況:github

  • 使用Go原生自帶的庫,也就是golang.org/x/net,可是這個官方庫真是出了奇Bug多
  • 使用GitHub大佬gorilla/websocket庫,能夠結合到某些Web開發框架,好比Gin、iris等,只要使用的框架式基於golang.org/net的,那麼這個庫就能夠與這個框架結合
  • 手擼一個WebSocket框架

根據估算結果,彈幕推送量很大的時候,Linux內核將會出現瓶頸,由於Linux內核發送TCP包的時候極限包發送頻率是100W。所以能夠將同一秒內的彈幕消息合併爲1條推送,減小網絡小數據包的發送,從而下降推送頻率。golang

彈幕系統須要維護在線的用戶長鏈接來實現定向推送到在線的用戶,一般是使用Hash字典結構,一般推送消息就是遍歷在線用的Hash字典。在彈幕推送期間用戶在不斷的上下線,爲了維護上線用戶,那麼就得不斷的修改Hash字典,不斷地進行鎖操做,用戶量過大致使鎖瓶頸。所以能夠將整個Hash結構拆分爲多個Hash結構,分別對多個Hash結構加不一樣的鎖,而且使用讀寫鎖替代互斥鎖。web

一般服務器與客戶端交互使用JSON結構,那麼須要不斷的編碼解碼JSON數據,這將會致使CPU瓶頸。將消息先進行合併,而後進行編碼,最後輪詢Hash結構進行推送。跨域

以上是單體架構存在的問題,爲了支持更多的用戶負載,一般彈幕系統採用分佈式架構,進行彈性擴容縮容。安全

(2)推送仍是拉取?

若是是客戶端拉取服務器端數據,那麼將會存在如下幾個問題:服務器

  • 直播在線人數多就意味着消息數據更新頻率高,拉取消息意味着彈幕沒法知足時效性
  • 若是不少客戶端同時拉取,那麼服務器端的壓力無異於DDOS
  • 一個彈幕系統應該是通用的,所以對於直播間彈幕較少的場景,意味着消息數據拉取請求都是無效的

所以咱們考慮推送模式:當數據發生更新的時候服務器端主動推送到客戶端,這樣能夠有效減小客戶端的請求次數。若是須要實現消息推送,那麼就意味着服務器端維護大量的長鏈接。websocket

(3)爲何使用WebSocket?

實現彈幕消息的實時更新必定是使用Socket的方式,那麼爲啥要使用WebSocket呢?如今大部分直播應用的開發都是跨平臺的,然而跨平臺的開發框架本質就是Web開發,那麼必定離不開WebSocket,並且一部分用戶會選擇在Web端看視頻,好比Bilibili,現現在也有一些桌面應用是用Electron等跨平臺框架開發的,好比Lark飛書等,所以實現消息推送的最佳方案就是使用WebSocket。網絡

使用WebSocket能夠輕鬆的維持服務器端長鏈接,其次WebSocket是架構在HTTP協議之上的,而且也可使用HTTPS方式,所以WebSocket是可靠傳輸,而且不須要開發者關注底層細節。架構

使用Go基於WebSocket構建千萬級視頻直播彈幕系統

爲啥要使用Go搞WebSocket呢?首先說到WebSocket你可能會想到Node.js,可是Node.js是單線程模型,若是實現高併發,不得不建立多個Node.js進程,可是這又不容易服務端遍歷整個鏈接集合;若是使用Java就會顯得比較笨重,Java項目的部署,編寫Dockerfile都不如Go的目標二進制更加簡潔,而且Go協程很容易實現高併發,上一章說到Go語言目前也有成熟的WebSocket輪子。

(4)服務端基本Demo

首先搭建好一個框架:

package main

import (
    "fmt"
    "net/http"
)

func main() {
  fmt.Println("Listen localhost:8080")
     // 註冊一個用於WebSocket的路由,實際業務中不可能只有一個路由
    http.HandleFunc("/messages", messageHandler)
    // 監聽8080端口,沒有實現服務異常處理器,所以第二個參數是nil
    http.ListenAndServe("localhost:8080", nil)
}

func messageHandler(response http.ResponseWriter, request *http.Request) {
    // TODO: 實現消息處理
    response.Write([]byte("HelloWorld"))
}

而後完善messageHandler函數:

func messageHandler(response http.ResponseWriter, request *http.Request) {
    var upgrader = websocket.Upgrader{
        // 容許跨域
        CheckOrigin: func(resquest *http.Request) bool {
            return true
        },
    }

    // 創建鏈接
    conn, err := upgrader.Upgrade(response, request, nil)
    if err != nil {
        return
    }

    // 收發消息
    for {
        // 讀取消息
        _, bytes, err := conn.ReadMessage()
        if err != nil {
            _ = conn.Close()
        }
        // 寫入消息
        err = conn.WriteMessage(websocket.TextMessage, bytes)
        if err != nil {
            _ = conn.Close()
        }
    }
}

如今基本上實現了WebSocket功能,可是websocket的原生API不是線程安全的(Close方法是線程安全的,而且是可重入的),而且其餘模塊沒法複用業務邏輯,所以進行封裝:

  • 封裝Connection對象描述一個WebSocket鏈接
  • 爲Connection對象提供線程安全的關閉、接收、發送API
// main.go
package main

import (
    "bluemiaomiao.github.io/websocket-go/service"
    "fmt"
    "net/http"

    "github.com/gorilla/websocket"
)

func main() {
    fmt.Println("Listen localhost:8080")
    http.HandleFunc("/messages", messageHandler)
    _ = http.ListenAndServe("localhost:8080", nil)
}

func messageHandler(response http.ResponseWriter, request *http.Request) {
    var upgrader = websocket.Upgrader{
        // 容許跨域
        CheckOrigin: func(resquest *http.Request) bool {
            return true
        },
    }

    // 創建鏈接
    conn, err := upgrader.Upgrade(response, request, nil)
    wsConn, err := service.Create(conn)
    if err != nil {
        return
    }

    // 收發消息
    for {
        // 讀取消息
        msg, err := wsConn.ReadOne()
        if err != nil {
            wsConn.Close()
        }
        // 寫入消息
        err = wsConn.WriteOne(msg)
        if err != nil {
            _ = conn.Close()
        }
    }
}
// service/messsage_service.go
package service

import (
    "errors"
    "github.com/gorilla/websocket"
    "sync"
)

// 封裝的鏈接對象
// 
// 因爲websocket的Close()方法是可重入的,因此能夠屢次調用,可是關閉Channel的close()
// 方法不是可重入的,所以經過isClosed進行判斷
// isClosed可能發生資源競爭,所以經過互斥鎖避免
// 關閉websocket鏈接後,也要自動關閉輸入輸出消息流,所以經過signalCloseLoopChan實現
type Connection struct {
    conn                                   *websocket.Conn    // 具體的鏈接對象
    inputStream                         chan []byte             // 輸入流,使用Channel模擬
    outputStream                      chan []byte             // 輸出流,使用chaneel模擬
    signalCloseLoopChan         chan byte              // 關閉信號
    isClosed                              bool                       // 是否調用過close()方法
    lock                                     sync.Mutex            // 簡單的鎖
}

// 用於初始化一個鏈接對象
func Create(conn *websocket.Conn) (connection *Connection, err error) {
    connection = &Connection{
        conn:                            conn,
        inputStream:                make(chan []byte, 1000),
        outputStream:              make(chan []byte, 1000),
        signalCloseLoopChan: make(chan byte, 1),
        isClosed:                       false,
    }

    // 啓動讀寫循環
    go connection.readLoop()
    go connection.writeLoop()
    return
}

// 讀取一條消息
func (c *Connection) ReadOne() (msg []byte, err error) {
    select {
    case msg = <-(*c).inputStream:
    case <-(*c).signalCloseLoopChan:
        err = errors.New("connection is closed")
    }
    return
}

// 寫入一條消息
func (c *Connection) WriteOne(msg []byte) (err error) {
    select {
    case (*c).outputStream <- msg:
    case <-(*c).signalCloseLoopChan:
        err = errors.New("connection is closed")
    }
    return
}

// 關閉鏈接對象
func (c *Connection) Close() {
    _ = (*c).conn.Close()
    (*c).lock.Lock()
    if !(*c).isClosed {
        close((*c).signalCloseLoopChan)
    }
    (*c).lock.Unlock()

}

// 讀取循環
func (c *Connection) readLoop() {
    // 不停的讀取長鏈接中的消息,只要存在消息就將其放到隊列中
    for {
        _, bytes, err := (*c).conn.ReadMessage()
        if err != nil {
            (*c).Close()
        }
        select {
        case <-(*c).signalCloseLoopChan:
            (*c).Close()
        case (*c).inputStream <- bytes:
        }
    }
}

// 寫入循環
func (c *Connection) writeLoop() {
    // 只要隊列中存在消息,就將其寫入
    var data []byte
    for {
        select {
        case data = <-(*c).outputStream:
        case <-(*c).signalCloseLoopChan:
            (*c).Close()
        }
        err := (*c).conn.WriteMessage(websocket.TextMessage, data)
        if err != nil {
            _ = (*c).conn.Close()
        }
    }
}

至此,你已經學會了如何使用Go構建WebSocket服務。

相關文章
相關標籤/搜索