從零打造一套移動IM系統(一) 玩轉二進制協議及protobuf

本文默認您已具有如下知識:

  • iOS開發的基礎知識,以及swift語法
  • node.js的基礎語法
  • TCP基礎及IM相關基礎知識

經過本文您將能收穫

  • 在iOS上用底層socket,服務器創建tcp鏈接並通信
  • 如何設計一個二進制通信協議
  • swift當中如何操做二進制網絡數據流,會涉及一些unsafe類型及C指針的操做
  • node.js中如何操做網絡數據流
  • protobuf 3.0在客戶端及服務器的實際運用,以及在兩個平臺中的編譯、序列化和反序列化
  • 心跳保活機制

1 基於socket的TCP通信

1.1 iOS端實現

ios端採用開源庫CocoaAsyncSocket,進行TCP通信。html

private let delegateQueue = DispatchQueue.global()
private lazy var socket :GCDAsyncSocket = {
    let socket = GCDAsyncSocket(delegate: self, delegateQueue: delegateQueue)
    return socket
}()
複製代碼

創建鏈接node

socket.delegate   = self
try socket.connect(toHost: host, onPort: port)
socket.readData(withTimeout:-1, tag: 0)
複製代碼

發送數據mysql

self.socket.write(data, withTimeout:5 * 60, tag: 0)
複製代碼

鏈接成功監聽ios

func socket(_ sock: GCDAsyncSocket, didConnectToHost host: String, port: UInt16) {
    print("socket \(sock) didConnectToHost \(host) port \(port)")
}
複製代碼

鏈接失敗監聽git

func socketDidDisconnect(_ sock: GCDAsyncSocket, withError err: Error?) {
    print("socketDidDisconnect \(sock) withError \(err)")
}
複製代碼

發送數據github

self.socket.write(data, withTimeout:5 * 60, tag: 0)
複製代碼

接收數據sql

func socket(_ sock: GCDAsyncSocket, didRead data: Data, withTag tag: Int) {
	let msgArr = SocketDataBuilder.shared().parse(data: data)
	for (seq,socketData) in msgArr {
	    switch (socketData){
	    case .request(let comom):
	        handle(common: comom, seq: seq);
	    case .ping:
	        handlePing(seq: seq);
	    case .message(let msg):
	        handle(message: msg, seq: seq);
	    case .notification(let noti):
	        handle(notification: noti, seq: seq);
	    }
	}
	sock.readData(withTimeout: -1, tag: 0)
}
複製代碼

值得注意的是,在接收到數據時,或者讀超時的時候須要從新調用readData(withTimeout:tag)方法 否則下個數據包到來時,不會再走這個方法。因爲咱們還有透傳體系,須要不間斷的監聽,因此timeout是-1無窮大數據庫

1.2 node.js 服務器實現

var HOST = '0.0.0.0';
var PORT = 6969;
var server = net.createServer();
server.listen(PORT, HOST);
server.on('connection', function(sock) {

    logger.info('CONNECTED: ' + sock.remoteAddress +':'+ sock.remotePort);
    
    // 接收數據
    sock.on('data', function(data) {
    	
    }
    
    // 斷開鏈接
    sock.on('close', function(data) {      
        logger.info('CLOSED: ' + sock.remoteAddress + ' ' + sock.remotePort);
    });
    
}
複製代碼

2 TCP部分的通信協議的二進制頭部設計

對於一個TCP數據包,它包含一個二進制頭部,和一個包體。包體是protubuf序列化後的數據流。包頭一共8個字節,從第一個字節開始依次有如下含義npm

  • margic_num : 1個字節 UInt8 魔法數字,一個特定的數字,服務器與各個終端統一。主要做用是解析時判斷包有沒有損壞。若是解析出的值與設定值不一樣,則說明包損壞或者在拆包或解析過程當中發生異常
  • sequence : 4個字節 UInt32 序列號,用於區分不一樣的包,客戶端維護,服務器根據不一樣的鏈接session+sequence區分不一樣的包
  • type : 1個字節 UInt8 包含內容的類型:1心跳包 2普通數據請求 3聊天消息 4推送 根據不一樣的類型路由到下級業務模塊 着四種類型基本包含了一個IM系統主要的業務模塊
  • length : 2個字節 UInt16 包體的長度 經過它獲取當前數據包的包體,以進行下一步解析

3 二進制頭部解析

3.1 iOS中二進制頭部處理

先定義一個數據結構來處理頭部信息swift

struct BaseHeader {
    private let margic_num : UInt8 = 0b10000001
    var seq   : UInt32
    var type  : UInt8
    var length : UInt16
}
複製代碼

3.1.1 swift的序列化方法

序列化方法

func toData()->Data{
    var marg = margic_num.bigEndian
    var seq  = self.seq.bigEndian
    var type =  self.type.bigEndian
    var length = self.length.bigEndian
    
    let mp = UnsafeBufferPointer(start: &marg, count: 1)
    let sp = UnsafeBufferPointer(start: &seq, count: 1)
    let tp = UnsafeBufferPointer(start: &type, count: 1)
    let lp = UnsafeBufferPointer(start: &length, count: 1)

    var data = Data(mp)
    data.append(sp)
    data.append(tp)
    data.append(lp)
    
    return data
}
複製代碼

代碼比較簡單,值得注意的是兩點

  • 基本數據類型轉化爲Data必須先轉化爲UnsafePointer,再轉化爲UnsafeBufferPointer,再轉化爲預期的Data數據。最後用data進行拼接
  • 再轉化爲UnsafePointer以前,必須作Big-Endian轉化,swift中對應bigEndian計算屬性。對於這個問題能夠參考這篇文章

若是你對位運算比較熟悉,也能夠採用下面這種方式。將原始數據轉化爲UInt8數組再進行拼接

var buf = [UInt8]()
append(margic_num, bufArr: &buf)
append(self.seq, bufArr: &buf)
append(self.type, bufArr: &buf)
append(self.length, bufArr: &buf)
let result = Data(buf)

func append<T:FixedWidthInteger>(_ value:T, bufArr:inout [UInt8]){
    let size = MemoryLayout<T>.size
    for i in 1...size {
        let distance = (size - i) * 8;
        let sub  = (value >> distance) & 0xff
        let value = UInt8(sub & 0xff)
        bufArr.append(value)
    }
}
複製代碼

3.1.1 swift的反序列化方法

對應的反序列化以下。

init?(data:Data){
    if data.count < header_length {
        return nil
    }
    var headerData  = Data(data)
    let tag : UInt8 = headerData[0..<1].withUnsafeBytes{ $0.pointee }
    if tag != margic_num {
        return nil
    }
    let seq : UInt32 = headerData[1..<5].withUnsafeBytes({$0.pointee })
    let typeValue : UInt8  =  headerData[5..<6].withUnsafeBytes({$0.pointee })
    let length : UInt16    =  headerData[6..<8].withUnsafeBytes({$0.pointee })
    
    self.seq  = seq.bigEndian
    self.type  = typeValue.bigEndian
    self.length = length.bigEndian
  
}
複製代碼

Data結構體提供了很方便的下標索引方法

public subscript(bounds: Range<Data.Index>) -> Data
複製代碼

獲得的新的Data與原來的數據共用一塊內存,只是改變指針的偏移。也就是說,相比原始數據,表明存儲結構的_backing:_DataStorage屬性指向的是同一個對象,只是 _sliceRange:Range<Data.Index>不一樣

public func withUnsafeBytes<ResultType, ContentType>(_ body: (UnsafePointer<ContentType>) throws -> ResultType) rethrows -> ResultType
複製代碼

利用這個帶範型的方法,能夠很容易,對data裏面數據進行處理,提取出所須要類型的數據

一樣的你也能夠在UInt8數組上作文章

var index  : Int  = 0
let margic : UInt8   = getValue(data: headerData, index: &index)
let seqv   : UInt32  = getValue(data: headerData, index: &index)
let typev  : UInt8   = getValue(data: headerData, index: &index)
let len    : UInt16  = getValue(data: headerData, index: &index)

func getValue<T:FixedWidthInteger>(data:Data,index:inout Int)->T{
    let size = MemoryLayout<T>.size
    var value:T = 0
    for i in index..<(index+size) {
        let distance = size - (i - index) - 1
        value  += T(data[i]) << distance
    }
    index += size
    return value
}
複製代碼

3.2 node.js中二進制頭部處理

下面是反序列化代碼,data是tcp接收到的數據

var header = data.slice(0,8)
var margic = header.readUInt8(0)
var seq    = header.readUInt32BE(1)
var type   = header.readUInt8(5)
var lenth  = header.readUInt16BE(6)
複製代碼

序列化方法以下,body爲須要發送的包體數據

var margic = 129;
var lenth  = body.length;
var header = new Buffer(8);
header.writeUInt8(margic);
header.writeUInt32BE(seq,1);
header.writeUInt8(type,5);
header.writeInt16BE(lenth,6);
複製代碼

node.js中,從socket中讀取或寫入的數據,都是Buffer。調用對應的read或write的方法,很容易從二進制讀取或填充所需數據類型的數據。值得注意的是,除了UInt8以外,其他方法都有BE後綴,這也和以前所說的Big-Endian有關

4 Protobuf的運用,及數據包體的解析

4.1 .proto文件的編寫

採用最新的protobuf3.0的語法,去除了required、optional關鍵字,枚舉類型統一從0開始。

根據從請求頭返回的type字段,除了心跳包包體爲空外,其餘類型包體分別解析爲響應的protobuf類型。

其中type=2,被解析爲Common類型,對應的是普通數據請求。實際上這部分業務應該做爲普通HTTP請求處理。這裏統一納入TCP通信自定義協議體系中。

syntax = "proto3";
import  "error.proto";

enum Common_method {
    common_method_user = 0;
    common_method_message = 1;
    common_method_friend   = 2;
    common_method_p2p_connect = 3;
    common_method_respond   = 4;
}

message Common {
    Common_method method = 1;
    bytes body = 2;
}

message CommonRespon {
    bool isSuc = 1;
    bytes respon = 2;
    ErrorMsg error  = 3;
}
複製代碼
syntax = "proto3";


enum error_type {
    comom_err  = 0;
    invalid_params = 2;
}

message ErrorMsg {
    error_type type = 1;
    string msg = 2;
}
複製代碼

Comon根據不一樣的type,他的body又能夠被解析爲對應的字類型數據,如signin_requestlogin_requestUser_info_request等等

syntax = "proto3"
import "base.proto";

enum User_cmd {
	User_cmd_sign_in = 0;
	User_cmd_login   = 2;
	User_cmd_logout  = 3;
	User_cmd_user_info = 4;
}

message User_msg {
	User_cmd cmd = 1;
	bytes body  = 2;
}

message signin_request {
	 string nick_name = 1;
	 string pwd = 2;
}

message login_request {
	string nick_name = 1; // 用戶名
	string pwd = 2;       // 密碼
	string ip = 3;        // 設備當前的ip
	int32  port = 4;      // 設備綁定的端口
	string device_name = 5; // iOS/Andoird
	string device_id = 6;   // 設備標識符
	string version  = 7;    // 軟件版本
}

message logout_request {
	 int32 uid = 1;
}

// 註冊成功 必須進行登陸 統一返回uid token
message sigin_response {
	uint32 uid   = 1;
	string token = 2;
}

message login_response {
	 uint32 uid   = 1;
	 string token = 2;
}

// 查詢用戶資料
message User_info_request {
	uint32 uid = 1; // 所要查詢用戶的uid
}

message User_info_response {
	User_info user_info = 1;
}
複製代碼

type = 3時,對應的是Base_msg類型,對應正兒八經的即時通信業務模塊

type=4時,Notification_msg類型,對應推送模塊,及服務器向客戶端發送的通知

因爲代碼量還算比較大,就不貼了。你們本身看源碼

4.2 iOS上protobuf的使用

4.2.1 準備工做

將protobuf-swift庫導入工程中,在Podfile中加上

pod 'ProtocolBuffers-Swift', '4.0.1'
複製代碼

電腦上安裝protobuf

brew install protobuf
複製代碼

cd到.proto文件目錄,編譯出swift平臺代碼

protoc *.proto --swift_out="./"
複製代碼

將獲得的*.pb.swift文件導入到項目工程當中

4.2.1 序列化方法

以登陸請求的包體構建爲例爲例子

let loginReq = LoginRequest().setPwd(pwd).setNickName(user)
let bodyData = try body.build().data()
let user  =  try UserMsg.Builder().setCmd(.userCmdLogin).setBody(bodyData).build().data()
let comom =  try Common.Builder().setMethod(.commonMethodUser).setBody(user).build()

let data = comom.data()
複製代碼
4.2.2 反序列化方法

4.2.1 示例代碼對應的反序列化,應該是這樣子的

do {
	let comon =  try Common.parseFrom(data:data)
	switch comon.type {
		case .commonMethodUser:
			let user  =  try UserMsg.parseFrom(data:comon.body)
			switch user.cmd {
				case .userCmdLogin:
					let login = try LoginRequest.parseFrom(data:user.body)
				...
			}
		...
	}
}catch let err {
	print(err)
}
複製代碼

4.2.3 完整數據包的構建及解析

不管序列化仍是反序列化,都要用到一箇中間橋架的結構體

enum RTPMessageGenerates {

    case ping
    case request(Common?)
    case message(Message?)
    case notification(NotificationMsg?)

    init?(type:UInt8,data:Data){
        switch type {
        case 1:
            self = .ping
        case 2:
            let comon =  try? Common.parseFrom(data:data)
            self = .request(comon)
        case 3:
            let msg = Message(data: data)
            self = .message(msg)
        case 4:
            let noti = try? NotificationMsg.parseFrom(data: data)
            self = .notification(noti)
        default:
            return nil
        }
    }

    var type : UInt8 {
        switch self {
        case .ping:
            return 1
        case .request(_):
            return 2
        case .message(_):
            return 3
        case .notification(_):
            return 4
        }
    }

    var data : Data? {
        switch self {
        case .ping:
            return Data()
        case .request(let req):
            return  req?.data()
        case .message(let msg):
            return  msg?.data
        case .notification(let noti):
            return noti?.data()
        }
    }

}
複製代碼

構建過程以下

func rtpData(seq:UInt32,body:RTPMessageGenerates)->Data?{
    guard let bodyData = body.data  else  { return nil }
    let header = BaseHeader(seq: seq, type: body.type, length: UInt16(bodyData.count)).toData()
    let data = header + bodyData
    return data
}
複製代碼

解析過程略微複雜點,須要進行拆包處理

func parse(data:Data)->[(seq:UInt32,body:RTPMessageGenerates)]{
    var curIndex : UInt16 = 0
    var temp = [(seq:UInt32,body:RTPMessageGenerates)]()
    while curIndex < data.count{
        if curIndex+header_length > data.count {
            break
        }
        let headData = data[curIndex..<curIndex+header_length]
        if let header = BaseHeader(data: headData) {
            let body = data[8..<8+header.length]
            if let msg = RTPMessageGenerates(type: header.type,data: body){
                temp.append((header.seq,msg))
            }
            curIndex += header.length + 8
        }else{
            break;
        }
    }
    return temp
}
複製代碼

4.3 node.js服務器protobuf的使用

4.3.1 準備工做

環境配置,包含數據庫及日誌庫環境

npm install log4js
npm install mysql
npm install google-protobuf
sudo npm install protobufjs
pm2 install pm2-intercom
複製代碼

編譯.proto文件

protoc --js_out=import_style=commonjs,binary:. *.proto
複製代碼

將*_pb.js文件導入項目工程當中

4.3.2 probubuf的解析

須要導入對應模塊文件

var builder = require("../impb/common_pb"),
    Common = builder.Common;
var MethodType = builder.Common_method;
複製代碼
try {
    var datas  = Uint8Array(body);
    var common = new Common.deserializeBinary(datas);
    var method = common.getMethod();
    var body   = common.getBody();
}catch (err){
    console.log(err);
}
複製代碼

須要留意如下幾點:

  • socket返回的數據都是Buffer類型的,而protobuf所生成的js文件,相應方法接收的是Uint8Array類型數據,須要作一下轉化
  • 訪問屬性變量時不能用點語法,要用對應的get、set方法
  • 某些字符作了相應轉化,轉化爲平臺的風格。_都被轉化爲駝峯命名法;枚舉類型全部字符都被轉化爲了大寫
4.3.3 protobuf的序列化
var comon = new Common();
comon.setMethod(MethodType.COMMON_METHOD_RESPOND);
comon.setBody(respond.serializeBinary());

var resData = comon.serializeBinary();
複製代碼

主要是serializeBinary()方法的使用。注意賦值的時候要用set方法。獲得的是Uint8Array,若是要進行下一步操做須要轉化爲Buffer類型

4.3.4 完整數據包的解析與構建

完整數據包解析

var tempData = new Buffer(data)
	while (tempData.length){
	    var header = data.slice(0,8)
	    var margic = header.readUInt8(0)
	    var seq    = header.readUInt32BE(1)
	    var type   = header.readUInt8(5)
	    var lenth  = header.readUInt16BE(6)
	    var body =   tempData.slice(8,lenth+8)
	    var lest = tempData.length - ( lenth + 8 )
	    logger.info("Receive data :" + "margic=" + margic + " seq=" + seq + " type=" + type + " legth=" + lenth )
	    var bodyData  = new  Uint8Array(body)
	    routeWithReiceData(type,header,bodyData)
	    if (lest.length > 0){
	        logger.info("Has one more data packetge");
	        tempData = data.slice(lenth+8,lest)
	    }else {
	        tempData = lest;
	        break
	    }
	}
}
複製代碼

數據包的構建

var margic = 129;
var lenth  = body.length;
var header = new Buffer(8);
header.writeUInt8(margic);
header.writeUInt32BE(seq,1);
header.writeUInt8(type,5);
header.writeInt16BE(lenth,6);
var buf = Buffer(body);
var result = Buffer.concat([header,buf])
複製代碼

5 心跳保活機制

因爲存在NAT超時,咱們必要在長時間沒有數據交互時,主動發送數據包,來維持TCP鏈接。根據一些博客資料,NAT的超時時間最低的在5分鐘左右。關於這些,能夠參考這篇文章

咱們設計的心跳間隔是3分鐘。心跳由客戶端控制,服務器只負責再收到心跳包以後原樣返回。小心跳包的響應超時的時候,或重試三次,三次都失敗證實與服務器鏈接中斷。主動斷開鏈接再嘗試從新鏈接。

心跳包大小是8個字節,即一個只有包頭,包體爲空的tcp數據包。

客戶端代碼以下

extension  SocketManager {
    private var  pingDuration : TimeInterval  {  return 60 * 3 }

    static var reTryCount = 0;
    private func sentPing(){
        sentPing { (isSuc) in
            if isSuc {
                SocketManager.reTryCount = 0;
            }else{
                if SocketManager.reTryCount < 3 {
                    self.sentPing()
                    SocketManager.reTryCount += 1
                }else{
                    // 三次失敗 鏈接已經斷開 斷開再重連
                    self.disconnect()
                    self.reconect(){_ in }
                }
            }
        }
    }
    
    private func sentPing(completion:@escaping (Bool)->()){
        self.sent(msg: .ping, completion: SocketManager.SentMsgCompletion.ping(completion))
    }
    
    func stopPing(){
        self.pingTimer?.invalidate()
        self.pingTimer  = nil;
    }
    
    func startPing(){
        sentPing()
        if pingTimer == nil {
            pingTimer  = Timer(timeInterval:pingDuration , repeats: true, block: {[weak self] (timer) in
                self?.sentPing()
            })
        }
    }
    
}
複製代碼

服務器代碼:

function routeWithReiceData(type,header,body) {
    switch (type){
        case 1:
            // 收到心跳包原樣返回 客戶端控制發送頻率 必要時斷開重連
            sock.write(data)
            break;
    }
 }
複製代碼

附上源碼項目地址:

客戶端代碼

服務器代碼

相關文章
相關標籤/搜索