一個TCP長鏈接設備管理後臺工程(五)

前篇

一個TCP長鏈接設備管理後臺工程(一)
一個TCP長鏈接設備管理後臺工程(二)
一個TCP長鏈接設備管理後臺工程(三)
一個TCP長鏈接設備管理後臺工程(四)git

Github倉庫地址github

幀過濾器

幀過濾器的做用就是,從接收到的buff中,過濾出有效的完整jtt808數據包。因爲是tcp通信,那麼這其中不可避免的會涉及到數據包的兩個常規處理:拆包和粘包。segmentfault

拆包和粘包的簡要說明:api

假設客戶端分別發送了兩個數據包D1和D2給服務端,因爲服務端一次讀取到的字節數是不肯定的,故可能存在如下4種狀況。

(1)服務端分兩次讀取到了兩個獨立的數據包,分別是D1和D2,沒有粘包和拆包;

(2)服務端一次接收到了兩個數據包,D1和D2粘合在一塊兒,被稱爲TCP粘包;

(3)服務端分兩次讀取到了兩個數據包,第一次讀取到了完整的D1包和D2包的部份內容,第二次讀取到了D2包的剩餘內容,這被稱爲TCP拆包;

(4)服務端分兩次讀取到了兩個數據包,第一次讀取到了D1包的部份內容D1_1,第二次讀取到了D1包的剩餘內容D1_2和D2包的整包。

若是此時服務端TCP接收滑窗很是小,而數據包D1和D2比較大,頗有可能會發生第五種可能,即服務端分屢次才能將D1和D2包接收徹底,期間發生屢次拆包。

拆包與粘包的說明網上資料不少,此處不作過多說明。可是咱們設計出來的過濾器,要可以正常應對拆包和粘包的狀況。app

首先,咱們根據jtt808協議定義咱們的數據包結構:tcp

type MultiField struct {
    MsgSum   uint16
    MsgIndex uint16
}

type Header struct {
    MID       uint16
    Attr      uint16
    Version   uint8
    PhoneNum  string
    SeqNum    uint16
    MutilFlag MultiField
}

type Message struct {
    HEADER Header
    BODY   []byte
}

因爲Attr實際上是由多個位域字段組成,因此咱們再定義三個函數:函數

func (h *Header) IsMulti() bool {
    if ((h.Attr >> 12) & 0x0001) > 0 {
        return true
    }
    return false
}

//BodyLen is a function for get body len
func (h *Header) BodyLen() int {
    return int(h.Attr & 0x03ff)
}

//MakeAttr is generate attr
func MakeAttr(verFlag byte, mut bool, enc byte, lens uint16) uint16 {
    attr := lens & 0x03FF

    if verFlag > 0 {
        attr = attr & 0x4000
    }

    if mut {
        attr = attr & 0x2000
    }

    encMask := (uint16(enc) & 0x0007) << 10
    return attr + encMask
}

因爲要考慮拆包和粘包問題,因此咱們的過濾器須要可以同時分析多包數據,可是基本單元函數分析一幀數據,因此咱們先實現一幀數據的過濾器:filterSigleui

咱們想要的過濾器原型是以下的一個函數:設計

func filterSigle(data []byte) (Message, int, error)

該函數接收一個存有從tcp端接收的數據流切片,須要返回咱們解析出來的Message、解析事後消耗的字節數,錯誤信息。code

很明顯,這個返回的消耗字節數就是爲了應對拆包和粘包用的。

咱們知道jtt808協議是以0x7e開始和結尾的,咱們定義一個常量:

const (
    ProtoHeader byte = 0x7e
)

filterSigle的第一個邏輯就是要識別幀頭和幀尾了:

var usedLen int = 0

startindex := bytes.IndexByte(data, ProtoHeader)
if startindex >= 0 {
    usedLen = startindex + 1
    endindex := bytes.IndexByte(data[usedLen:], ProtoHeader)
    if endindex >= 0 {
        endindex = endindex + usedLen
    }
}

此處的幀頭和幀尾索引均相對於data的起始字節而言。理想的狀況下,在startindex和endindex之間的數據就是咱們須要解析的數據,因此以後的解析都是對這部分數據進行分析。咱們對這部分的邏輯單獨用一個函數來處理,這個函數主要完成三個邏輯:轉義、校驗檢查和解析

func frameParser(data []byte) (Message, error) {

}

data入參從startindex到endindex,不包括startindex和endindex。

在轉義以前,首先判斷基本的長度。經過對幀頭固定字段分析能夠知道,消息頭部分的長度爲17或者19個字節,加上幀頭幀尾校驗位的話最小就是17+3=20個字節,鑑於BODY部分可能爲空,因此整個幀最小長度應該爲20字節:

if len(data)+2 < 17+3 {
    return Message{}, fmt.Errorf("header is too short")
}

轉義比較簡單,爲了代碼複用,定義成一個函數:

func Escape(data, oldBytes, newBytes []byte) []byte {
    buff := make([]byte, 0)

    var startindex int = 0

    for startindex < len(data) {
        index := bytes.Index(data[startindex:], oldBytes)
        if index >= 0 {
            buff = append(buff, data[startindex:index]...)
            buff = append(buff, newBytes...)
            startindex = index + len(oldBytes)
        } else {
            buff = append(buff, data[startindex:]...)
            startindex = len(data)
        }
    }
    return buff
}

調用:

//不包含幀頭幀尾
frameData := Escape(data[:len(data)], []byte{0x7d, 0x02}, []byte{0x7e})
frameData = Escape(frameData, []byte{0x7d, 0x01}, []byte{0x7d})

校驗就是簡單的異或校驗,從消息頭到消息體結束,即data[:len(data)-1]

func checkSum(data []byte) byte {
    var sum byte = 0
    for _, itemdata := range data {
        sum ^= itemdata
    }
    return sum
}

調用:

rawcs := checkSum(frameData[:len(frameData)-1])

if rawcs != frameData[len(frameData)-1] {
    return Message{}, fmt.Errorf("cs is not match:%d--%d", rawcs, frameData[len(frameData)-1])
}

而後就是對frameData中的具體數據進行解析了:

var usedLen int = 0
var msg Message
msg.HEADER.MID = codec.Bytes2Word(frameData[usedLen:])
usedLen = usedLen + 2
msg.HEADER.Attr = codec.Bytes2Word(frameData[usedLen:])
usedLen = usedLen + 2
msg.HEADER.Version = frameData[usedLen]
usedLen = usedLen + 1

注意usedLen要跟着實時變化。

手機號固定爲10個字節,不足的話前面會填充0,因此咱們要把前面無效的0去掉,使用bytes.TrimLeftFunc:

tempPhone := bytes.TrimLeftFunc(frameData[usedLen:usedLen+10], func(r rune) bool { return r == 0x00 })
msg.HEADER.PhoneNum = string(tempPhone)
usedLen = usedLen + 10
msg.HEADER.SeqNum = codec.Bytes2Word(frameData[usedLen:])
usedLen = usedLen + 2

同時還要對多幀的狀況進行判斷:

if msg.HEADER.IsMulti() {
    msg.HEADER.MutilFlag.MsgSum = codec.Bytes2Word(frameData[usedLen:])
    usedLen = usedLen + 2
    msg.HEADER.MutilFlag.MsgIndex = codec.Bytes2Word(frameData[usedLen:])
    usedLen = usedLen + 2
}

再次對usedLen長度判斷一下,避免超過界限:

if len(frameData) < usedLen {
    return Message{}, fmt.Errorf("flag code is too short")
}

處理到上面的地方後,接着的就是BODY部分了,直接copy對應的長度,長度爲:

len(frameData)-usedLen

邏輯以下

msg.BODY = make([]byte, len(frameData)-usedLen)
copy(msg.BODY, frameData[usedLen:len(frameData)])
usedLen = len(frameData)

return msg, nil

到此正常的流程就走完了。

調用:

msg, err := frameParser(frameData)

當返回錯誤時,返回的長度值應該爲endindex,即不包括endindex處對應的0x7e。由於這個0x7e多是後面數據的幀頭。

msg, err := frameParser(data[startindex+1 : endindex])
if err != nil {
    return Message{}, endindex, err
}

return msg, endindex + 1, nil

對於

if endindex >= 0

條件不符合的,說明沒有找到幀尾,能夠包幀頭前面的去掉了,可是幀頭和幀頭後面的數據要保留,用來跟以後的數據流拼接。

return Message{}, startindex, fmt.Errorf("can't find end flag")

對於

if startindex >= 0

條件不符合的,說明沒有找到幀頭,那就是整個幀都是無效的:

return Message{}, len(data), fmt.Errorf("can't find start flag")

這樣就實現了一個單幀的過濾器。接着咱們在單幀過濾器的基礎上來實現多幀過濾器。

咱們只須要對數據流進行單幀過濾,而後返回消耗的字節數。若是消耗了必定字節數後,還有剩餘的字節,咱們再對這些字節進行單幀過濾。依次循環,直到字節數消耗完或者發生錯誤。

全部循環結束後,咱們還須要將剩餘的字節數保留,用來跟下一次的數據流進行拼接。函數實現以下:

//Filter is proto Filter api
func Filter(data []byte) ([]Message, int, error) {
    var usedLen int = 0
    msgList := make([]Message, 0)
    var cnt int = 0
    for {
        cnt++
        if cnt > 10 {
            return []Message{}, 0, fmt.Errorf("time too much")
        }
        if usedLen >= len(data) {
            break
        }

        msg, lens, err := filterSigle(data[usedLen:])
        if err != nil {
            usedLen = usedLen + lens
            fmt.Println("err:", err)
            return msgList, usedLen, nil
        }
        usedLen = usedLen + lens
        msgList = append(msgList, msg)
    }
    return msgList, usedLen, nil
}

整個過濾器完整實現:

package proto

import (
    "bytes"
    "fmt"
    "tsp/codec"
    "tsp/utils"
)

const (
    ProtoHeader byte = 0x7e
)

type MultiField struct {
    MsgSum   uint16
    MsgIndex uint16
}

type Header struct {
    MID       uint16
    Attr      uint16
    Version   uint8
    PhoneNum  string
    SeqNum    uint16
    MutilFlag MultiField
}

func (h *Header) IsMulti() bool {
    if ((h.Attr >> 12) & 0x0001) > 0 {
        return true
    }
    return false
}

//BodyLen is a function for get body len
func (h *Header) BodyLen() int {
    return int(h.Attr & 0x03ff)
}

//MakeAttr is generate attr
func MakeAttr(verFlag byte, mut bool, enc byte, lens uint16) uint16 {
    attr := lens & 0x03FF

    if verFlag > 0 {
        attr = attr & 0x4000
    }

    if mut {
        attr = attr & 0x2000
    }

    encMask := (uint16(enc) & 0x0007) << 10
    return attr + encMask
}

//Message is struct for message for jtt808
type Message struct {
    HEADER Header
    BODY   []byte
}

func Version() string {
    return "1.0.0"
}

func Name() string {
    return "jtt808"
}

//Filter is proto Filter api
func Filter(data []byte) ([]Message, int, error) {
    var usedLen int = 0
    msgList := make([]Message, 0)
    var cnt int = 0
    for {
        //添加一個計數器,防止數據異常致使死循環
        cnt++
        if cnt > 10 {
            cnt = 0
            return []Message{}, 0, fmt.Errorf("time too much")
        }
        if usedLen >= len(data) {
            break
        }

        msg, lens, err := filterSigle(data[usedLen:])
        if err != nil {
            usedLen = usedLen + lens
            fmt.Println("err:", err)
            return msgList, usedLen, nil
        }
        usedLen = usedLen + lens
        msgList = append(msgList, msg)
    }
    return msgList, usedLen, nil
}

func filterSigle(data []byte) (Message, int, error) {
    var usedLen int = 0

    startindex := bytes.IndexByte(data, ProtoHeader)
    if startindex >= 0 {
        usedLen = startindex + 1
        endindex := bytes.IndexByte(data[usedLen:], ProtoHeader)
        if endindex >= 0 {
            endindex = endindex + usedLen

            msg, err := frameParser(data[startindex+1 : endindex])
            if err != nil {
                return Message{}, endindex, err
            }

            return msg, endindex + 1, nil
        }

        return Message{}, startindex, fmt.Errorf("can't find end flag")
    }
    return Message{}, len(data), fmt.Errorf("can't find start flag")
}

func Escape(data, oldBytes, newBytes []byte) []byte {
    buff := make([]byte, 0)

    var startindex int = 0

    for startindex < len(data) {
        index := bytes.Index(data[startindex:], oldBytes)
        if index >= 0 {
            buff = append(buff, data[startindex:index]...)
            buff = append(buff, newBytes...)
            startindex = index + len(oldBytes)
        } else {
            buff = append(buff, data[startindex:]...)
            startindex = len(data)
        }
    }
    return buff
}

func frameParser(data []byte) (Message, error) {
    if len(data)+2 < 17+3 {
        return Message{}, fmt.Errorf("header is too short")
    }

    //不包含幀頭幀尾
    frameData := Escape(data[:len(data)], []byte{0x7d, 0x02}, []byte{0x7e})
    frameData = Escape(frameData, []byte{0x7d, 0x01}, []byte{0x7d})

    //以後的操做都是基於frameData來處理
    rawcs := checkSum(frameData[:len(frameData)-1])

    if rawcs != frameData[len(frameData)-1] {
        return Message{}, fmt.Errorf("cs is not match:%d--%d", rawcs, frameData[len(frameData)-1])
    }

    var usedLen int = 0
    var msg Message
    msg.HEADER.MID = codec.Bytes2Word(frameData[usedLen:])
    usedLen = usedLen + 2
    msg.HEADER.Attr = codec.Bytes2Word(frameData[usedLen:])
    usedLen = usedLen + 2
    msg.HEADER.Version = frameData[usedLen]
    usedLen = usedLen + 1

    tempPhone := bytes.TrimLeftFunc(frameData[usedLen:usedLen+10], func(r rune) bool { return r == 0x00 })
    msg.HEADER.PhoneNum = string(tempPhone)
    usedLen = usedLen + 10
    msg.HEADER.SeqNum = codec.Bytes2Word(frameData[usedLen:])
    usedLen = usedLen + 2

    if msg.HEADER.IsMulti() {
        msg.HEADER.MutilFlag.MsgSum = codec.Bytes2Word(frameData[usedLen:])
        usedLen = usedLen + 2
        msg.HEADER.MutilFlag.MsgIndex = codec.Bytes2Word(frameData[usedLen:])
        usedLen = usedLen + 2
    }

    if len(frameData) < usedLen {
        return Message{}, fmt.Errorf("flag code is too short")
    }

    msg.BODY = make([]byte, len(frameData)-usedLen)
    copy(msg.BODY, frameData[usedLen:len(frameData)])
    usedLen = len(frameData)

    return msg, nil
}
相關文章
相關標籤/搜索