你們好!我是 Sergey Kamardin,是 Mail.Ru 的一名工程師。html
本文主要介紹如何使用 Go 開發高負載的 WebSocket 服務。linux
若是你熟悉 WebSockets,但對 Go 瞭解很少,仍但願你對這篇文章的想法和性能優化方面感興趣。nginx
爲了定義本文的討論範圍,有必要說明咱們爲何須要這個服務。git
Mail.Ru 有不少有狀態系統。用戶的電子郵件存儲就是其中之一。咱們有幾種方法能夠跟蹤該系統的狀態變化以及系統事件,主要是經過按期系統輪詢或者狀態變化時的系統通知來實現。github
兩種方式各有利弊。可是對於郵件而言,用戶收到新郵件的速度越快越好。golang
郵件輪詢大約每秒 50,000 個 HTTP 查詢,其中 60% 返回 304 狀態,這意味着郵箱中沒有任何更改。web
所以,爲了減小服務器的負載並加快向用戶發送郵件的速度,咱們決定經過用發佈 - 訂閱服務(也稱爲消息總線,消息代理或事件管道)的模式來造一個輪子。一端接收有關狀態更改的通知,另外一端訂閱此類通知。編程
以前的架構:瀏覽器
如今的架構:性能優化
第一個方案是以前的架構。瀏覽器按期輪詢 API 並查詢存儲(郵箱服務)是否有更改。
第二種方案是如今的架構。瀏覽器與通知 API 創建了 WebSocket 鏈接,通知 API 是總線服務的消費者。一旦接收到新郵件後,Storage 會將有關它的通知發送到總線(1),總線將其發送給訂閱者(2)。 API 經過鏈接發送這個收到的通知,將其發送到用戶的瀏覽器(3)。
因此如今咱們將討論這個 API 或者這個 WebSocket 服務。展望一下將來,咱們的服務未來可能會有 300 萬個在線鏈接。
咱們來看看如何在沒有任何優化的狀況下使用 Go 實現服務器的某些部分。
在咱們繼續使用 net/http
以前,來談談如何發送和接收數據。這個數據位於 WebSocket 協議上(例如 JSON 對象),咱們在下文中將其稱爲包。
咱們先來實現 Channel
結構體,該結構體將包含在 WebSocket 鏈接上發送和接收數據包的邏輯。
// WebSocket Channel 的實現
// Packet 結構體表示應用程序級數據
type Packet struct {
...
}
// Channel 裝飾用戶鏈接
type Channel struct {
conn net.Conn // WebSocket 鏈接
send chan Packet // 傳出的 packets 隊列
}
func NewChannel(conn net.Conn) *Channel {
c := &Channel{
conn: conn,
send: make(chan Packet, N),
}
go c.reader()
go c.writer()
return c
}
複製代碼
我想讓你注意的是 reader
和 writer
goroutines。每一個 goroutine 都須要內存棧,初始大小可能爲 2 到 8 KB,具體取決於操做系統和 Go 版本。
關於上面提到的 300 萬個線上鏈接,爲此咱們須要消耗 24 GB 的內存(假設單個 goroutine 消耗 4 KB 棧內存)用於全部的鏈接。而且這還沒包括爲 Channel
結構體分配的內存,ch.send
傳出的數據包占用的內存以及其餘內部字段的內存。
讓咱們來看看 reader
的實現:
// Channel’s reading goroutine.
func (c *Channel) reader() {
// 建立一個緩衝 read 來減小 read 的系統調用
buf := bufio.NewReader(c.conn)
for {
pkt, _ := readPacket(buf)
c.handle(pkt)
}
}
複製代碼
這裏咱們使用了 bufio.Reader
來減小 read()
系統調用的次數,並儘量多地讀取 buf
中緩衝區大小所容許的數量。在這個無限循環中,咱們等待新數據的到來。請先記住這句話:等待新數據的到來。咱們稍後會回顧。
咱們先不考慮傳入的數據包的解析和處理,由於它對咱們討論的優化並不重要。可是,buf
值得咱們關注:默認狀況下,它是 4 KB,這意味着鏈接還須要 12 GB 的內存。writer
也有相似的狀況:
// Channel’s writing goroutine.
func (c *Channel) writer() {
// 建立一個緩衝 write 來減小 write 的系統調用
buf := bufio.NewWriter(c.conn)
for pkt := range c.send {
_ := writePacket(buf, pkt)
buf.Flush()
}
}
複製代碼
咱們經過 Channel 的 c.send
遍歷將數據包傳出 並將它們寫入緩衝區。細心的讀者可能猜到了,這是咱們 300 萬個鏈接的另外 12 GB 的內存消耗。
已經實現了一個簡單的 Channel
,如今咱們須要使用 WebSocket 鏈接。因爲仍然處於經常使用的方式的標題下,因此咱們以經常使用的方式繼續。
注意:若是你不知道 WebSocket 的運行原理,須要記住客戶端會經過名爲 Upgrade 的特殊 HTTP 機制轉換到 WebSocket 協議。在成功處理 Upgrade 請求後,服務端和客戶端將使用 TCP 鏈接來傳輸二進制的 WebSocket 幀。這裏是鏈接的內部結構的說明。
// 經常使用的轉換爲 WebSocket 的方法
import (
"net/http"
"some/websocket"
)
http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) {
conn, _ := websocket.Upgrade(r, w)
ch := NewChannel(conn)
//...
})
複製代碼
須要注意的是,http.ResponseWriter
爲 bufio.Reader
和 bufio.Writer
(均爲 4 KB 的緩衝區)分配了內存,用於對 *http.Request
的初始化和進一步的響應寫入。
不管使用哪一種 WebSocket 庫,在 Upgrade 成功後,服務端在調用 responseWriter.Hijack()
以後都會收到 I/O 緩衝區和 TCP 鏈接。
提示:在某些狀況下,
go:linkname
可被用於經過調用net/http.putBufio {Reader, Writer}
將緩衝區返回給net/http
內的sync.Pool
。
所以,咱們還須要 24 GB 的內存用於 300 萬個鏈接。
那麼,如今爲了一個什麼功能都沒有的應用程序,一共須要消耗 72 GB 的內存!
咱們回顧一下在簡介部分中談到的內容,並記住用戶鏈接的方式。在切換到 WebSocket 後,客戶端會經過鏈接發送包含相關事件的數據包。而後(不考慮 ping/pong
等消息),客戶端可能在整個鏈接的生命週期中不會發送任何其餘內容。
鏈接的生命週期可能持續幾秒到幾天。
所以,大部分時間 Channel.reader()
和 Channel.writer()
都在等待接收或發送數據。與它們一塊兒等待的還有每一個大小爲 4 KB 的 I/O 緩衝區。
如今咱們對哪些地方能夠作優化應該比較清晰了。
Channel.reader()
經過給 bufio.Reader.Read()
內的 conn.Read()
加鎖來等待新數據的到來(譯者注:上文中的伏筆),一旦鏈接中有數據,Go runtime(譯者注:runtime 包含 Go 運行時的系統交互的操做,這裏保留原文)「喚醒」 goroutine 並容許它讀取下一個數據包。在此以後,goroutine 再次被鎖定,同時等待新的數據。讓咱們看看 Go runtime 來理解 goroutine 爲何必須「被喚醒」。
若是咱們查看 conn.Read()
的實現,將會在其中看到 net.netFD.Read()
調用:
// Go 內部的非阻塞讀.
// net/fd_unix.go
func (fd *netFD) Read(p []byte) (n int, err error) {
//...
for {
n, err = syscall.Read(fd.sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN {
if err = fd.pd.waitRead(); err == nil {
continue
}
}
}
//...
break
}
//...
}
複製代碼
Go 在非阻塞模式下使用套接字。 EAGAIN 表示套接字中沒有數據,而且讀取空套接字時不會被鎖定,操做系統將返回控制權給咱們。(譯者注:EAGAIN 表示目前沒有可用數據,請稍後再試)
咱們從鏈接文件描述符中看到一個 read()
系統調用函數。若是 read 返回 EAGAIN 錯誤,則 runtime 調用 pollDesc.waitRead():
// Go 內部關於 netpoll 的使用
// net/fd_poll_runtime.go
func (pd *pollDesc) waitRead() error {
return pd.wait('r')
}
func (pd *pollDesc) wait(mode int) error {
res := runtime_pollWait(pd.runtimeCtx, mode)
//...
}
複製代碼
若是深刻挖掘,咱們將看到 netpoll 在 Linux 中是使用 epoll 實現的,而在 BSD 中是使用 kqueue 實現的。爲何不對鏈接使用相同的方法?咱們能夠分配一個 read 緩衝區並僅在真正須要時啓動 read goroutine:當套接字中有可讀的數據時。
在 github.com/golang/go 上,有一個導出 netpoll 函數的 issue。
假設咱們有 Go 的 netpoll 實現。如今咱們能夠避免在內部緩衝區啓動 Channel.reader()
goroutine,而是在鏈接中訂閱可讀數據的事件:
// 使用 netpoll
ch := NewChannel(conn)
// 經過 netpoll 實例觀察 conn
poller.Start(conn, netpoll.EventRead, func() {
// 咱們在這裏產生 goroutine 以防止在輪詢從 ch 接收數據包時被鎖。
go Receive(ch)
})
// Receive 從 conn 讀取數據包並以某種方式處理它。
func (ch *Channel) Receive() {
buf := bufio.NewReader(ch.conn)
pkt := readPacket(buf)
c.handle(pkt)
}
複製代碼
Channel.writer()
更簡單,由於咱們只能在發送數據包時運行 goroutine 並分配緩衝區:
// 當咱們須要時啓動 writer goroutine
func (ch *Channel) Send(p Packet) {
if c.noWriterYet() {
go ch.writer()
}
ch.send <- p
}
複製代碼
須要注意的是,當操做系統在
write()
調用上返回EAGAIN
時,咱們不處理這種狀況。咱們依靠 Go runtime 來處理這種狀況,由於這種狀況在服務器上不多見。然而,若是有必要,它能夠以與reader()
相同的方式處理。
當從 ch.send
(一個或幾個)讀取傳出數據包後,writer 將完成其操做並釋放 goroutine 的內存和發送緩衝區的內存。
完美!咱們經過去除兩個運行的 goroutine 中的內存消耗和 I/O 緩衝區的內存消耗節省了 48 GB。
大量鏈接不只僅涉及到內存消耗高的問題。在開發服務時,咱們遇到了反覆出現的競態條件和 self-DDoS 形成的死鎖。
例如,若是因爲某種緣由咱們忽然沒法處理 ping/pong
消息,可是空閒鏈接的處理程序繼續關閉這樣的鏈接(假設鏈接被破壞,沒有提供數據),客戶端每隔 N 秒失去鏈接並嘗試再次鏈接而不是等待事件。
被鎖或超載的服務器中止服務,若是它以前的負載均衡器(例如,nginx)將請求傳遞給下一個服務器實例,這將是不錯的。
此外,不管服務器負載如何,若是全部客戶端忽然(多是因爲錯誤緣由)向咱們發送數據包,以前的 48 GB 內存的消耗將不可避免,由於須要爲每一個鏈接分配 goroutine 和緩衝區。
上面的狀況,咱們可使用 goroutine 池限制同時處理的數據包數量。下面是這種池的簡單實現:
// goroutine 池的簡單實現
package gopool
func New(size int) *Pool {
return &Pool{
work: make(chan func()), sem: make(chan struct{}, size), } } func (p *Pool) Schedule(task func()) error {
select {
case p.work <- task:
case p.sem <- struct{}{}:
go p.worker(task)
}
}
func (p *Pool) worker(task func()) {
defer func() { <-p.sem }
for {
task()
task = <-p.work
}
}
複製代碼
如今咱們的 netpoll 代碼以下:
// 處理 goroutine 池中的輪詢事件。
pool := gopool.New(128)
poller.Start(conn, netpoll.EventRead, func() {
// 咱們在全部 worker 被佔用時阻塞 poller
pool.Schedule(func() {
Receive(ch)
})
})
複製代碼
如今咱們不只在套接字中有可讀數據時讀取,並且還能夠佔用池中的空閒的 goroutine。
一樣,咱們修改 Send()
:
// 複用 writing goroutine
pool := gopool.New(128)
func (ch *Channel) Send(p Packet) {
if c.noWriterYet() {
pool.Schedule(ch.writer)
}
ch.send <- p
}
複製代碼
取代 go ch.writer()
,咱們想寫一個複用的 goroutines。所以,對於擁有 N
個 goroutines 的池,咱們能夠保證同時處理 N
個請求而且在 N + 1
的時候, 咱們不會分配 N + 1
個緩衝區。 goroutine 池還容許咱們限制新鏈接的 Accept()
和 Upgrade()
,並避免大多數的 DDoS 攻擊。
如前所述,客戶端使用 HTTP Upgrade 切換到 WebSocket 協議。這就是 WebSocket 協議的樣子:
## HTTP Upgrade 示例
GET /ws HTTP/1.1
Host: mail.ru
Connection: Upgrade
Sec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA==
Sec-Websocket-Version: 13
Upgrade: websocket
HTTP/1.1 101 Switching Protocols
Connection: Upgrade
Sec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4=
Upgrade: websocket
複製代碼
也就是說,在咱們的例子中,須要 HTTP 請求及其 Header 用於切換到 WebSocket 協議。這些知識以及 http.Request
中存儲的內容代表,爲了優化,咱們須要在處理 HTTP 請求時放棄沒必要要的內存分配和內存複製,並棄用 net/http
庫。
例如,
http.Request
有一個與 Header 具備相同名稱的字段,這個字段用於將數據從鏈接中複製出來填充請求頭。想象一下,該字段須要消耗多少額外內存,例如碰到比較大的 Cookie 頭。
不幸的是,在咱們優化的時候全部存在的庫都是使用標準的 net/http
庫進行升級。並且,(兩個)庫都不能使用上述的讀寫優化方案。爲了採用這些優化方案,咱們須要用一個比較低級的 API 來處理 WebSocket。要重用緩衝區,咱們須要把協議函數變成這樣:
func ReadFrame(io.Reader) (Frame, error) func WriteFrame(io.Writer, Frame) error 複製代碼
若是有一個這種 API 的庫,咱們能夠按下面的方式從鏈接中讀取數據包(數據包的寫入也同樣):
// 預期的 WebSocket 實現API
// getReadBuf, putReadBuf 用來複用 *bufio.Reader (with sync.Pool for example).
func getReadBuf(io.Reader) *bufio.Reader func putReadBuf(*bufio.Reader) // 當 conn 中的數據可讀取時,readPacket 被調用 func readPacket(conn io.Reader) error {
buf := getReadBuf()
defer putReadBuf(buf)
buf.Reset(conn)
frame, _ := ReadFrame(buf)
parsePacket(frame.Payload)
//...
}
複製代碼
簡單來講,咱們須要本身的 WebSocket 庫。
在乎識形態上,編寫 ws
庫是爲了避免將其協議操做邏輯強加給用戶。全部讀寫方法都實現了標準的 io.Reader 和 io.Writer 接口,這樣就可使用或不使用緩衝或任何其餘 I/O 。
除了來自標準庫 net/http
的升級請求以外,ws
還支持零拷貝升級,升級請求的處理以及切換到 WebSocket 無需分配內存或複製內存。ws.Upgrade()
接受 io.ReadWriter
(net.Conn
實現了此接口)。換句話說,咱們可使用標準的 net.Listen()
將接收到的鏈接從 ln.Accept()
轉移給 ws.Upgrade()
。該庫使得能夠複製任何請求數據以供應用程序使用(例如,Cookie
用來驗證會話)。
下面是升級請求的基準測試結果:標準庫 net/http
的服務與用零拷貝升級的 net.Listen()
:
BenchmarkUpgradeHTTP 5156 ns/op 8576 B/op 9 allocs/op
BenchmarkUpgradeTCP 973 ns/op 0 B/op 0 allocs/op
複製代碼
切換到 ws
和零拷貝升級爲咱們節省了另外的 24 GB 內存 - 在 net/http
處理請求時爲 I/O 緩衝區分配的空間。
咱們總結一下這些優化。
net/http
不是處理升級到 WebSocket 的最快方法。解決方案:在裸 TCP 鏈接上使用內存零拷貝升級。服務的代碼看起來以下所示:
// WebSocket 服務器示例,包含 netpoll,goroutine 池和內存零拷貝的升級。
import (
"net"
"github.com/gobwas/ws"
)
ln, _ := net.Listen("tcp", ":8080")
for {
// 嘗試在空閒池的 worker 內的接收傳入的鏈接。若是超過 1ms 沒有空閒 worker,則稍後再試。這有助於防止 self-ddos 或耗盡服務器資源的狀況。
err := pool.ScheduleTimeout(time.Millisecond, func() {
conn := ln.Accept()
_ = ws.Upgrade(conn)
// 使用 Channel 結構體包裝 WebSocket 鏈接
// 將幫助咱們處理應用包
ch := NewChannel(conn)
// 等待鏈接傳入字節
poller.Start(conn, netpoll.EventRead, func() {
// 不要超過資源限制
pool.Schedule(func() {
// 讀取並處理傳入的包
ch.Recevie()
})
})
})
if err != nil {
time.Sleep(time.Millisecond)
}
}
複製代碼
過早優化是編程中全部邪惡(或至少大部分)的根源。 -- Donald Knuth
固然,上述優化是和需求相關的,但並不是全部狀況下都是如此。例如,若是空閒資源(內存,CPU)和線上鏈接數之間的比率比較高,則優化可能沒有意義。可是,經過了解優化的位置和內容,咱們會受益不淺。
感謝你的關注!
via: www.freecodecamp.org/news/millio…
做者:Sergey Kamardin 譯者:咔嘰咔嘰 校對:polaris1119