在實際應用中,可能會遇到直接和C進行二進制字節流協議通訊,這時要把數據解包成python數據,若是可能,最好與C定義的結構體徹底對應上.
python中有2種方式,可處理二進制數據轉換python
Structure
直接定義結構體pack/unpack
函數組裝轉換在轉換時必定要注意==字節序==,這兩種方式都有各自的方法標誌字節序.git
ctypes中有許多C中的操做接口,如sizeof
,memmove
等,也提供近似C結構體的模擬類Structure
,BigEndianStructure
,Union
,顯然的是BigEndianStructure
是網絡字節序(大端),方便直接用於網絡傳輸,Union
和Structure
是主機序(多是大端,也多是小端,和本機有關).數組
from ctypes import * class SSHead(BigEndianStructure): _pack_ = 1 _fields_ = [ #(字段名, c類型 ) ('nTotalSize', c_uint32), ('nSourceID', c_int32), ('sourceType', c_uint8), ('destType', c_uint8), ('transType', c_uint8), ('nDestID', c_int32), ('nFlag', c_uint8), ('nOptionalLength', c_uint16), ('arrOptional', c_char * 20), ] def encode(self): return string_at(addressof(self), sizeof(self)) def decode(self, data): memmove(addressof(self), data, sizeof(self)) return len(data) # ------------------- # 使用 sshead = SSHead() sshead.nSourceID = 20 #省略其餘賦值 buf = sshead.encode() ss = SSHead() ss.decode(buf) print(ss.nSourceID)
以上就是一個簡單協議結構體定義,對應的C版本以下網絡
struct SSHead { uint32_t nTotalSize; int32_t nSourceID; uint8_t sourceType; uint8_t destType; uint8_t transType; int32_t nDestID; int8_t nFlag; uint16_t nOptionalLength; char arrOptional[20]; //簡單模擬python的打包解包 int encode(char* buf, size_t max_len) { memmove(buf, this, sizeof(this)); return 0; } int decode(char* buf, size_t len) { memmove(this, buf, len); return 0; } } // c中對應的 打包/解包流程(假設本機字節序爲大端) SSHead sshead = {0}; sshead.nSourceID = 20; char buf[1024]; sshead.encode(buf); SSHead ss = {0}; ss.decode(buf, sizeof(ss));
其中_pack_ = 1
表示1字節對齊,否則可能會被填充,致使結構體實際所佔字節數與表面上的不同.
_fields_
定義C結構體中相對應的字段名和類型,C中每種基礎類型在ctypes都有與之對應的類型,如c_uint32
對應uint32_t
,佔4個字節.數組就是後面乘以對應的長度便可,如c_uint8 * 20
.另外還支持嵌套定義結構體.在實例化後,字段名會成爲成員變量,可直接賦值.app
encode
會直接獲得該對象的二進制數據,若是不考慮字節序,則與C中相同對象的二進制數據是同樣的
decode
相反,直接解包二進制數據爲python數據
這樣python和c就能夠直接經過結構體定義協議通訊了.ssh
BigEndianUnion
類型BigEndianStructure
,否則會有字節序問題此方法只能適用於結構體固定打解包的狀況,若是協議中有大數組,但數組中的數據只有前幾個是有效的,後面都是無效的,通常在打包的時候只打包有效數據,這種狀況用Structure
就不合適了.函數
struct模塊是專門用來處理python與C之間的二進制數據轉換,總共只有幾個函數測試
下面在原有的SSHead定義中增長2個使用struct打包解包的函數ui
from ctypes import * import struct class SSHead(BigEndianStructure): _pack_ = 1 _fields_ = [ #(字段名, c類型 ) ('nTotalSize', c_uint32), ('nSourceID', c_int32), ('sourceType', c_uint8), ('destType', c_uint8), ('transType', c_uint8), ('nDestID', c_int32), ('nFlag', c_uint8), ('nOptionalLength', c_uint16), ('arrOptional', c_char * 20), ] def encode(self): return string_at(addressof(self), sizeof(self)) def decode(self, data): memmove(addressof(self), data, sizeof(self)) return len(data) def pack(self): buffer = struct.pack("!IIBBBIBH20s", self.nTotalSize, self.nSourceID, self.sourceType , self.destType, self.transType, self.nDestID, self.nFlag, self.nOptionalLength, self.arrOptional) return buffer def unpack(self, data): (self.nTotalSize, self.nSourceID, self.sourceType, self.destType, self.transType, self.nDestID, self.nFlag, self.nOptionalLength, self.arrOptional) = struct.unpack("!IIBBBIBH20s", data) # --------------------------- # 測試 s = SSHead() s.arrOptional = b'hello' ss = SSHead() ss.unpack(s.encode()) print(ss.arrOptional)
"!IIBBBIBH20B"
:!
表示按照網絡序處理,I
表示後面的第一變量爲4字節的int
型,接着的B
表示爲下一個變量爲1字節的uint8_t
型,以此類推,20s
表示後面是長度20的字節數組
其餘參數可參考官方文檔.this
上面的例子中若是使用pakc/unpack
方法,是不用繼承BigEndianStructure
,只需自定義相應字段變量.
能夠看到,struct.pack/unpack必須對每一個字段表明什麼類型,幾個字節進行描述.與Structure
相比,比較靈活,能夠自由組合怎麼打包,好比在nOptionalLength=0
時,不打包arrOptional
字段.缺點就是,定義pack/unpack
函數時,協議多起來會很是繁瑣且容易出錯.因此最好是自動化生成pack/unpack
函數.
顯然,咱們須要知道結構體成員的變量名和類型,參考Structure
,有以下定義
class BaseCode(object): _type_map_index_pack_tag = 1 _type_map_index_pack_size = 2 _type_map = { # C類型:(說明, 編碼標誌) 'char': ('int', 'B'), 'uint32_t': ('int', 'I'), 'string': ('str', 'B'), 'int32_t': ('int', 'i'), 'int64_t': ('int', 'q'), 'uint64_t': ('int', 'Q'), 'float': ('float', 'f'), 'double': ('double', 'd'), } # 每種基礎類型所佔字節數 _ctype_size_map = {'I': 4, 'B': 1, 'i': 4, 'b': 1, 'Q': 8, 'q': 8, 'f': 4, 'd': 8} _fields_index_ctype = 0 _fields_index_value_name = 1 _fields_index_array_length = 2 # 測試 _fields = [ # (C類型, 變量名) ('uint32_t', 'nUint'), ('string', 'szString', '_Const.enmMaxAccountIDLength'), ('int32_t', 'nInt3'), ('uint32_t', 'nUintArray', 4), ]
對_fields中的每一個元素,進行編碼,經過變量名可得到實際變量值,經過C類型利用struct.pack/unpack
可得到實際編碼
下面是添加的類成員函數encode
def encode(self, nest=1): data = b'' tmp = b'' debug_log("&" * nest, self.__class__.__name__, "encode struct start :") for one in self._fields: debug_log("#" * nest, "encode one element:", one) ctype = one[self._fields_index_ctype] value = getattr(self, one[self._fields_index_value_name]) if len(one) == 3: length = one[self._fields_index_array_length] if type(length) == str: length = eval(length) tmp = self._encode_array(ctype, value, length) else: # 不是基礎類型,即嵌套定義 if ctype not in BaseCode._type_map: tmp = value.encode(nest+1) else: fmt = '!' + self._type_map[ctype][self._type_map_index_pack_tag] tmp = struct.pack(fmt, value) # debug_log(fmt, type(value), value) debug_log("#" * nest,"encode one element:", len(tmp), tmp) data += tmp debug_log("&" * nest, self.__class__.__name__, "encode end: len=", len(data), data) return data def _encode_array(self, ctype, value, max_length): """ 打包數組 若是是字符串類型 須要作下特殊處理 :param ctype: :param value: :param max_length: :return: """ debug_log('ctype:', ctype, type(ctype)) if ctype == 'string': max_length -= 1 # 字符串長度須要減一 value = bytes(value, encoding='utf8') #print(value) if len(value) > max_length: raise EncodeError('the length of array is too long') # pack長度 data = struct.pack('!H', len(value)) debug_log("array count:", len(value), "value:", value, type(value)) # pack數組內容 for one in value: #debug_log("self._type_map[ctype][1]=", self._type_map[ctype][self._type_map_index_pack_tag], one) if ctype not in BaseCode._type_map: data += one.encode() else: data += struct.pack('!' + self._type_map[ctype][self._type_map_index_pack_tag], one) return data
數組類型在python中使用list
表示,在打包數組類型以前會添加==2字節表示數組長度==
字符串類型轉換爲bytes
類型,而後就和普通數組同樣,一個元素一個元素處理(實際在for遍歷中,一個元素是一個int
,和C中同樣,因此用B
標誌打包)
當==c類型==不是_type_map
中的基礎類型,那就是自定義的結構體類型,而後嵌套調用encode就能夠了
目前沒有考慮union
的處理
def decode(self, data, offset=0, nest=1): """ :param data: :return: """ debug_log("&" * nest, self.__class__.__name__, "decode struct start :") for one in self._fields: debug_log("#" * nest, "decode one element:", one) ctype = one[self._fields_index_ctype] if len(one) == 3: offset = self._decode_array(one, data, offset, nest) else: ctype_attr = self._type_map[ctype] if ctype not in BaseCode._type_map: value = eval(ctype + '()') offset = value.decode(data, offset, nest) setattr(self, one[self._fields_index_value_name], value) else: fmt = '!' + ctype_attr[self._type_map_index_pack_tag] value, = struct.unpack_from(fmt, data, offset) offset += self._ctype_size_map[ctype_attr[self._type_map_index_pack_tag]] debug_log(one, one[self._fields_index_value_name]) setattr(self, one[self._fields_index_value_name], value) debug_log("#" * nest, "decode one element end:", offset, one) return offset def _decode_array(self, field, data, offset, nest): ctype = field[self._fields_index_ctype] array_num, = struct.unpack_from('!H', data, offset) offset += 2 value = [] ctype_attr = self._type_map[ctype] debug_log("$" * nest, "decode array count", array_num, field) while array_num > 0: array_num -= 1 if ctype not in BaseCode._type_map: one = eval(ctype + '()') offset = one.decode(data, offset, nest) value.append(one) else: one, = struct.unpack_from('!' + ctype_attr[self._type_map_index_pack_tag], data, offset) value.append(one) offset += self._ctype_size_map[ctype_attr[self._type_map_index_pack_tag]] if ctype == 'string': # 這裏是由於字符串是按照單個字符解包,會解成python的int,經過chr()轉化爲字符型 # value = [97,98] # list(map(chr,value)) 後等於 ['a','b'] # ''.join() 就轉成'ab' value = ''.join(list(map(chr, value))) value = bytes(value, encoding='latin1').decode('utf8') setattr(self, field[self._fields_index_value_name], value) debug_log("$" * nest, "decode array ok", array_num, field) return offset
完整代碼:https://gitee.com/iclodq/codes/e81qfpxw3dnkju594yi6b90
包含簡單測試和轉成字典結構
在python3.5下運行成功
但願幫到各位!!