gev
是一個輕量、快速的基於 Reactor 模式的非阻塞 TCP 網絡庫,支持自定義協議,輕鬆快速搭建高性能服務器。git
TCP 自己就是面向流的協議,就是一串沒有界限的數據。因此本質上來講 TCP 粘包是一個僞命題。github
TCP 底層並不關心上層業務數據,會套接字緩衝區的實際狀況進行包的劃分,一個完整的業務數據可能會被拆分紅屢次進行發送,也可能會將多個小的業務數據封裝成一個大的數據包發送(Nagle算法)。web
gev 經過回調函數 OnMessage
通知用戶數據到來,回調函數中會將用戶數據緩衝區(ringbuffer)經過參數傳遞過來。算法
用戶經過對 ringbuffer 操做,來進行數據解包,獲取到完整用戶數據後再進行業務操做。這樣又一個明顯的缺點,就是會讓業務操做和自定義協議解析代碼堆在一塊兒。服務器
因此,最近對 gev 進行了一次較大改動,主要是爲了可以以插件的形式支持各類自定義的數據協議,讓使用者能夠便捷處理 TCP 粘包問題,專一於業務邏輯。websocket
作法以下,定義一個接口 Protocol網絡
// Protocol 自定義協議編解碼接口
type Protocol interface {
UnPacket(c *Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte)
Packet(c *Connection, data []byte) []byte
}
複製代碼
用戶只需實現這個接口,並註冊到 server 中,當客戶端數據到來時,gev 會首先調用 UnPacket 方法,若是緩衝區中的數據足夠組成一幀,則將數據解包,並返回真正的用戶數據,而後在回調 OnMessage 函數並將數據經過參數傳遞。併發
下面,咱們實現一個簡單的自定義協議插件,來啓動一個 Server :socket
| 數據長度 n | payload |
| 4字節 | n 字節 |
複製代碼
// protocol.go
package main
import (
"encoding/binary"
"github.com/Allenxuxu/gev/connection"
"github.com/Allenxuxu/ringbuffer"
"github.com/gobwas/pool/pbytes"
)
const exampleHeaderLen = 4
type ExampleProtocol struct{}
func (d *ExampleProtocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte) {
if buffer.VirtualLength() > exampleHeaderLen {
buf := pbytes.GetLen(exampleHeaderLen)
defer pbytes.Put(buf)
_, _ = buffer.VirtualRead(buf)
dataLen := binary.BigEndian.Uint32(buf)
if buffer.VirtualLength() >= int(dataLen) {
ret := make([]byte, dataLen)
_, _ = buffer.VirtualRead(ret)
buffer.VirtualFlush()
return nil, ret
} else {
buffer.VirtualRevert()
}
}
return nil, nil
}
func (d *ExampleProtocol) Packet(c *connection.Connection, data []byte) []byte {
dataLen := len(data)
ret := make([]byte, exampleHeaderLen+dataLen)
binary.BigEndian.PutUint32(ret, uint32(dataLen))
copy(ret[4:], data)
return ret
}
複製代碼
// server.go
package main
import (
"flag"
"log"
"strconv"
"github.com/Allenxuxu/gev"
"github.com/Allenxuxu/gev/connection"
)
type example struct{}
func (s *example) OnConnect(c *connection.Connection) {
log.Println(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
log.Println("OnMessage:", data)
out = data
return
}
func (s *example) OnClose(c *connection.Connection) {
log.Println("OnClose")
}
func main() {
handler := new(example)
var port int
var loops int
flag.IntVar(&port, "port", 1833, "server port")
flag.IntVar(&loops, "loops", -1, "num loops")
flag.Parse()
s, err := gev.NewServer(handler,
gev.Address(":"+strconv.Itoa(port)),
gev.NumLoops(loops),
gev.Protocol(&ExampleProtocol{}))
if err != nil {
panic(err)
}
log.Println("server start")
s.Start()
}
複製代碼
當回調 OnMessage
函數的時候,會經過參數傳遞已經拆好包的用戶數據。
當咱們須要使用其餘協議時,僅僅須要實現一個 Protocol 插件,而後只要 gev.NewServer
時指定便可:
gev.NewServer(handler, gev.NumLoops(2), gev.Protocol(&XXXProtocol{}))
複製代碼
得益於 Protocol Plugins 模式的引進,我能夠將 WebSocket 的實現作成一個插件(WebSocket 協議構建在 TCP 之上),獨立於 gev 以外。
package websocket
import (
"log"
"github.com/Allenxuxu/gev/connection"
"github.com/Allenxuxu/gev/plugins/websocket/ws"
"github.com/Allenxuxu/ringbuffer"
)
// Protocol websocket
type Protocol struct {
upgrade *ws.Upgrader
}
// New 建立 websocket Protocol
func New(u *ws.Upgrader) *Protocol {
return &Protocol{upgrade: u}
}
// UnPacket 解析 websocket 協議,返回 header ,payload
func (p *Protocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuffer) (ctx interface{}, out []byte) {
upgraded := c.Context()
if upgraded == nil {
var err error
out, _, err = p.upgrade.Upgrade(buffer)
if err != nil {
log.Println("Websocket Upgrade :", err)
return
}
c.SetContext(true)
} else {
header, err := ws.VirtualReadHeader(buffer)
if err != nil {
log.Println(err)
return
}
if buffer.VirtualLength() >= int(header.Length) {
buffer.VirtualFlush()
payload := make([]byte, int(header.Length))
_, _ = buffer.Read(payload)
if header.Masked {
ws.Cipher(payload, header.Mask, 0)
}
ctx = &header
out = payload
} else {
buffer.VirtualRevert()
}
}
return
}
// Packet 直接返回
func (p *Protocol) Packet(c *connection.Connection, data []byte) []byte {
return data
}
複製代碼
具體的實現,能夠到倉庫的 plugins/websocket 查看。