python與C結構體之間二進制數據轉換

python與C結構體之間數據轉換

前言

在實際應用中,可能會遇到直接和C進行二進制字節流協議通訊,這時要把數據解包成python數據,若是可能,最好與C定義的結構體徹底對應上.
python中有2種方式,可處理二進制數據轉換python

  • 用ctypes包的Structure直接定義結構體
  • 用struct包的pack/unpack函數組裝轉換

在轉換時必定要注意==字節序==,這兩種方式都有各自的方法標誌字節序.git

使用ctypes包

ctypes中有許多C中的操做接口,如sizeof,memmove等,也提供近似C結構體的模擬類Structure,BigEndianStructure,Union,顯然的是BigEndianStructure是網絡字節序(大端),方便直接用於網絡傳輸,UnionStructure是主機序(多是大端,也多是小端,和本機有關).數組

Structure/BigEndianStructure使用

from ctypes import *
class SSHead(BigEndianStructure):
    _pack_ = 1
    _fields_ = [
        #(字段名, c類型 )
        ('nTotalSize', c_uint32),
        ('nSourceID', c_int32),
        ('sourceType', c_uint8),
        ('destType', c_uint8),
        ('transType', c_uint8),
        ('nDestID', c_int32),
        ('nFlag', c_uint8),
        ('nOptionalLength', c_uint16),
        ('arrOptional', c_char * 20),
    ]
    
    def encode(self):
        return string_at(addressof(self), sizeof(self))

    def decode(self, data):
        memmove(addressof(self), data, sizeof(self))
        return len(data)

# -------------------
# 使用
sshead = SSHead()
sshead.nSourceID = 20 #省略其餘賦值
buf = sshead.encode()
ss = SSHead()
ss.decode(buf)
print(ss.nSourceID)

以上就是一個簡單協議結構體定義,對應的C版本以下網絡

struct SSHead
{
    uint32_t nTotalSize;
    int32_t nSourceID;
    uint8_t sourceType;
    uint8_t destType;
    uint8_t transType;
    int32_t nDestID;
    int8_t nFlag;
    uint16_t nOptionalLength;
    char arrOptional[20];
    
    //簡單模擬python的打包解包
    int encode(char* buf, size_t max_len)
    {
        memmove(buf, this, sizeof(this));
        return 0;
    }
    int decode(char* buf, size_t len)
    {
        memmove(this, buf, len);
        return 0;
    }
}
// c中對應的 打包/解包流程(假設本機字節序爲大端)
SSHead sshead = {0};
sshead.nSourceID = 20;
char buf[1024];
sshead.encode(buf);

SSHead ss = {0};
ss.decode(buf, sizeof(ss));

其中_pack_ = 1表示1字節對齊,否則可能會被填充,致使結構體實際所佔字節數與表面上的不同.
_fields_定義C結構體中相對應的字段名和類型,C中每種基礎類型在ctypes都有與之對應的類型,如c_uint32對應uint32_t,佔4個字節.數組就是後面乘以對應的長度便可,如c_uint8 * 20.另外還支持嵌套定義結構體.在實例化後,字段名會成爲成員變量,可直接賦值.app

encode會直接獲得該對象的二進制數據,若是不考慮字節序,則與C中相同對象的二進制數據是同樣的
decode相反,直接解包二進制數據爲python數據
這樣python和c就能夠直接經過結構體定義協議通訊了.ssh

注意

  • python中的二進制數據是==bytes==類型,不是==str==類型
  • 在python3.6及以前的版本,是沒有BigEndianUnion類型
  • 用來網絡傳輸必定要用BigEndianStructure,否則會有字節序問題

缺點

此方法只能適用於結構體固定打解包的狀況,若是協議中有大數組,但數組中的數據只有前幾個是有效的,後面都是無效的,通常在打包的時候只打包有效數據,這種狀況用Structure就不合適了.函數

使用struct包

struct模塊是專門用來處理python與C之間的二進制數據轉換,總共只有幾個函數測試

下面在原有的SSHead定義中增長2個使用struct打包解包的函數ui

from ctypes import *
import struct
class SSHead(BigEndianStructure):
    _pack_ = 1
    _fields_ = [
        #(字段名, c類型 )
        ('nTotalSize', c_uint32),
        ('nSourceID', c_int32),
        ('sourceType', c_uint8),
        ('destType', c_uint8),
        ('transType', c_uint8),
        ('nDestID', c_int32),
        ('nFlag', c_uint8),
        ('nOptionalLength', c_uint16),
        ('arrOptional', c_char * 20),
    ]
    
    def encode(self):
        return string_at(addressof(self), sizeof(self))

    def decode(self, data):
        memmove(addressof(self), data, sizeof(self))
        return len(data)
        
    def pack(self):
        buffer = struct.pack("!IIBBBIBH20s", self.nTotalSize, self.nSourceID, self.sourceType
                             , self.destType, self.transType, self.nDestID, self.nFlag, self.nOptionalLength, self.arrOptional)
        return buffer
    
    def unpack(self, data):
        (self.nTotalSize, self.nSourceID, self.sourceType, self.destType, self.transType, self.nDestID,
        self.nFlag, self.nOptionalLength, self.arrOptional) = struct.unpack("!IIBBBIBH20s", data)

# ---------------------------
# 測試        
s = SSHead()
s.arrOptional = b'hello'
ss = SSHead()
ss.unpack(s.encode())
print(ss.arrOptional)

pack/unpack的fmt(格式化串)說明

"!IIBBBIBH20B":!表示按照網絡序處理,I表示後面的第一變量爲4字節的int型,接着的B表示爲下一個變量爲1字節的uint8_t型,以此類推,20s表示後面是長度20的字節數組
其餘參數可參考官方文檔.this

缺點

上面的例子中若是使用pakc/unpack方法,是不用繼承BigEndianStructure,只需自定義相應字段變量.
能夠看到,struct.pack/unpack必須對每一個字段表明什麼類型,幾個字節進行描述.與Structure相比,比較靈活,能夠自由組合怎麼打包,好比在nOptionalLength=0時,不打包arrOptional字段.缺點就是,定義pack/unpack函數時,協議多起來會很是繁瑣且容易出錯.因此最好是自動化生成pack/unpack函數.

自動化生成pack/unpack

定義結構體成員列表

顯然,咱們須要知道結構體成員的變量名和類型,參考Structure,有以下定義

class BaseCode(object):
    _type_map_index_pack_tag = 1
    _type_map_index_pack_size = 2
    _type_map = {
        # C類型:(說明, 編碼標誌)
        'char': ('int', 'B'),
        'uint32_t': ('int', 'I'),
        'string': ('str', 'B'),
        'int32_t': ('int', 'i'),
        'int64_t': ('int', 'q'),
        'uint64_t': ('int', 'Q'),
        'float': ('float', 'f'),
        'double': ('double', 'd'),
    }

    # 每種基礎類型所佔字節數
    _ctype_size_map = {'I': 4, 'B': 1, 'i': 4, 'b': 1, 'Q': 8, 'q': 8, 'f': 4, 'd': 8}

    _fields_index_ctype = 0
    _fields_index_value_name = 1
    _fields_index_array_length = 2

    # 測試
    
    _fields = [
        # (C類型, 變量名)
        ('uint32_t', 'nUint'),
        ('string', 'szString', '_Const.enmMaxAccountIDLength'),
        ('int32_t', 'nInt3'),
        ('uint32_t', 'nUintArray', 4),
    ]

按序遍歷_fields中的字段

對_fields中的每一個元素,進行編碼,經過變量名可得到實際變量值,經過C類型利用struct.pack/unpack可得到實際編碼
下面是添加的類成員函數encode

def encode(self, nest=1):
        data = b''
        tmp = b''
        debug_log("&" * nest, self.__class__.__name__, "encode struct start :")
        for one in self._fields:
            debug_log("#" * nest, "encode one element:", one)
            ctype = one[self._fields_index_ctype]

            value = getattr(self, one[self._fields_index_value_name])
            if len(one) == 3:
                length = one[self._fields_index_array_length]
                if type(length) == str:
                    length = eval(length)
                tmp = self._encode_array(ctype, value, length)
            else:

                # 不是基礎類型,即嵌套定義
                if ctype not in BaseCode._type_map:
                    tmp = value.encode(nest+1)
                else:
                    fmt = '!' + self._type_map[ctype][self._type_map_index_pack_tag]
                    tmp = struct.pack(fmt, value)
                    # debug_log(fmt, type(value), value)
            debug_log("#" * nest,"encode one element:", len(tmp), tmp)
            data += tmp
        debug_log("&" * nest, self.__class__.__name__, "encode end: len=", len(data), data)
        return data

    def _encode_array(self, ctype, value, max_length):
        """
        打包數組
        若是是字符串類型 須要作下特殊處理
        :param ctype:
        :param value:
        :param max_length:
        :return:
        """
        debug_log('ctype:', ctype, type(ctype))
        if ctype == 'string':
            max_length -= 1  # 字符串長度須要減一
            value = bytes(value, encoding='utf8')
            #print(value)

        if len(value) > max_length:
            raise EncodeError('the length of  array is too long')

        # pack長度
        data = struct.pack('!H', len(value))
        debug_log("array count:", len(value), "value:", value, type(value))
        # pack數組內容
        for one in value:
            #debug_log("self._type_map[ctype][1]=", self._type_map[ctype][self._type_map_index_pack_tag], one)
            if ctype not in BaseCode._type_map:
                data += one.encode()
            else:
                data += struct.pack('!' + self._type_map[ctype][self._type_map_index_pack_tag], one)
        return data

數組類型在python中使用list表示,在打包數組類型以前會添加==2字節表示數組長度==
字符串類型轉換爲bytes類型,而後就和普通數組同樣,一個元素一個元素處理(實際在for遍歷中,一個元素是一個int,和C中同樣,因此用B標誌打包)
當==c類型==不是_type_map中的基礎類型,那就是自定義的結構體類型,而後嵌套調用encode就能夠了
目前沒有考慮union的處理

解碼,反向處理

def decode(self, data, offset=0, nest=1):
        """
        :param data:
        :return:
        """
        debug_log("&" * nest, self.__class__.__name__, "decode struct start :")
        for one in self._fields:
            debug_log("#" * nest, "decode one element:", one)
            ctype = one[self._fields_index_ctype]
            if len(one) == 3:
                offset = self._decode_array(one, data, offset, nest)
            else:
                ctype_attr = self._type_map[ctype]
                if ctype not in BaseCode._type_map:
                    value = eval(ctype + '()')
                    offset = value.decode(data, offset, nest)
                    setattr(self, one[self._fields_index_value_name], value)

                else:
                    fmt = '!' + ctype_attr[self._type_map_index_pack_tag]
                    value, = struct.unpack_from(fmt, data, offset)
                    offset += self._ctype_size_map[ctype_attr[self._type_map_index_pack_tag]]
                    debug_log(one, one[self._fields_index_value_name])
                    setattr(self, one[self._fields_index_value_name], value)
            debug_log("#" * nest, "decode one element end:", offset, one)
        return offset

    def _decode_array(self, field, data, offset, nest):
        ctype = field[self._fields_index_ctype]
        array_num, = struct.unpack_from('!H', data, offset)
        offset += 2
        value = []
        ctype_attr = self._type_map[ctype]
        debug_log("$" * nest, "decode array count", array_num, field)
        while array_num > 0:
            array_num -= 1
            if ctype not in BaseCode._type_map:
                one = eval(ctype + '()')
                offset = one.decode(data, offset, nest)
                value.append(one)
            else:
                one, = struct.unpack_from('!' + ctype_attr[self._type_map_index_pack_tag], data, offset)
                value.append(one)
                offset += self._ctype_size_map[ctype_attr[self._type_map_index_pack_tag]]

        if ctype == 'string':
            # 這裏是由於字符串是按照單個字符解包,會解成python的int,經過chr()轉化爲字符型
            # value = [97,98]
            # list(map(chr,value)) 後等於 ['a','b']
            # ''.join() 就轉成'ab'

            value = ''.join(list(map(chr, value)))
            value = bytes(value, encoding='latin1').decode('utf8')


        setattr(self, field[self._fields_index_value_name], value)
        debug_log("$" * nest, "decode array ok", array_num, field)
        return offset

最後

完整代碼:https://gitee.com/iclodq/codes/e81qfpxw3dnkju594yi6b90

包含簡單測試和轉成字典結構
在python3.5下運行成功

但願幫到各位!!

Buy me a coffee

image

相關文章
相關標籤/搜索