Protobuf應用普遍,尤爲做爲網絡通信協議最爲廣泛。本文將詳細描述幾個讓人眼前一亮的protobuf協議設計,對準備應用或已經應用protobuf的開發者會有所啓發,甚至能夠直接拿過去用。 這裏描述的協議設計被用於生產環境的即時通信、埋點數據採集、消息推送、redis和mysql數據代理。mysql
Bwar從2013年開始應用protobuf,2014年設計了用於mysql數據代理的protobuf協議,2015年設計了用於即時通信的protobuf協議。高性能C++ IoC網絡框架Nebula https://github.com/Bwar/Nebula把這幾個protobuf協議設計應用到了極致。nginx
本協議設計於2015年,用於一個生產環境的IM和埋點數據採集及實時分析,2016年又延伸發展了基於protobuf3的版本並用於開源網絡框架Nebula。基於protobuf2和protobuf3的有較少差異,這裏分開講解兩個版本的協議設計。git
2015年尚無protobuf3的release版本,protobuf2版本的fixed32類型是固定佔用4個字節的,很是適合用於網絡通信協議設計。Bwar設計用於IM系統的協議包括兩個protobuf message:MsgHead和MsgBody,協議定義以下:github
syntax = "proto2";
/** * @brief 消息頭 */
message MsgHead
{
required fixed32 cmd = 1 ; ///< 命令字(壓縮加密算法佔高位1字節)
required fixed32 msgbody_len = 2; ///< 消息體長度(單個消息體長度不能超過65535即8KB)
required fixed32 seq = 3; ///< 序列號
}
/** * @brief 消息體 * @note 消息體主體是body,全部業務邏輯內容均放在body裏。session_id和session用於接入層路由, * 二者只須要填充一個便可,首選session_id,當session_id用整型沒法表達時才使用session。 */
message MsgBody
{
required bytes body = 1; ///< 消息體主體
optional uint32 session_id = 2; ///< 會話ID(單聊消息爲接收者uid,我的信息修改成uid,羣聊消息爲groupid,羣管理爲groupid)
optional string session = 3; ///< 會話ID(當session_id用整型沒法表達時使用)
optional bytes additional = 4; ///< 接入層附加的數據(客戶端無須理會)
}
解析收到的字節流時先解固定長度(15字節)的MsgHead(protobuf3.0以後的版本必須在cmd、msgbody_len、seq均不爲0的狀況下才是15字節),再經過MsgHead裏的msgbody_len判斷消息體是否接收完畢,若接收完畢則調用MsgBody.Parse()解析。MsgBody裏的設計在下一節詳細說明。web
MsgHead在實際的項目應用中對應下面的消息頭並能夠相互轉換:redis
#pragma pack(1)
/** * @brief 與客戶端通訊消息頭 */
struct tagClientMsgHead {
unsigned char version; ///< 協議版本號(1字節)
unsigned char encript; ///< 壓縮加密算法(1字節)
unsigned short cmd; ///< 命令字/功能號(2字節)
unsigned short checksum; ///< 校驗碼(2字節)
unsigned int body_len; ///< 消息體長度(4字節)
unsigned int seq; ///< 序列號(4字節)
};
#pragma pack()
轉換代碼以下:算法
E_CODEC_STATUS ClientMsgCodec::Encode(const MsgHead& oMsgHead, const MsgBody& oMsgBody, loss::CBuffer* pBuff)
{
tagClientMsgHead stClientMsgHead;
stClientMsgHead.version = 1; // version暫時無用
stClientMsgHead.encript = (unsigned char)(oMsgHead.cmd() >> 24);
stClientMsgHead.cmd = htons((unsigned short)(gc_uiCmdBit & oMsgHead.cmd()));
stClientMsgHead.body_len = htonl((unsigned int)oMsgHead.msgbody_len());
stClientMsgHead.seq = htonl(oMsgHead.seq());
stClientMsgHead.checksum = htons((unsigned short)stClientMsgHead.checksum);
...
}
E_CODEC_STATUS ClientMsgCodec::Decode(loss::CBuffer* pBuff, MsgHead& oMsgHead, MsgBody& oMsgBody)
{
LOG4_TRACE("%s() pBuff->ReadableBytes() = %u", __FUNCTION__, pBuff->ReadableBytes());
size_t uiHeadSize = sizeof(tagClientMsgHead);
if (pBuff->ReadableBytes() >= uiHeadSize)
{
tagClientMsgHead stClientMsgHead;
int iReadIdx = pBuff->GetReadIndex();
pBuff->Read(&stClientMsgHead, uiHeadSize);
stClientMsgHead.cmd = ntohs(stClientMsgHead.cmd);
stClientMsgHead.body_len = ntohl(stClientMsgHead.body_len);
stClientMsgHead.seq = ntohl(stClientMsgHead.seq);
stClientMsgHead.checksum = ntohs(stClientMsgHead.checksum);
LOG4_TRACE("cmd %u, seq %u, len %u, pBuff->ReadableBytes() %u",
stClientMsgHead.cmd, stClientMsgHead.seq, stClientMsgHead.body_len,
pBuff->ReadableBytes());
oMsgHead.set_cmd(((unsigned int)stClientMsgHead.encript << 24) | stClientMsgHead.cmd);
oMsgHead.set_msgbody_len(stClientMsgHead.body_len);
oMsgHead.set_seq(stClientMsgHead.seq);
...
}
}
<br/>sql
protobuf3版的MsgHead和MsgBody從IM業務應用實踐中發展而來,同時知足了埋點數據採集、實時計算、消息推送等業務須要,更爲通用。正因其通用性和高擴展性,採用proactor模型的IoC網絡框架Nebula纔會選用這個協議,經過這個協議,框架層將網絡通訊工做從業務應用中徹底獨立出來,基於Nebula框架的應用開發者甚至能夠不懂網絡編程也能開發出高併發的分佈式服務。數據庫
MsgHead和MsgBody的protobuf定義以下:apache
syntax = "proto3";
// import "google/protobuf/any.proto";
/** * @brief 消息頭 * @note MsgHead爲固定15字節的頭部,當MsgHead不等於15字節時,消息發送將出錯。 * 在proto2版本,MsgHead爲15字節老是成立,cmd、seq、len都是required; * 但proto3版本,MsgHead爲15字節則必需要求cmd、seq、len均不等於0,不然沒法正確進行收發編解碼。 */
message MsgHead
{
fixed32 cmd = 1; ///< 命令字(壓縮加密算法佔高位1字節)
fixed32 seq = 2; ///< 序列號
sfixed32 len = 3; ///< 消息體長度
}
/** * @brief 消息體 * @note 消息體主體是data,全部業務邏輯內容均放在data裏。req_target是請求目標,用於 * 服務端接入路由,請求包必須填充。rsp_result是響應結果,響應包必須填充。 */
message MsgBody
{
oneof msg_type
{
Request req_target = 1; ///< 請求目標(請求包必須填充)
Response rsp_result = 2; ///< 響應結果(響應包必須填充)
}
bytes data = 3; ///< 消息體主體
bytes add_on = 4; ///< 服務端接入層附加在請求包的數據(客戶端無須理會)
string trace_id = 5; ///< for log trace
message Request
{
uint32 route_id = 1; ///< 路由ID
string route = 2; ///< 路由ID(當route_id用整型沒法表達時使用)
}
message Response
{
int32 code = 1; ///< 錯誤碼
bytes msg = 2; ///< 錯誤信息
}
}
MsgBody的data字段存儲消息主體,任何自定義數據都可以二進制數據流方式寫入到data。
msg_type用於標識該消息是請求仍是響應(全部網絡數據流均可歸爲請求或響應),若是是請求,則能夠選擇性填充Request裏的route_id或route,若是填充了,則框架層無須解析應用層協議(也沒法解析)就能自動根據路由ID轉發,而無須應用層解開data裏的內容再根據自定義邏輯轉發。若是是響應,則定義了統一的錯誤標準,也爲業務無關的錯誤處理提供方便。
add_on是附在長鏈接上的業務數據,框架並不會解析但會在每次轉發消息時帶上,能夠爲應用提供極其方便且強大的功能。好比,IM用戶登陸時客戶端只發送用戶ID和密碼到服務端,服務端在登陸校驗經過後,將該用戶的暱稱、頭像等信息經過框架提供的方法SetClientData()將數據附在服務端接入層該用戶對應的長鏈接Channel上,以後全部從該鏈接過來的請求都會由框架層自動填充add_on字段,服務端的其餘邏輯服務器只從data中獲得自定義業務邏輯(好比聊天消息)數據,卻能夠從add_on中獲得這個發送用戶的信息。add_on的設計簡化了應用開發邏輯,並下降了客戶端與服務端傳輸的數據量。
trace_id用於分佈式日誌跟蹤。分佈式服務的錯誤定位是至關麻煩的,Nebula分佈式服務解決方案提供了日誌跟蹤功能,協議裏的trace_id字段的設計使得Nebula框架能夠在徹底不增長應用開發者額外工做的狀況下(正常調用LOG4_INFO寫日誌而無須額外工做)實現全部標記着同一trace_id的日誌發送到指定一臺日誌服務器,定義錯誤時跟單體服務那樣登陸一臺服務器查看日誌便可。好比,IM用戶發送一條消息失敗,在用戶發送的消息到達服務端接入層時就被打上了trace_id標記,這個id會一直傳遞到邏輯層、存儲層等,哪一個環節發生了錯誤均可以從消息的發送、轉發、處理路徑上查到。
MsgHead和MsgBody的編解碼實現見Nebula框架的https://github.com/Bwar/Nebula/blob/master/src/codec/CodecProto.cpp。
上面的講解的是protobuf應用於TCP數據流通訊,接下來將描述protobuf在http通訊上的應用。
在Web服務中一般會用Nginx作接入層的反向代理,通過Nginx轉發到後續業務邏輯層的tomcat、apache或nginx上,接入層和業務邏輯層至少作了兩次http協議解析,http協議是文本協議,傳輸數據量大解析速度慢。Nebula框架不是一個web服務器,但支持http協議,在只需提供http接口的應用場景(好比徹底先後端分離的後端)基於Nebula的單進程http服務端併發量就能夠是tomcat的數十倍。這必定程度上得益於Nebula框架在http通訊上protobuf的應用。Nebula框架解析http文本協議並轉化爲HttpMsg在服務內部處理,應用開發者填充HttpMsg,接入層將響應的HttpMsg轉換成http文本協議發回給請求方,無論服務端內部通過多少次中轉,始終只有一次http協議的decode和一次http協議的encode。
syntax = "proto3";
message HttpMsg
{
int32 type = 1; ///< http_parser_type 請求或響應
int32 http_major = 2; ///< http大版本號
int32 http_minor = 3; ///< http小版本號
int32 content_length = 4; ///< 內容長度
int32 method = 5; ///< 請求方法
int32 status_code = 6; ///< 響應狀態碼
int32 encoding = 7; ///< 傳輸編碼(只在encode時使用,當 Transfer-Encoding: chunked 時,用於標識chunk序號,0表示第一個chunk,依次遞增)
string url = 8; ///< 地址
map<string, string> headers = 9; ///< http頭域
bytes body = 10; ///< 消息體(當 Transfer-Encoding: chunked 時,只存儲一個chunk)
map<string, string> params = 11; ///< GET方法參數,POST方法表單提交的參數
Upgrade upgrade = 12; ///< 升級協議
float keep_alive = 13; ///< keep alive time
string path = 14; ///< Http Decode時從url中解析出來,不須要人爲填充(encode時不須要填)
bool is_decoding = 15; ///< 是否正在解碼(true 正在解碼, false 未解碼或已完成解碼)
message Upgrade
{
bool is_upgrade = 1;
string protocol = 2;
}
}
HttpMsg的編解碼實現見Nebula框架的https://github.com/Bwar/Nebula/blob/master/src/codec/CodecHttp.cpp。
若是上面描述的protobuf在網絡通訊上應用算不錯的話,那如下將protobuf用於數據代理上的協議設計則絕對是讓人眼前一亮。
有的公司規定web服務不得直接訪問MySQL數據庫,甚至不容許在web邏輯層拼接SQL語句。若是有這種出於安全性考慮而作的限制,在web邏輯層後面再增長一層業務邏輯層成本未免過高了,那麼解決辦法應該是增長一層業務邏輯無關的代理服務層。這個代理服務層不是簡單的轉發SQL語句這麼簡單,由於web邏輯層可能不容許拼接SQL,由此引出咱們這個用於數據庫代理的protobuf協議設計。這個協議是將SQL邏輯融入整個協議之中,數據庫代理層接收並解析這個協議後生成SQL語句或用binding方式到數據庫去執行。數據庫代理層只有協議解析和轉化邏輯,無其餘任何業務邏輯,業務邏輯還在web邏輯層,區別只在於從拼接SQL變成了填充protobuf協議。
syntax = "proto2";
package dbagent;
/** * @brief DB Agent消息 */
message DbAgentMsg
{
enum E_TYPE
{
UNDEFINE = 0; ///< 未定義
REQ_CONNECT = 1; ///< 鏈接DB請求
RSP_CONNECT = 2; ///< 鏈接DB響應
REQ_QUERY = 3; ///< 執行SQL請求
RSP_QUERY = 4; ///< 執行SQL響應
REQ_DISCONNECT = 5; ///< 關閉鏈接請求
RSP_DISCONNECT = 6; ///< 關閉鏈接響應
RSP_RECORD = 7; ///< 結果集記錄
RSP_COMMON = 8; ///< 通用響應(當請求不能被Server所認知時會作出這個迴應)
REQ_GET_CONNECT = 9; ///< 獲取鏈接請求
RSP_GET_CONNECT = 10; ///< 獲取鏈接響應
}
required E_TYPE type = 1; ///< 消息/操做 類型
optional RequestConnect req_connect = 2; ///< 鏈接請求
optional ResponseConnect rsp_connect = 3; ///< 鏈接響應
optional RequestDisconnect req_disconnect = 4; ///< 關閉請求
optional ResponseDisconnect rsp_disconnect = 5; ///< 關閉響應
optional RequestQuery req_query = 6; ///< 執行SQL請求
optional ResponseQuery rsp_query = 7; ///< 執行SQL響應
optional ResponseRecord rsp_record = 8; ///< SELECT結果集記錄
optional ResponseCommon rsp_common = 9; ///< 通用響應
optional RequestGetConnection req_get_conn = 10; ///< 獲取鏈接請求
optional ResponseGetConnection rsp_get_conn = 11; ///< 獲取鏈接響應
}
/** * @brief 鏈接請求 */
message RequestConnect
{
required string host = 1; ///< DB所在服務器IP
required int32 port = 2; ///< DB端口
required string user = 3; ///< DB用戶名
required string password = 4; ///< DB用戶密碼
required string dbname = 5; ///< DB庫名
required string charset = 6; ///< DB字符集
}
/** * @brief 鏈接響應 */
message ResponseConnect
{
required int32 connect_id = 1; ///< 鏈接ID (鏈接失敗時,connect_id爲0)
optional int32 err_no = 2; ///< 錯誤碼 0 表示鏈接成功
optional string err_msg = 3; ///< 錯誤信息
}
/** * @brief 關閉鏈接請求 */
message RequestDisconnect
{
required int32 connect_id = 1; ///< 鏈接ID (鏈接失敗時,connect_id爲0)
}
/** * @brief 關閉鏈接響應 */
message ResponseDisconnect
{
optional int32 err_no = 2; ///< 錯誤碼 0 表示鏈接成功
optional string err_msg = 3; ///< 錯誤信息
}
/** * @brief 執行SQL請求 */
message RequestQuery
{
required E_QUERY_TYPE query_type = 1; ///< 查詢類型
required string table_name = 2; ///< 表名
repeated Field fields = 3; ///< 列類型
repeated ConditionGroup conditions= 4; ///< where條件組(由group_relation指定,若不指定則默認爲AND關係)
repeated string groupby_col = 5; ///< group by字段
repeated OrderBy orderby_col = 6; ///< order by字段
optional uint32 limit = 7; ///< 指定返回的行數的最大值 (limit 200)
optional uint32 limit_from = 8; ///< 指定返回的第一行的偏移量 (limit 100, 200)
optional ConditionGroup.E_RELATION group_relation = 9; ///< where條件組的關係,條件組之間有且只有一種關係(and或者or)
optional int32 connect_id = 10; ///< 鏈接ID,有效鏈接ID(長鏈接,當connect後屢次執行query可使用connect_id)
optional string bid = 11; ///< 業務ID,在CmdDbAgent.json配置文件中配置(短鏈接,每次執行query時鏈接,執行完後關閉鏈接)
optional string password = 12; ///< 業務密碼
enum E_QUERY_TYPE ///< 查詢類型
{
SELECT = 0; ///< select查詢
INSERT = 1; ///< insert插入
INSERT_IGNORE = 2; ///< insert ignore插入,若存在則放棄
UPDATE = 3; ///< update更新
REPLACE = 4; ///< replace覆蓋插入
DELETE = 5; ///< delete刪除
}
enum E_COL_TYPE ///< 列類型
{
STRING = 0; ///< char, varchar, text, datetime, timestamp等
INT = 1; ///< int
BIGINT = 2; ///< bigint
FLOAT = 3; ///< float
DOUBLE = 4; ///< double
}
message Field ///< 字段
{
required string col_name = 1; ///< 列名
required E_COL_TYPE col_type = 2; ///< 列類型
required bytes col_value = 3; ///< 列值
optional string col_as = 4; ///< as列名
}
message Condition ///< where條件
{
required E_RELATION relation = 1; ///< 關係(=, !=, >, <, >=, <= 等)
required E_COL_TYPE col_type = 2; ///< 列類型
required string col_name = 3; ///< 列名
repeated bytes col_values = 4; ///< 列值(當且僅當relation爲IN時值的個數大於1有效)
optional string col_name_right= 5; ///< 關係右邊列名(用於where col1=col2這種狀況)
enum E_RELATION
{
EQ = 0; ///< 等於=
NE = 1; ///< 不等於!=
GT = 2; ///< 大於>
LT = 3; ///< 小於<
GE = 4; ///< 大於等於>=
LE = 5; ///< 小於等於<=
LIKE = 6; ///< like
IN = 7; ///< in (1, 2, 3, 4, 5)
}
}
message ConditionGroup ///< where條件組合
{
required E_RELATION relation = 1; ///< 條件之間的關係,一個ConditionGroup裏的全部Condition之間有且只有一種關係(and或者or)
repeated Condition condition = 2; ///< 條件
enum E_RELATION
{
AND = 0; ///< and且
OR = 1; ///< or或
}
}
message OrderBy
{
required E_RELATION relation = 1; ///< 降序或升序
required string col_name = 2; ///< 列名
enum E_RELATION
{
ASC = 0;
DESC = 1;
}
}
}
/** * @brief 執行SQL響應 */
message ResponseQuery
{
required uint32 seq = 1; ///< 數據包序列號(SELECT結果集會分包返回,只有一個包的狀況或已到達最後一個包則seq=0xFFFFFFFF)
required int32 err_no = 2; ///< 錯誤碼,0 表示執行成功
optional string err_msg = 3; ///< 錯誤信息
optional uint64 insert_id = 4; ///< mysql_insert_id()獲取的值(視執行的SQL語句而定,不必定存在)
repeated bytes dict = 5; ///< 結果集字典(視執行的SQL語句而定,不必定存在)
}
/** * @brief SELECT語句返回結果集的一條記錄 */
message ResponseRecord
{
required uint32 seq = 1; ///< 數據包序列號(SELECT結果集會分包返回,已到達最後一個包則seq=0xFFFFFFFF)
repeated bytes field = 2; ///< 數據集記錄的字段
}
/** * @brief 常規響應 */
message ResponseCommon
{
optional int32 err_no = 1; ///< 錯誤碼 0 表示鏈接成功
optional string err_msg = 2; ///< 錯誤信息
}
/** * @brief 獲取鏈接請求 */
message RequestGetConnection
{
required string bid = 1; ///< 業務ID,在dbproxy配置文件中配置
required string password = 2; ///< 業務密碼
}
/** * @brief 獲取鏈接響應 */
message ResponseGetConnection
{
required int32 connect_id = 1; ///< 鏈接ID,有效鏈接ID,不然執行失敗
optional int32 err_no = 2; ///< 錯誤碼 0 表示鏈接成功
optional string err_msg = 3; ///< 錯誤信息
}
基於這個數據庫操做協議開發的數據庫代理層徹底解決了web邏輯層不容許直接訪問數據庫也不容許拼接SQL語句的問題,並且幾乎沒有增長開發代價。另外,基於這個協議的數據庫代理自然防止SQL注入(在代理層校驗field_name,而且mysql_escape_string(filed_value)),雖然防SQL注入應是應用層的責任,但多了數據代理這層保障也是好事。
這個協議只支持簡單SQL,不支持聯合查詢、子查詢,也不支持存儲過程,若是須要支持的話協議會更復雜。在Bwar所負責過的業務裏,基本都禁止數據庫聯合查詢之類,只把數據庫當存儲用,不把邏輯寫到SQL語句裏,因此這個協議知足大部分業務須要。
這一節只說明數據庫代理協議,下一節將從數據庫代理協議延伸並提供協議代碼講解。
大部分後臺應用只有MySQL是不夠的,每每還須要緩存,常常會用Redis來作數據緩存。用緩存意味着數據至少須要同時寫到Redis和MySQL,又或者在未命中緩存時從MySQL中讀取到的數據須要回寫到Redis,這些一般都是由業務邏輯層來作的。也有例外,Nebula提供的分佈式解決方案是由數據代理層來作的,業務邏輯層只需向數據代理層發送一個protobuf協議數據,數據代理層就會完成Redis和MySQL雙寫或緩存未命中時的自動回寫(暫且不探討數據一致性問題)。數據代理層來作這些工做是爲了減小業務邏輯層的複雜度,提升開發效率。既然是爲了提升開發效率,就得讓業務邏輯層低於原來同時操做Redis和MySQL的開發量。Nebula提供的NebulaMydis就是這樣一個讓原來同時操做Redis和MySQL的開發量(假設是2)降到1.2左右。
這個同時操做Redis和MySQL的數據代理協議以下:
syntax = "proto3";
package neb;
message Mydis
{
uint32 section_factor = 1;
RedisOperate redis_operate = 2;
DbOperate db_operate = 3;
message RedisOperate
{
bytes key_name = 1;
string redis_cmd_read = 2;
string redis_cmd_write = 3;
OPERATE_TYPE op_type = 4;
repeated Field fields = 5;
int32 key_ttl = 6;
int32 redis_structure = 7; ///< redis數據類型
int32 data_purpose = 8; ///< 數據用途
bytes hash_key = 9; ///< 可選hash key,當has_hash_key()時用hash_key來計算hash值,不然用key_name來計算hash值
enum OPERATE_TYPE
{
T_READ = 0;
T_WRITE = 1;
}
}
message DbOperate
{
E_QUERY_TYPE query_type = 1; ///< 查詢類型
string table_name = 2; ///< 表名
repeated Field fields = 3; ///< 列類型
repeated ConditionGroup conditions = 4; ///< where條件組(由group_relation指定,若不指定則默認爲AND關係)
repeated string groupby_col = 5; ///< group by字段
repeated OrderBy orderby_col = 6; ///< order by字段
ConditionGroup.E_RELATION group_relation = 7; ///< where條件組的關係,條件組之間有且只有一種關係(and或者or)
uint32 limit = 8; ///< 指定返回的行數的最大值 (limit 200)
uint32 limit_from = 9; ///< 指定返回的第一行的偏移量 (limit 100, 200)
uint32 mod_factor = 10; ///< 分表取模因子,當這個字段沒有時使用section_factor
enum E_QUERY_TYPE ///< 查詢類型
{
SELECT = 0; ///< select查詢
INSERT = 1; ///< insert插入
INSERT_IGNORE = 2; ///< insert ignore插入,若存在則放棄
UPDATE = 3; ///< update更新
REPLACE = 4; ///< replace覆蓋插入
DELETE = 5; ///< delete刪除
}
message Condition ///< where條件
{
E_RELATION relation = 1; ///< 關係(=, !=, >, <, >=, <= 等)
E_COL_TYPE col_type = 2; ///< 列類型
string col_name = 3; ///< 列名
repeated bytes col_values = 4; ///< 列值(當且僅當relation爲IN時值的個數大於1有效)
string col_name_right = 5; ///< 關係右邊列名(用於where col1=col2這種狀況)
enum E_RELATION
{
EQ = 0; ///< 等於=
NE = 1; ///< 不等於!=
GT = 2; ///< 大於>
LT = 3; ///< 小於<
GE = 4; ///< 大於等於>=
LE = 5; ///< 小於等於<=
LIKE = 6; ///< like
IN = 7; ///< in (1, 2, 3, 4, 5)
}
}
message ConditionGroup ///< where條件組合
{
E_RELATION relation = 1; ///< 條件之間的關係,一個ConditionGroup裏的全部Condition之間有且只有一種關係(and或者or)
repeated Condition condition = 2; ///< 條件
enum E_RELATION
{
AND = 0; ///< and且
OR = 1; ///< or或
}
}
message OrderBy
{
E_RELATION relation = 1; ///< 降序或升序
string col_name = 2; ///< 列名
enum E_RELATION
{
ASC = 0;
DESC = 1;
}
}
}
}
enum E_COL_TYPE ///< 列類型
{
STRING = 0; ///< char, varchar, text, datetime, timestamp等
INT = 1; ///< int
BIGINT = 2; ///< bigint
FLOAT = 3; ///< float
DOUBLE = 4; ///< double
}
message Record
{
repeated Field field_info = 1; ///< value data
}
message Field ///< 字段
{
string col_name = 1; ///< 列名
E_COL_TYPE col_type = 2; ///< 列類型
bytes col_value = 3; ///< 列值
string col_as = 4; ///< as列名
}
/** * @brief 查詢結果 * @note 適用於Redis返回和MySQL返回,當totalcount與curcount相等時代表數據已接收完畢, * 不然表示數據還沒有接收完,剩餘的數據會在後續數據包繼續返回。 */
message Result
{
int32 err_no = 1;
bytes err_msg = 2;
int32 total_count = 3;
int32 current_count = 4;
repeated Record record_data = 5;
int32 from = 6; ///< 數據來源 E_RESULT_FROM
DataLocate locate = 7; ///< 僅在DataProxy使用
enum E_RESULT_FROM
{
FROM_DB = 0;
FROM_REDIS = 1;
}
message DataLocate
{
uint32 section_from = 1;
uint32 section_to = 2; ///< 數據所在分段,section_from < MemOperate.section_factor <= section_to
uint32 hash = 3; ///< 用於作分佈的hash值(取模運算時,爲取模後的結果)
uint32 divisor = 4; ///< 取模運算的除數(一致性hash時不須要)
}
}
這個協議分了Redis和MySQL兩部分數據,看似業務邏輯層把一份數據填充了兩份並無下降多少開發量,實際上這兩部分數據有許可能是可共用的,只要提供一個填充類就能夠大幅下降協議填充開發量。爲簡化協議填充,Nebula提供了幾個類:同時填充Redis和MySQL數據、只填充Redis、只填充MySQL。
從Mydis協議的MySQL部分如何生成SQL語句請參考NebulaDbAgent,核心代碼頭文件以下:
namespace dbagent
{
const int gc_iMaxBeatTimeInterval = 30;
const int gc_iMaxColValueSize = 65535;
struct tagConnection {
CMysqlDbi* pDbi;
time_t ullBeatTime;
int iQueryPermit;
int iTimeout;
tagConnection() : pDbi(NULL), ullBeatTime(0), iQueryPermit(0), iTimeout(0)
{
}
~tagConnection()
{
if (pDbi != NULL)
{
delete pDbi;
pDbi = NULL;
}
}
};
class CmdExecSql : public neb::Cmd, public neb::DynamicCreator<CmdExecSql, int32>
{
public:
CmdExecSql(int32 iCmd);
virtual ~CmdExecSql();
virtual bool Init();
virtual bool AnyMessage( std::shared_ptr<neb::SocketChannel> pChannel, const MsgHead& oMsgHead, const MsgBody& oMsgBody);
protected:
bool GetDbConnection(const neb::Mydis& oQuery, CMysqlDbi** ppMasterDbi, CMysqlDbi** ppSlaveDbi);
bool FetchOrEstablishConnection(neb::Mydis::DbOperate::E_QUERY_TYPE eQueryType, const std::string& strMasterIdentify, const std::string& strSlaveIdentify, const neb::CJsonObject& oInstanceConf, CMysqlDbi** ppMasterDbi, CMysqlDbi** ppSlaveDbi);
std::string GetFullTableName(const std::string& strTableName, uint32 uiFactor);
int ConnectDb(const neb::CJsonObject& oInstanceConf, CMysqlDbi* pDbi, bool bIsMaster = true);
int Query(const neb::Mydis& oQuery, CMysqlDbi* pDbi);
void CheckConnection(); //檢查鏈接是否已超時
void Response(int iErrno, const std::string& strErrMsg);
bool Response(const neb::Result& oRsp);
bool CreateSql(const neb::Mydis& oQuery, CMysqlDbi* pDbi, std::string& strSql);
bool CreateSelect(const neb::Mydis& oQuery, std::string& strSql);
bool CreateInsert(const neb::Mydis& oQuery, CMysqlDbi* pDbi, std::string& strSql);
bool CreateUpdate(const neb::Mydis& oQuery, CMysqlDbi* pDbi, std::string& strSql);
bool CreateDelete(const neb::Mydis& oQuery, std::string& strSql);
bool CreateCondition(const neb::Mydis::DbOperate::Condition& oCondition, CMysqlDbi* pDbi, std::string& strCondition);
bool CreateConditionGroup(const neb::Mydis& oQuery, CMysqlDbi* pDbi, std::string& strCondition);
bool CreateGroupBy(const neb::Mydis& oQuery, std::string& strGroupBy);
bool CreateOrderBy(const neb::Mydis& oQuery, std::string& strOrderBy);
bool CreateLimit(const neb::Mydis& oQuery, std::string& strLimit);
bool CheckColName(const std::string& strColName);
private:
std::shared_ptr<neb::SocketChannel> m_pChannel;
MsgHead m_oInMsgHead;
MsgBody m_oInMsgBody;
int m_iConnectionTimeout; //空閒鏈接超時(單位秒)
char* m_szColValue; //字段值
neb::CJsonObject m_oDbConf;
uint32 m_uiSectionFrom;
uint32 m_uiSectionTo;
uint32 m_uiHash;
uint32 m_uiDivisor;
std::map<std::string, std::set<uint32> > m_mapFactorSection; //分段因子區間配置,key爲因子類型
std::map<std::string, neb::CJsonObject*> m_mapDbInstanceInfo; //數據庫配置信息key爲("%u:%u:%u", uiDataType, uiFactor, uiFactorSection)
std::map<std::string, tagConnection*> m_mapDbiPool; //數據庫鏈接池,key爲identify(如:192.168.18.22:3306)
};
} // namespace dbagent
整個mydis數據協議是如何解析如何使用,如何作Redis和MySQL的數據雙寫、緩存數據回寫等不在本文討論範圍,若有興趣能夠閱讀NebulaMydis源碼,也能夠聯繫Bwar。
Protobuf用得合適用得好能夠解決許多問題,能夠提升開發效率,也能夠提升運行效率,以上就是Bwar多年應用protobuf的小結,沒有任何藏私,文中列出的協議均可以在開源項目Nebula的這個路徑https://github.com/Bwar/Nebula/tree/master/proto找到。
開發Nebula框架目的是致力於提供一種基於C++快速構建高性能的分佈式服務。若是以爲本文對你有用,別忘了到Nebula的Github或碼雲給個star,謝謝。