go 客戶端服務端通訊

client.gogit

package main import ( "bufio"
    "encoding/json"
    "fmt"
    "hash/crc32"
    "math/rand"
    "net"
    "os"
    // "sync"
    "time" ) //數據包類型
const ( HEART_BEAT_PACKET = 0x00 REPORT_PACKET = 0x01 ) //默認的服務器地址
var ( server = "127.0.0.1:8080" ) //數據包
type Packet struct { PacketType byte PacketContent []byte } //心跳包
type HeartPacket struct { Version string `json:"version"` Timestamp int64 `json:"timestamp"` } //數據包
type ReportPacket struct { Content string `json:"content"` Rand int    `json:"rand"` Timestamp int64 `json:"timestamp"` } //註冊
type RegisterReq struct { PERAESKey string `json:"PERAESKey"` VIN string `json:"VIN"` T_Box_SN string `json:"T_Box_SN"` IMSI string `json:"IMSI"` rollNumber string `json:"rollNumber"` } //客戶端對象
type TcpClient struct { connection *net.TCPConn hawkServer *net.TCPAddr stopChan chan struct{} } func main() { //拿到服務器地址信息
    hawkServer, err := net.ResolveTCPAddr("tcp", server) if err != nil { fmt.Printf("hawk server [%s] resolve error: [%s]", server, err.Error()) os.Exit(1) } //鏈接服務器
    connection, err := net.DialTCP("tcp", nil, hawkServer) if err != nil { fmt.Printf("connect to hawk server error: [%s]", err.Error()) os.Exit(1) } client := &TcpClient{ connection: connection, hawkServer: hawkServer, stopChan: make(chan struct{}), } //啓動接收
 go client.receivePackets() //發送心跳的goroutine
    /*go func() { heartBeatTick := time.Tick(2 * time.Second) for { select { case <-heartBeatTick: client.sendHeartPacket() case <-client.stopChan: return } } }()*/

    //測試用的,開300個goroutine每秒發送一個包 // for i := 0; i < 1; i++ {
 go func() { sendTimer := time.After(5 * time.Second) for { select { case <-sendTimer: client.sendReportPacket() sendTimer = time.After(1 * time.Second) case <-client.stopChan: return } } }() // } //等待退出
    <-client.stopChan } // 接收數據包
func (client *TcpClient) receivePackets() { reader := bufio.NewReader(client.connection) for { //承接上面說的服務器端的偷懶,我這裏讀也只是以\n爲界限來讀區分包
        msg, err := reader.ReadString('\n') if err != nil { //在這裏也請處理若是服務器關閉時的異常
 close(client.stopChan) break } fmt.Print(msg) } } //發送數據包 //仔細看代碼其實這裏作了兩次json的序列化,有一次實際上是不須要的
func (client *TcpClient) sendReportPacket() { registPacket := RegisterReq{ PERAESKey: "123456", VIN: "abcdef", T_Box_SN: "abcdef123456", IMSI: "IMSI", rollNumber: getRandString(), /*Content: getRandString(), Timestamp: time.Now().Unix(), Rand: rand.Int(),*/ } fmt.Println("registPacket:", registPacket) packetBytes, err := json.Marshal(registPacket) //返回值是字節數組byte[],
    if err != nil { fmt.Println(err.Error()) } //這一次其實能夠不須要,在封包的地方把類型和數據傳進去便可
    /*packet := Packet{ PacketType: REPORT_PACKET, PacketContent: packetBytes, } sendBytes, err := json.Marshal(packet) if err != nil { fmt.Println(err.Error()) }*/
    //發送
 client.connection.Write(EnPackSendData(packetBytes)) // fmt.Println("EnPackSendData(packetBytes):%v", EnPackSendData(packetBytes)) // fmt.Println("Send metric data success!")
} //使用的協議與服務器端保持一致
func EnPackSendData(sendBytes []byte) []byte { packetLength := len(sendBytes) + 8 result := make([]byte, packetLength) result[0] = 0xFF result[1] = 0xFF result[2] = byte(uint16(len(sendBytes)) >> 8) //除以2的8次方,byte是0-255,
    result[3] = byte(uint16(len(sendBytes)) & 0xFF) copy(result[4:], sendBytes) sendCrc := crc32.ChecksumIEEE(sendBytes) result[packetLength-4] = byte(sendCrc >> 24) result[packetLength-3] = byte(sendCrc >> 16 & 0xFF) result[packetLength-2] = 0xFF result[packetLength-1] = 0xFE fmt.Println(result) return result } //發送心跳包,與發送數據包同樣
func (client *TcpClient) sendHeartPacket() { heartPacket := HeartPacket{ Version: "1.0", Timestamp: time.Now().Unix(), } packetBytes, err := json.Marshal(heartPacket) if err != nil { fmt.Println(err.Error()) } packet := Packet{ PacketType: HEART_BEAT_PACKET, PacketContent: packetBytes, } sendBytes, err := json.Marshal(packet) if err != nil { fmt.Println(err.Error()) } client.connection.Write(EnPackSendData(sendBytes)) fmt.Println("Send heartbeat data success!") } //拿一串隨機字符
func getRandString() string { // length := rand.Intn(10)
    strBytes := make([]byte, 10) for i := 0; i < 10; i++ { strBytes[i] = byte(rand.Intn(26) + 97) } return string(strBytes) } /*做者:getyouyou 連接:https://www.jianshu.com/p/dbc62a879081 來源:簡書 著做權歸做者全部。商業轉載請聯繫做者得到受權,非商業轉載請註明出處。*/

server.gogithub

package main import ( "bufio"
    "encoding/json"
    "fmt"
    "hash/crc32"
    "io"
    "net"
    "os" ) //數據包的類型
const ( HEART_BEAT_PACKET = 0x00 REPORT_PACKET = 0x01 ) var ( server = "127.0.0.1:8080" ) //這裏是包的結構體,實際上是能夠不須要的
type Packet struct { PacketType byte PacketContent []byte } //註冊
type RegisterReq struct { PERAESKey string `json:"PERAESKey"` VIN string `json:"VIN"` T_Box_SN string `json:"T_Box_SN"` IMSI string `json:"IMSI"` rollNumber string `json:"rollNumber"` } //心跳包,這裏用了json來序列化,也能夠用github上的gogo/protobuf包 //具體見(https://github.com/gogo/protobuf)
type HeartPacket struct { Version string `json:"version"` Timestamp int64 `json:"timestamp"` } //正式上傳的數據包
type ReportPacket struct { Content string `json:"content"` Rand int    `json:"rand"` Timestamp int64 `json:"timestamp"` } //與服務器相關的資源都放在這裏面
type TcpServer struct { listener *net.TCPListener hawkServer *net.TCPAddr } func main() { //相似於初始化套接字,綁定端口
    hawkServer, err := net.ResolveTCPAddr("tcp", server) checkErr(err) //偵聽
    listen, err := net.ListenTCP("tcp", hawkServer) checkErr(err) //記得關閉
 defer listen.Close() tcpServer := &TcpServer{ listener: listen, hawkServer: hawkServer, } fmt.Println("start server successful......") //開始接收請求
    for { conn, err := tcpServer.listener.Accept() fmt.Println("accept tcp client %s", conn.RemoteAddr().String()) checkErr(err) // 每次創建一個鏈接就放到單獨的協程內作處理
 go Handle(conn) } } //處理函數,這是一個狀態機 //根據數據包來作解析 //數據包的格式爲|0xFF|0xFF|len(高)|len(低)|Data|CRC高16位|0xFF|0xFE //其中len爲data的長度,實際長度爲len(高)*256+len(低) //CRC爲32位CRC,取了最高16位共2Bytes //0xFF|0xFF和0xFF|0xFE相似於前導碼
func Handle(conn net.Conn) { // close connection before exit
 defer conn.Close() // 狀態機狀態
    state := 0x00
    // 數據包長度
    length := uint16(0) // crc校驗和
    crc16 := uint16(0) var recvBuffer []byte
    // 遊標
    cursor := uint16(0) bufferReader := bufio.NewReader(conn) //狀態機處理數據
    for { recvByte, err := bufferReader.ReadByte() //recvByte是每次讀到的字節
        if err != nil { //這裏由於作了心跳,因此就沒有加deadline時間,若是客戶端斷開鏈接 //這裏ReadByte方法返回一個io.EOF的錯誤,具體可考慮文檔
            /*Handle方法在一個死循環中使用了一個無阻塞的buff來讀取套接字中的數據, 所以當客戶端主動關閉鏈接時,若是不對這個io.EOF進行處理,會致使這個goroutine空轉, 瘋狂吃cpu,在這裏io.EOF的處理很是重要:)*/
            if err == io.EOF { fmt.Printf("client %s is close!\n", conn.RemoteAddr().String()) } //在這裏直接退出goroutine,關閉由defer操做完成
            return } //進入狀態機,根據不一樣的狀態來處理
        switch state { case 0x00: if recvByte == 0xFF { state = 0x01
                //初始化狀態機
                recvBuffer = nil length = 0 crc16 = 0 } else { state = 0x00 } break
        case 0x01: if recvByte == 0xFF { state = 0x02 } else { state = 0x00 } break
        case 0x02: length += uint16(recvByte) * 256      //length此次是發送數據的長度
            fmt.Println("0x02,length:%d", length) //0
            state = 0x03
            break
        case 0x03: length += uint16(recvByte) fmt.Println("0x03,length:%d", length) //77 // 一次申請緩存,初始化遊標,準備讀數據
            recvBuffer = make([]byte, length) cursor = 0 state = 0x04
            break
        case 0x04: //不斷地在這個狀態下讀數據,直到知足長度爲止
            recvBuffer[cursor] = recvByte cursor++
            if cursor == length { state = 0x05 } break
        case 0x05: crc16 += uint16(recvByte) * 256 //crc32編碼
            state = 0x06
            break
        case 0x06: crc16 += uint16(recvByte) state = 0x07
            break
        case 0x07: if recvByte == 0xFF { state = 0x08 } else { state = 0x00 } case 0x08: if recvByte == 0xFE { //執行數據包校驗
                if (crc32.ChecksumIEEE(recvBuffer)>>16)&0xFFFF == uint32(crc16) { var packet RegisterReq //把拿到的數據反序列化出來
                    json.Unmarshal(recvBuffer, &packet) //新開協程處理數據
                    go processRecvData(&packet, conn) } else { fmt.Println("丟棄數據!") } } //狀態機歸位,接收下一個包
            state = 0x00 } } } //在這裏處理收到的包,就和通常的邏輯同樣了,根據類型進行不一樣的處理,因人而異 //我這裏處理了心跳和一個上報數據包 //服務器往客戶端的數據包很簡單地以\n換行結束了,偷了一個懶:),正常狀況下也可根據本身的協議來封裝好 //而後在客戶端寫一個狀態來處理
func processRecvData(packet *RegisterReq, conn net.Conn) { // switch packet.PacketType { // case HEART_BEAT_PACKET: // var beatPacket HeartPacket // json.Unmarshal(packet.PacketContent, &beatPacket)
    fmt.Printf("recieve heat beat from [%s] ,data is [%v]\n", conn.RemoteAddr().String(), packet) conn.Write([]byte("heartBeat\n")) return
    // case REPORT_PACKET: // var reportPacket ReportPacket // json.Unmarshal(packet.PacketContent, &reportPacket) // fmt.Printf("recieve report data from [%s] ,data is [%v]\n", conn.RemoteAddr().String(), reportPacket) // conn.Write([]byte("Report data has recive\n")) // return // }
} //處理錯誤,根據實際狀況選擇這樣處理,仍是在函數調以後不一樣的地方不一樣處理
func checkErr(err error) { if err != nil { fmt.Println(err) os.Exit(-1) } } /*做者:getyouyou 連接:https://www.jianshu.com/p/dbc62a879081 來源:簡書 著做權歸做者全部。商業轉載請聯繫做者得到受權,非商業轉載請註明出處。*/
相關文章
相關標籤/搜索