python_接口自動化測試框架

本文總結分享介紹接口測試框架開發,環境使用python3+selenium3+unittest+ddt+requests測試框架及ddt數據驅動,採用Excel管理測試用例等集成測試數據功能,以及使用HTMLTestRunner來生成測試報告,目前有開源的poman、Jmeter等接口測試工具,爲何還要開發接口測試框架呢?因接口測試工具也有存在幾點不足。html

  • 測試數據不可控制。好比接口返回數據不可控,就沒法自動斷言接口返回的數據,不能判定是接口程序引發,仍是測試數據變化引發的錯誤,因此須要作一些初始化測試數據。接口工具沒有具有初始化測試數據功能,沒法作到真正的接口測試自動化。
  • 沒法測試加密接口。實際項目中,多數接口不是能夠隨便調用,通常狀況沒法摸擬和生成加密算法。如時間戳和MDB加密算法,通常接口工具沒法摸擬。
  • 擴展能力不足。開源的接口測試工具沒法實現擴展功能。好比,咱們想生成不一樣格式的測試報告,想將測試報告發送到指定郵箱,又想讓接口測試集成到CI中,作持續集成定時任務。

測試框架處理流程

 

 測試框架處理過程以下:python

  1. 首先初始化清空數據庫表的數據,向數據庫插入測試數據;
  2. 調用被測試系統提供的接口,先數據驅動讀取excel用例一行數據;
  3. 發送請求數據,根據傳參數據,向數據庫查詢獲得對應的數據;
  4. 將查詢的結果組裝成JSON格式的數據,同時根據返回的數據值與Excel的值對比判斷,並寫入結果至指定Excel測試用例表格;
  5. 經過單元測試框架斷言接口返回的數據,並生成測試報告,最後把生成最新的測試報告HTML文件發送指定的郵箱。

測試框架結構目錄介紹

 

目錄結構介紹以下:mysql

  • config/:                    文件路徑配置
  • database/:               測試用例模板文件及數據庫和發送郵箱配置文件
  • db_fixture/:              初始化接口測試數據
  • lib/:                          程序核心模塊。包含有excel解析讀寫、發送郵箱、發送請求、生成最新測試報告文件
  • package/:                存放第三方庫包。如HTMLTestRunner,用於生成HTML格式測試報告
  • report/:                    生成接口自動化測試報告
  • testcase/:                用於編寫接口自動化測試用例
  • run_demo.py:          執行全部接口測試用例的主程序
  • GitHub項目地址:    https://github.com/yingoja/DemoAPI

數據庫封裝

 1 [tester]
 2 name = Jason
 3 
 4 [mysqlconf]
 5 host = 127.0.0.1
 6 port = 3306
 7 user = root
 8 password = 123456
 9 db_name = guest
10 
11 [user]
12 # 發送郵箱服務器
13 HOST_SERVER = smtp.163.com
14 # 郵件發件人
15 FROM = 111@163.com
16 # 郵件收件人
17 TO = 222@126.com
18 # 發送郵箱用戶名/密碼
19 user = aaa
20 password = aaa
21 # 郵件主題
22 SUBJECT = 發佈會系統接口自動化測試報告
config.ini
 1 #!/usr/bin/env python
 2 # _*_ coding:utf-8 _*_
 3 __author__ = 'YinJia'
 4 
 5 import os,sys
 6 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
 7 from config import setting
 8 from pymysql import connect,cursors
 9 from pymysql.err import OperationalError
10 import configparser as cparser
11 
12 # --------- 讀取config.ini配置文件 ---------------
13 cf = cparser.ConfigParser()
14 cf.read(setting.TEST_CONFIG,encoding='UTF-8')
15 host = cf.get("mysqlconf","host")
16 port = cf.get("mysqlconf","port")
17 user = cf.get("mysqlconf","user")
18 password = cf.get("mysqlconf","password")
19 db = cf.get("mysqlconf","db_name")
20 
21 class DB:
22     """
23     MySQL基本操做
24     """
25     def __init__(self):
26         try:
27             # 鏈接數據庫
28             self.conn = connect(host = host,
29                                 user = user,
30                                 password = password,
31                                 db = db,
32                                 charset = 'utf8mb4',
33                                 cursorclass = cursors.DictCursor
34                                 )
35         except OperationalError as e:
36             print("Mysql Error %d: %s" % (e.args[0],e.args[1]))
37 
38    # 清除表數據
39     def clear(self,table_name):
40         real_sql = "delete from " + table_name + ";"
41         with self.conn.cursor() as cursor:
42              # 取消表的外鍵約束
43             cursor.execute("SET FOREIGN_KEY_CHECKS=0;")
44             cursor.execute(real_sql)
45         self.conn.commit()
46 
47     # 插入表數據
48     def insert(self, table_name, table_data):
49         for key in table_data:
50             table_data[key] = "'"+str(table_data[key])+"'"
51         key   = ','.join(table_data.keys())
52         value = ','.join(table_data.values())
53         real_sql = "INSERT INTO " + table_name + " (" + key + ") VALUES (" + value + ")"
54 
55         with self.conn.cursor() as cursor:
56             cursor.execute(real_sql)
57         self.conn.commit()
58 
59     # 關閉數據庫
60     def close(self):
61         self.conn.close()
62 
63     # 初始化數據
64     def init_data(self, datas):
65         for table, data in datas.items():
66             self.clear(table)
67             for d in data:
68                 self.insert(table, d)
69         self.close()
mysql_db.py
 1 #!/usr/bin/env python
 2 # _*_ coding:utf-8 _*_
 3 __author__ = 'YinJia'
 4 
 5 import sys, time, os
 6 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
 7 from db_fixture.mysql_db import DB
 8 
 9 # 定義過去時間
10 past_time = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(time.time()-100000))
11 # 定義未來時間
12 future_time = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(time.time()+10000))
13 
14 # 建立測試數據
15 datas = {
16     # 發佈會表數據
17     'sign_event':[
18         {'id':1,'name':'紅米Pro發佈會','`limit`':2000,'status':1,'address':'北京會展中心','start_time':future_time},
19         {'id':2,'name':'蘋果iphon6發佈會','`limit`':1000,'status':1,'address':'寶安體育館','start_time':future_time},
20         {'id':3,'name':'華爲榮耀8發佈會','`limit`':2000,'status':0,'address':'深圳福田會展中心','start_time':future_time},
21         {'id':4,'name':'蘋果iphon8發佈會','`limit`':2000,'status':1,'address':'深圳灣體育中心','start_time':past_time},
22         {'id':5,'name':'小米5發佈會','`limit`':2000,'status':1,'address':'北京國家會議中心','start_time':future_time},
23     ],
24     # 嘉賓表數據
25     'sign_guest':[
26         {'id':1,'realname':'Tom','phone':13511886601,'email':'alen@mail.com','sign':0,'event_id':1},
27         {'id':2,'realname':'Jason','phone':13511886602,'email':'sign@mail.com','sign':1,'event_id':1},
28         {'id':3,'realname':'Jams','phone':13511886603,'email':'tom@mail.com','sign':0,'event_id':5},
29     ],
30 }
31 
32 # 測試數據插入表
33 def init_data():
34     DB().init_data(datas)
test_data.py
 1 #!/usr/bin/env python
 2 # _*_ coding:utf-8 _*_
 3 __author__ = 'YinJia'
 4 
 5 import os,sys
 6 BASE_DIR = os.path.dirname(os.path.dirname(__file__))
 7 sys.path.append(BASE_DIR)
 8 
 9 # 配置文件
10 TEST_CONFIG =  os.path.join(BASE_DIR,"database","config.ini")
11 # 測試用例模板文件
12 SOURCE_FILE = os.path.join(BASE_DIR,"database","DemoAPITestCase.xlsx")
13 # excel測試用例結果文件
14 TARGET_FILE = os.path.join(BASE_DIR,"report","excelReport","DemoAPITestCase.xlsx")
15 # 測試用例報告
16 TEST_REPORT = os.path.join(BASE_DIR,"report")
17 # 測試用例程序文件
18 TEST_CASE = os.path.join(BASE_DIR,"testcase")
setting.py

程序核心模塊

 1 #!/usr/bin/env python
 2 # _*_ coding:utf-8 _*_
 3 __author__ = 'YinJia'
 4 
 5 import os
 6 
 7 def new_report(testreport):
 8     """
 9     生成最新的測試報告文件
10     :param testreport:
11     :return:返回文件
12     """
13     lists = os.listdir(testreport)
14     lists.sort(key=lambda fn: os.path.getmtime(testreport + "\\" + fn))
15     file_new = os.path.join(testreport,lists[-1])
16     return file_new
netReport.py
 1 #!/usr/bin/env python
 2 # _*_ coding:utf-8 _*_
 3 __author__ = 'YinJia'
 4 
 5 import xlrd
 6 
 7 class ReadExcel():
 8     """讀取excel文件數據"""
 9     def __init__(self,fileName, SheetName="Sheet1"):
10         self.data = xlrd.open_workbook(fileName)
11         self.table = self.data.sheet_by_name(SheetName)
12 
13         # 獲取總行數、總列數
14         self.nrows = self.table.nrows
15         self.ncols = self.table.ncols
16     def read_data(self):
17         if self.nrows > 1:
18             # 獲取第一行的內容,列表格式
19             keys = self.table.row_values(0)
20             listApiData = []
21             # 獲取每一行的內容,列表格式
22             for col in range(1, self.nrows):
23                 values = self.table.row_values(col)
24                 # keys,values組合轉換爲字典
25                 api_dict = dict(zip(keys, values))
26                 listApiData.append(api_dict)
27             return listApiData
28         else:
29             print("表格是空數據!")
30             return None
readexcel.py
 1 #!/usr/bin/env python
 2 # _*_ coding:utf-8 _*_
 3 __author__ = 'YinJia'
 4 
 5 import os,sys,json
 6 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
 7 
 8 
 9 class SendRequests():
10     """發送請求數據"""
11     def sendRequests(self,s,apiData):
12         try:
13             #從讀取的表格中獲取響應的參數做爲傳遞
14             method = apiData["method"]
15             url = apiData["url"]
16             if apiData["params"] == "":
17                 par = None
18             else:
19                 par = eval(apiData["params"])
20             if apiData["headers"] == "":
21                 h = None
22             else:
23                 h = eval(apiData["headers"])
24             if apiData["body"] == "":
25                 body_data = None
26             else:
27                 body_data = eval(apiData["body"])
28             type = apiData["type"]
29             v = False
30             if type == "data":
31                 body = body_data
32             elif type == "json":
33                 body = json.dumps(body_data)
34             else:
35                 body = body_data
36 
37             #發送請求
38             re = s.request(method=method,url=url,headers=h,params=par,data=body,verify=v)
39             return re
40         except Exception as e:
41             print(e)
sendrequests.py
 1 #!/usr/bin/env python
 2 # _*_ coding:utf-8 _*_
 3 __author__ = 'YinJia'
 4 
 5 import os,sys
 6 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
 7 from config import setting
 8 import smtplib
 9 from lib.newReport import new_report
10 import configparser
11 from email.mime.text import MIMEText
12 from email.mime.multipart import MIMEMultipart
13 
14 
15 def send_mail(file_new):
16     """
17     定義發送郵件
18     :param file_new:
19     :return: 成功:打印發送郵箱成功;失敗:返回失敗信息
20     """
21     f = open(file_new,'rb')
22     mail_body = f.read()
23     f.close()
24     #發送附件
25     con = configparser.ConfigParser()
26     con.read(setting.TEST_CONFIG,encoding='utf-8')
27     report = new_report(setting.TEST_REPORT)
28     sendfile = open(report,'rb').read()
29     # --------- 讀取config.ini配置文件 ---------------
30     HOST = con.get("user","HOST_SERVER")
31     SENDER = con.get("user","FROM")
32     RECEIVER = con.get("user","TO")
33     USER = con.get("user","user")
34     PWD = con.get("user","password")
35     SUBJECT = con.get("user","SUBJECT")
36 
37     att = MIMEText(sendfile,'base64','utf-8')
38     att["Content-Type"] = 'application/octet-stream'
39     att.add_header("Content-Disposition", "attachment", filename=("gbk", "", report))
40 
41     msg = MIMEMultipart('related')
42     msg.attach(att)
43     msgtext = MIMEText(mail_body,'html','utf-8')
44     msg.attach(msgtext)
45     msg['Subject'] = SUBJECT
46     msg['from'] = SENDER
47     msg['to'] = RECEIVER
48 
49     try:
50         server = smtplib.SMTP()
51         server.connect(HOST)
52         server.starttls()
53         server.login(USER,PWD)
54         server.sendmail(SENDER,RECEIVER,msg.as_string())
55         server.quit()
56         print("郵件發送成功!")
57     except Exception as  e:
58         print("失敗: " + str(e))
sendmail.py
 1 #!/usr/bin/env python
 2 # _*_ coding:utf-8 _*_
 3 __author__ = 'YinJia'
 4 
 5 import os,sys
 6 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
 7 import shutil
 8 from config import setting
 9 from openpyxl import load_workbook
10 from openpyxl.styles import Font,Alignment
11 from openpyxl.styles.colors import RED,GREEN,DARKYELLOW
12 import configparser as cparser
13 
14 # --------- 讀取config.ini配置文件 ---------------
15 cf = cparser.ConfigParser()
16 cf.read(setting.TEST_CONFIG,encoding='UTF-8')
17 name = cf.get("tester","name")
18 
19 class WriteExcel():
20     """文件寫入數據"""
21     def __init__(self,fileName):
22         self.filename = fileName
23         if not os.path.exists(self.filename):
24             # 文件不存在,則拷貝模板文件至指定報告目錄下
25             shutil.copyfile(setting.SOURCE_FILE,setting.TARGET_FILE)
26         self.wb = load_workbook(self.filename)
27         self.ws = self.wb.active
28 
29     def write_data(self,row_n,value):
30         """
31         寫入測試結果
32         :param row_n:數據所在行數
33         :param value: 測試結果值
34         :return: 無
35         """
36         font_GREEN = Font(name='宋體', color=GREEN, bold=True)
37         font_RED = Font(name='宋體', color=RED, bold=True)
38         font1 = Font(name='宋體', color=DARKYELLOW, bold=True)
39         align = Alignment(horizontal='center', vertical='center')
40         # 獲數所在行數
41         L_n = "L" + str(row_n)
42         M_n = "M" + str(row_n)
43         if value == "PASS":
44             self.ws.cell(row_n, 12, value)
45             self.ws[L_n].font = font_GREEN
46         if value == "FAIL":
47             self.ws.cell(row_n, 12, value)
48             self.ws[L_n].font = font_RED
49         self.ws.cell(row_n, 13, name)
50         self.ws[L_n].alignment = align
51         self.ws[M_n].font = font1
52         self.ws[M_n].alignment = align
53         self.wb.save(self.filename)
writeexcel.py

接口測試用例編寫

 1 #!/usr/bin/env python
 2 # _*_ coding:utf-8 _*_
 3 __author__ = 'YinJia'
 4 
 5 import os,sys
 6 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
 7 import unittest,requests,ddt
 8 from config import setting
 9 from lib.readexcel import ReadExcel
10 from lib.sendrequests import SendRequests
11 from lib.writeexcel import WriteExcel
12 
13 testData = ReadExcel(setting.SOURCE_FILE, "Sheet1").read_data()
14 
15 @ddt.ddt
16 class Demo_API(unittest.TestCase):
17     """發佈會系統"""
18     def setUp(self):
19         self.s = requests.session()
20 
21     def tearDown(self):
22         pass
23 
24     @ddt.data(*testData)
25     def test_api(self,data):
26         # 獲取ID字段數值,截取結尾數字並去掉開頭0
27         rowNum = int(data['ID'].split("_")[2])
28         # 發送請求
29         re = SendRequests().sendRequests(self.s,data)
30         # 獲取服務端返回的值
31         self.result = re.json()
32         # 獲取excel表格數據的狀態碼和消息
33         readData_code = int(data["status_code"])
34         readData_msg = data["msg"]
35         if readData_code == self.result['status'] and readData_msg == self.result['message']:
36             OK_data = "PASS"
37             WriteExcel(setting.TARGET_FILE).write_data(rowNum + 1,OK_data)
38         if readData_code != self.result['status'] or readData_msg != self.result['message']:
39             NOT_data = "FAIL"
40             WriteExcel(setting.TARGET_FILE).write_data(rowNum + 1,NOT_data)
41         self.assertEqual(self.result['status'], readData_code, "返回實際結果是->:%s" % self.result['status'])
42         self.assertEqual(self.result['message'], readData_msg, "返回實際結果是->:%s" % self.result['message'])
43 
44 if __name__=='__main__':
45     unittest.main()
testAPI.py

集成測試報告

 1 #!/usr/bin/env python
 2 # _*_ coding:utf-8 _*_
 3 __author__ = 'YinJia'
 4 
 5 
 6 import os,sys
 7 sys.path.append(os.path.dirname(__file__))
 8 from config import setting
 9 import unittest,time
10 from HTMLTestRunner import HTMLTestRunner
11 from lib.sendmail import send_mail
12 from lib.newReport import new_report
13 from db_fixture import test_data
14 from package.HTMLTestRunner import HTMLTestRunner
15 
16 def add_case(test_path=setting.TEST_CASE):
17     """加載全部的測試用例"""
18     discover = unittest.defaultTestLoader.discover(test_path, pattern='*API.py')
19     return discover
20 
21 def run_case(all_case,result_path=setting.TEST_REPORT):
22     """執行全部的測試用例"""
23 
24     # 初始化接口測試數據
25     test_data.init_data()
26 
27     now = time.strftime("%Y-%m-%d %H_%M_%S")
28     filename =  result_path + '/' + now + 'result.html'
29     fp = open(filename,'wb')
30     runner = HTMLTestRunner(stream=fp,title='發佈會系統接口自動化測試報告',
31                             description='環境:windows 7 瀏覽器:chrome',
32                             tester='Jason')
33     runner.run(all_case)
34     fp.close()
35     report = new_report(setting.TEST_REPORT) #調用模塊生成最新的報告
36     send_mail(report) #調用發送郵件模塊
37 
38 if __name__ =="__main__":
39     cases = add_case()
40     run_case(cases)
run_demo.py

測試結果展現

  • HTML測試結果報告:

  • Excel測試用例結果

  • 郵件收到的測試報告

相關文章
相關標籤/搜索