[ gev ] Go 語言優雅處理 TCP 「粘包」

github.com/Allenxuxu/g…html

gev 是一個輕量、快速的基於 Reactor 模式的非阻塞 TCP 網絡庫,支持自定義協議,輕鬆快速搭建高性能服務器。git

TCP 爲何會粘包

TCP 自己就是面向流的協議,就是一串沒有界限的數據。因此本質上來講 TCP 粘包是一個僞命題。github

TCP 底層並不關心上層業務數據,會套接字緩衝區的實際狀況進行包的劃分,一個完整的業務數據可能會被拆分紅屢次進行發送,也可能會將多個小的業務數據封裝成一個大的數據包發送(Nagle算法)。web

gev 如何優雅處理

gev 經過回調函數 OnMessage 通知用戶數據到來,回調函數中會將用戶數據緩衝區(ringbuffer)經過參數傳遞過來。算法

用戶經過對 ringbuffer 操做,來進行數據解包,獲取到完整用戶數據後再進行業務操做。這樣又一個明顯的缺點,就是會讓業務操做和自定義協議解析代碼堆在一塊兒。服務器

因此,最近對 gev 進行了一次較大改動,主要是爲了可以以插件的形式支持各類自定義的數據協議,讓使用者能夠便捷處理 TCP 粘包問題,專一於業務邏輯。websocket

protocol.png

作法以下,定義一個接口 Protocol網絡

// Protocol 自定義協議編解碼接口
type Protocol interface {
	UnPacket(c *Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte)
	Packet(c *Connection, data []byte) []byte
}
複製代碼

用戶只需實現這個接口,並註冊到 server 中,當客戶端數據到來時,gev 會首先調用 UnPacket 方法,若是緩衝區中的數據足夠組成一幀,則將數據解包,並返回真正的用戶數據,而後在回調 OnMessage 函數並將數據經過參數傳遞。併發

下面,咱們實現一個簡單的自定義協議插件,來啓動一個 Server :socket

| 數據長度 n |  payload |
|  4字節    |  n 字節   |
複製代碼
// protocol.go
package main

import (
	"encoding/binary"
	"github.com/Allenxuxu/gev/connection"
	"github.com/Allenxuxu/ringbuffer"
	"github.com/gobwas/pool/pbytes"
)

const exampleHeaderLen = 4

type ExampleProtocol struct{}

func (d *ExampleProtocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte) {
	if buffer.VirtualLength() > exampleHeaderLen {
		buf := pbytes.GetLen(exampleHeaderLen)
		defer pbytes.Put(buf)
		_, _ = buffer.VirtualRead(buf)
		dataLen := binary.BigEndian.Uint32(buf)

		if buffer.VirtualLength() >= int(dataLen) {
			ret := make([]byte, dataLen)
			_, _ = buffer.VirtualRead(ret)

			buffer.VirtualFlush()
			return nil, ret
		} else {
			buffer.VirtualRevert()
		}
	}
	return nil, nil
}

func (d *ExampleProtocol) Packet(c *connection.Connection, data []byte) []byte {
	dataLen := len(data)
	ret := make([]byte, exampleHeaderLen+dataLen)
	binary.BigEndian.PutUint32(ret, uint32(dataLen))
	copy(ret[4:], data)
	return ret
}
複製代碼
// server.go
package main

import (
	"flag"
	"log"
	"strconv"

	"github.com/Allenxuxu/gev"
	"github.com/Allenxuxu/gev/connection"
)

type example struct{}

func (s *example) OnConnect(c *connection.Connection) {
	log.Println(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
	log.Println("OnMessage:", data)
	out = data
	return
}

func (s *example) OnClose(c *connection.Connection) {
	log.Println("OnClose")
}

func main() {
	handler := new(example)
	var port int
	var loops int

	flag.IntVar(&port, "port", 1833, "server port")
	flag.IntVar(&loops, "loops", -1, "num loops")
	flag.Parse()

	s, err := gev.NewServer(handler,
		gev.Address(":"+strconv.Itoa(port)),
		gev.NumLoops(loops),
		gev.Protocol(&ExampleProtocol{}))
	if err != nil {
		panic(err)
	}

	log.Println("server start")
	s.Start()
}
複製代碼

完整代碼地址

當回調 OnMessage 函數的時候,會經過參數傳遞已經拆好包的用戶數據。

當咱們須要使用其餘協議時,僅僅須要實現一個 Protocol 插件,而後只要 gev.NewServer 時指定便可:

gev.NewServer(handler, gev.NumLoops(2), gev.Protocol(&XXXProtocol{}))
複製代碼

基於 Protocol Plugins 模式爲 gev 實現 WebSocket 插件

得益於 Protocol Plugins 模式的引進,我能夠將 WebSocket 的實現作成一個插件(WebSocket 協議構建在 TCP 之上),獨立於 gev 以外。

package websocket

import (
	"log"

	"github.com/Allenxuxu/gev/connection"
	"github.com/Allenxuxu/gev/plugins/websocket/ws"
	"github.com/Allenxuxu/ringbuffer"
)

// Protocol websocket
type Protocol struct {
	upgrade *ws.Upgrader
}

// New 建立 websocket Protocol
func New(u *ws.Upgrader) *Protocol {
	return &Protocol{upgrade: u}
}

// UnPacket 解析 websocket 協議,返回 header ,payload
func (p *Protocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuffer) (ctx interface{}, out []byte) {
	upgraded := c.Context()
	if upgraded == nil {
		var err error
		out, _, err = p.upgrade.Upgrade(buffer)
		if err != nil {
			log.Println("Websocket Upgrade :", err)
			return
		}
		c.SetContext(true)
	} else {
		header, err := ws.VirtualReadHeader(buffer)
		if err != nil {
			log.Println(err)
			return
		}
		if buffer.VirtualLength() >= int(header.Length) {
			buffer.VirtualFlush()

			payload := make([]byte, int(header.Length))
			_, _ = buffer.Read(payload)

			if header.Masked {
				ws.Cipher(payload, header.Mask, 0)
			}

			ctx = &header
			out = payload
		} else {
			buffer.VirtualRevert()
		}
	}
	return
}

// Packet 直接返回
func (p *Protocol) Packet(c *connection.Connection, data []byte) []byte {
	return data
}
複製代碼

具體的實現,能夠到倉庫的 plugins/websocket 查看。

相關文章

項目地址

github.com/Allenxuxu/g…

相關文章
相關標籤/搜索