目錄html
Windows Python3環境搭建前端
python setup.py install
if x > 0: print("正數") elif x = 0: print("0") else: print("負數") def add(x,y): return x+y
x=1; y=2; print(x+y)
print("this line is too long, \ so I break it to two lines")
# 單行註釋 a = 1 '''這是一段 多行註釋''' def add(x, y): """加法函數:這是docstring(函數說明)""" pass
x=y=z=1
x,y = y,x
x+=1
x-=1
(不支持x++
, x--
)Python3中沒有常量java
str(12)
int("12")
float("1.23")
len("abcdefg")
"abcdefg".find("b"); "abcedfgg".index("g")
"AbcdeF".lower();"abcedF".upper()
isdigit("123")
,結果爲 True"aabcabc".count("a")
"".join(["a","b","c"])
會按空字符鏈接列表元素,獲得"abc""hello,java".replace("java", "python")
,將java 替換爲 python"a,b,c,d".split(",")
獲得["a", "b", "c", "d"]" this has blanks \n".strip()
獲得"this has balnks""Name: %s, Age: %d" % ("Lily", 12)
或"Name: %(name)s, Age: %(age)d" % {"name": "Lily", "age": 12}
"Name: {}, Age: {}".format("Lily", 12)
或"Name: {name}, Age: {age}".format(name="Lily",age=12)
"Name: ${name}, Age: ${age}".safe_substitude(name="Lily",age=12)
tpl='''<html> <head><title>{title}</title></head> <body> <h1>{title}</h1> <table border=1px> <tr> <th>序號</th> <th>用例</th> <th>結果</th> </tr> {trs} </table> </body> </html> ''' tr='''<tr><td>{sn}</td> <td>{case_name}</td> <td>{result}</td> ''' title="自動化測試報告" case_results = [("1", "test_add_normal", "PASS"),("2", "test_add_negative", "PASS"), ("3", "test_add_float", "FAIL")] trs='' for case_result in case_results: tr_format = tr.format(sn=case_result[0], case_name=case_result[1], result=case_result[2]) trs += tr_format html = tpl.format(title=title, trs=trs) f = open("report.html", "w") f.write(html) f.close()
結果預覽node
序號 | 用例 | 結果 |
---|---|---|
1 | test_add_normal | PASS |
2 | test_add_negative | PASS |
3 | test_add_float | FAIL |
列表元素支持各類對象的混合,支持嵌套各類對象,如
["a", 1, {"b": 3}, [1,2,3]]
python
l = [1, "hello", ("a", "b")]
a = l[0] # 經過索引獲取
l.append("c");l.extend(["d","e"]);l+["f"]
l.pop() # 按索引刪除,無參數默認刪除最後一個;l.remove("c") # 按元素刪除
l[1]="HELLO" # 經過索引修改
for i in l: print(i)
s="abcdefg"; r=''.join(reversed(a))
a=("hello",)
- 由於Python中()還有分組的含義,不加","會識別爲字符串字符串/列表/元組統稱爲序列, 有類似的結構和操做方法mysql
l[3];l[-1]
s="abcdefg";r=s[::-1]
for item in l: print(item)
for index in range(len(l)): print(l[index])
for i,v in enumerate(l): print((i,v))
"abc"+"123";[1,2,3]+[4,5];[1,2,3].extend([4,5,6,7])
類型互轉: str()/list()/tuple()git
list轉str通常用join(), str轉list通常用split()正則表達式
l_new=sorted(l);l_new2=reversed(l)
a = set([1,2,3])
l=[1,2,3,1,4,3,2,5,6,2];l=list(set(l))
(因爲集合無序,沒法保持原有順序)d = {"a":1, "b":2}
a = d['a']
或a = d.get("a") # d中不存在"a"元素時不會報錯
d["c"] = 3; d.update({"d":5, "e": 6}
d.pop("d");d.clear() # 清空
d.has_key("c")
for key in d:
或for key in d.keys():
for value in d.values():
for item in d.items():
案例: 更新接口參數 api = {"url": "/api/user/login": data: {"username": "張三", "password": "123456"}},將username修改成"李四"
api['data']['username'] = "李四"
或 api['data'].update({"username": "李四"})
redis
哈希與可哈希元素算法
可哈希元素: 爲了保證每次計算出的地址相同, 要求元素長度是固定的, 如數字/字符串/只包含數字,字符串的元組, 這些都是可哈希元素
6種類型簡單的特色總結
if x>0: print("正數") elif x=0: print("0") else: print("負數")
max = a if a > b else b
案例: 判斷一個字符串是不ip地址
ip_str = '192.168.100.3'
ip_list = ip_str.split(".") # 將字符串按點分割成列表
is_ip = True # 先假設ip合法
if len(ip_list) != 4:
is_ip= False
else:
for num in ip_list:
if not isdigit(num) or not 0 <= int(num) <= 255:
is_ip = False
if is_ip:
print("是ip")
else:
print("不是ip")
使用map函數的實現方法(參考):
def check_ipv4(str):
ip = str.strip().split(".")
return False if len(ip) != 4 or False in map(lambda x:True if x.isdigit() and 0<= int(x) <= 255 else False, ip) else True
while 循環
html/xml/config/csv也能夠按文本文件處理
f =open("test.txt")
或f =open("test.txt","r", encoding="utf-8")
或with open("test.txt) as f: # 上下文模式,出結構體自動關閉文件
r+/rb+, w+/wb+, a+/ab+: 讀寫,區別在於, r+/w+會清空文件再寫內容, r+文件不存在會報錯, a+不清空原文件,進行追加, w+/a+文件不存在時會新建文件
文件是可迭代的,能夠直接用 for line in f: 遍歷
def add(x, y): # 定義函數 return x+y print(add(1,3)) # 調用函數
案例: 用戶註冊/登陸函數
users = {"張三": "123456"} def reg(username, password): if users.get(username): # 若是用戶中存在username這個key print("用戶已存在") else: users[username] = password # 向users字典中添加元素 print("添加成功") def login(username, password) if not users.get(username): print("用戶不存在") elif users['username'] == password: print("登陸成功") else: print("密碼錯誤")
def func(*args, **kwargs):
能夠接受任意長度和格式的參數def(x:int, y:int) -> int: # x,y爲int型,函數返回爲int型,只是註釋,參數格式非法不會報錯 return x+y
def a(): print("I'm a") def deco(func): print("call from deco") func() deco(a) # 輸出"call from deco"並調用a(),輸出"I'm a"
def a(): a_var = 1 def b:() # 嵌套函數 a_var += 1
案例: 求N!
def factorial(n): return 1 if n == 0 or n == 1 else n * factorial(n-1)
支持一次導入多個
一個文件夾爲一個包(Python3,文件夾中不須要創建__init__.py文件)
for root,dirs,files in os.walk("./case/"): for file in files: if file.startswith("test") and file.endswith(".py"): print(os.path.join(root, file)
def buddle_sort(under_sort_list): l = under_sort_list for j in range(len(l)): for i in range(len(l)-j-1): if l[i] > l[i+1]: l[i], l[i+1] = l[i+1], l[i]
def quick_sort(l): if len(l) < 2: return l # 若是列表只有一個元素, 返回列表(用於結束迭代) else: pivot = l[0] # 取列表第一個元素爲基準數 low = [i for i in l[1:] if i < pivot] # 遍歷l, 將小於基準數pivot的全放入low這個列表 high = [i for i in l[1:] if i >= pivot ] return quick_sort(low) + [pivot] + quick_sort(high) # 對左右兩部分分別進行迭代
def bin_search(l, n): # l爲有序列表 low, high = 0, len(l) - 1 # low,high分別初始化爲第一個/最後一個元素索引(最小/最大數索引) while low < high: mid = (high-low) // 2 # 地板除,保證索引爲整數 if l[mid] == n: return mid elif l[mid] > n: # 中間數大於n則查找前半部分, 重置查找的最大數 high = mid -1 else: # 查找後半部分, 重置查找的最小數 low = mid + 1 return None # 循環結束沒有return mid 則說明沒找到
做業
首先要安裝flask包:
pip install flask
from flask import Flask, request
app=Flask(__name__)
a=request.values.get("a");b=request.values.get("b")
sum = int(a) + int(b)
return str(sum) # http通常使用字符串傳輸數據
@app.route("/add/", methods=["GET"]) # 寫到函數上面(裝飾器)
if __name__ == "__main__": app.run()
保存爲add.py
, 打開命令行,進入add.py所在目錄,運行python add.py
完整代碼
# 1. 導入包 from flask import Flask, request # 2. 實例化一個 app = Flask(__name__) # 3. 編寫一個接口處理方法 @app.route("/add/", methods=["GET","POST"]) # 4. 掛載路由(指定接口的url路徑), 聲明接口接受的方法 def add(): # 3.1 從請求中獲取參數 # request.values {"a": "1", "b": "2"} a = request.values.get("a") b = request.values.get("b") # 3.2 業務操做 sum = int(a) + int(b) # 3.3 組裝響應並返回 return str(sum) # 5. 運行接口 if __name__ == '__main__': app.run() # 默認5000端口,能夠指定端口app.run(port=50001)
from flask import Flask, request, jsonify app = Flask(__name__) @app.route("/api/sub/", methods=["POST"]) def sub(): if not request.json: # 若是請求數據類型非json return jsonify({"code": "100001", "msg": "請求類型錯誤", "data": None}) if not "a" in request.json or not "b" in request.json: # 若是參數中沒有a或者沒有b return jsonify({"code": "100002", "msg": "參數缺失", "data": None}) a = request.json.get("a") b = request.json.get("b") result = str(float(a) - float(b)) # 使用float支持浮點數相減 return jsonify({"code": "100000", "msg": "成功", "data": result}) # 使用jsonify將字典數據轉換爲json類型的相應數據 if __name__ == '__main__': app.run()
接口測試是測試系統組件間接口的一種測試。
接口測試主要用於檢測外部系統與系統之 間以及內部各個子系統之間的交互點。測試的重點是要檢查數據的交換,傳遞和控制管理過 程,以及系統間的相互邏輯依賴關係等。
Excel/TestLink/禪道
TestCase | Url | Method | DataType | a | b | Excepted | Actual | Status |
---|---|---|---|---|---|---|---|---|
test_add_normal | /api/add/ | GET | Url | 3 | 5 | 8 | ||
test_add_zero | /api/add/ | POST | FORM | 0 | 0 | 0 | ||
test_add_negetive | /api/add/ | POST | FORM | -3 | 5 | 2 | ||
test_add_float | /api/add/ | POST | FORM | 3.2 | 5.2 | 8.4 | ||
test_add_null | /api/add/ | POST | FORM | 0 |
須要安裝三方包:requests pytest pytest-html
pip install requests pytest pytest-html
url = 'http://127.0.0.1:5000/add/'
params = {"a":3, "b":5} # get請求url參數, 字典格式
data = {"a":3, "b":5} # post請求請求數據, 字典格式
resp = requests.get(url=url, params=params)
resp = requests.post(url=url,data=data)
# 1. 導入包 import requests base_url = "http://127.0.0.1:5005" # 2. 組裝請求 def test_add_normal(): # url 字符串格式 url = base_url + "/add/" # data {} 字典格式 data = {"a": "1", "b": "2"} # 3. 發送請求,獲取響應對象 response = requests.post(url=url, data=data) # 4. 解析響應 # 5. 斷言結果 assert response.text == '3'
請求格式爲json
三處不一樣:
data=json.dumps(data)
(使用json.dumps須要import json)完整代碼:
# 1. 導入包 import requests import json base_url = "http://127.0.0.1:5005" def test_sub_normal(): url = base_url + "/api/sub/" headers = {"Content-Type": "application/json"} # 1. 必須經過headers指定請求內容類型爲json data = {"a": "4", "b": "2"} data = json.dumps(data) # 2. 序列化成字符串 response = requests.post(url=url, headers=headers, data=data) # 3. 響應解析 # 響應格式爲: {"code":"100000", "msg": "成功", "data": "2"} resp_code = response.json().get("code") resp_msg = response.json().get("msg") resp_data = response.json().get("data") # 斷言 assert response.status_code == 200 assert resp_code == "100000" assert resp_msg == "成功" assert resp_data == "2"
補充1: 感覺Python黑科技之exec()動態生成用例:
數據文件: test_add_data.xls
TestCase | Url | Method | DataType | a | b | Excepted | Actual | Status |
---|---|---|---|---|---|---|---|---|
test_add_normal | /api/add/ | GET | Url | 3 | 5 | 8 | ||
test_add_zero | /api/add/ | POST | FORM | 0 | 0 | 0 | ||
test_add_negetive | /api/add/ | POST | FORM | -3 | 5 | 2 | ||
test_add_float | /api/add/ | POST | FORM | 3.2 | 5.2 | 8.4 | ||
test_add_null | /api/add/ | POST | FORM | 0 |
import requests import xlrd base_url = 'http://127.0.0.1:5005' # 1.打開excel wb = xlrd.open_workbook("test_add_data.xls") # 2. 獲取sheet sh = wb.sheet_by_index(0) # wb.sheet_by_name("Sheet1") # 行數 sh.nrows 列數 sh.ncols # 獲取單元格數據 # print(sh.cell(1,0).value) # print(sh.nrows) tpl = '''def {case_name}(): url = base_url + '/add/' data = {{"a": "{a}", "b":"{b}"}} response = requests.get(url=url, data=data) assert response.text == '{expected}' ''' for row in range(1, sh.nrows): case_name = sh.cell(row,0).value a = sh.cell(row, 4).value b = sh.cell(row, 5).value expected = sh.cell(row, 6).value case = tpl.format(case_name=case_name, a=a, b=b, expected=expected) exec(case)
動態生成的用例支持pytest發現和執行
pytest
(運行全部test開頭的.py用例)或pytest test_user.py
(只運行該腳本中用例)pytest -q test_user.py
pytest test_user.py --html=test_user_report.html
pytest test_user.py --resultlog=run.log
requests.session() # 用來保持session會話,如登陸狀態
url="http://127.0.0.1:5000/add/"
headers={"Content-Type": "application/json"}
params={"a":"1":"b":"2"}
data={"a":"1":"b":"2"}
files={"file": open("1.jpg")}
timeout: 超時時間,單位s, str, 超過期間會報超時錯誤```requests.get(url=url,params=params,timeout=10)
resp.content # 響應內容, 二進制類型
requests.get(url=url, params=params)
requests.post(url=url, data=data)
requests.post(url=url, headers={"Content-Type": "application/json"}, data=json.dumps(data)
requests.post(url=url, files={"file": open("1.jpg", "rb")})
session=requests.session(); session.post(...); session.post()
resp=requests.get(...);token=resp.split("=")[1];resp2=requests.post(....token...)
import hashlib def md5(str): m = hashlib.md5() m.update(str.encode('utf8')) return m.hexdigest() #返回摘要,做爲十六進制數據字符串值 def makeSign(params): if not isinstance(params, dict): print("參數格式不正確,必須爲字典格式") return None if 'sign' in params: params.pop('sign') sign = '' for key in sorted(params.keys()): sign = sign + key + '=' + str(params[key]) + '&' sign = md5(sign + 'appsecret=' + appsecret) params['sign'] = sign return params
data = makeSign(data);resp = requests.post(url=url, headers=headers, data=json.dumps(data))
Mock和單元測試的樁(Stub)相似, 是經過創建一個模擬對象來解決依賴問題的一種方法.
應用場景:
1. 依賴第三方系統接口, 沒法調試
2. 所依賴接口還沒有具有(先後端分離項目, 前端開發時,後端接口還沒有開發完畢)
3. 所依賴接口須要修改或不穩定
4. 依賴接口較多或場景複雜, 所依賴接口不是主要驗證目標的
解決方法:
1. 經過Mock.js/RAP/RAP2來動態生成, 模擬接口返回數據
2. 本身使用Flask你們簡單的Mock接口
3. 使用Python自帶的mock庫
...
pip install suds
from suds.client import Client ip = '127.0.0.1' port = '5001' client = Client("http://%s:%s/?wsdl" % (ip, port)) result = client.service.addUser("張790", "123456") print(result)
import xmlrpc.client user = xmlrpc.client.ServerProxy('http://127.0.0.1:5002') print(user.getAll())
參數化是用來解決動態參數問題的
import csv csv_data = csv.reader(open('data/reg.csv'))
import configparser cf=configparser.ConfigParser() cf.read('data/reg.config', encoding='utf-8') cf.sections() cf.options(section) cf.get(section,option)
import json with open('data/reg.json', encoding='utf-8') as f: json_data = json.loads(f) #json_data爲列表或字典
json的序列化和反序列化 需求:python的字典/列表等數據格式爲內存對象,須要作存儲(持久化)和進行數據交換 序列化: 從內存對象到可存儲數據, 方便存儲和交換數據 json.dumps: 列表/字典/json->字符串 ```str_data = json.dumps(dict_data)``` json.dump: 列表/字典/json->數據文件 ```json.dump(dict_data, open(data_file, "w"))``` 反序列化: 從存儲數據到內存對象 json.loads: 字符串->字典/列表```json.loads('{"a":1,"b":2}') #獲得字典{"a":1,"b":2}``` json.load: json數據文檔->列表/字典```dict_data = json.load(open('data.json'))```
pip install xlrd
import xlrd wb=xlrd.open_workbook("data/reg.xlsx") sh=wb.sheet_by_index(0) sh=wb.sheet_by_name('Sheet1") sh.nrows sh.ncols sh.cell(x,y).value
from xml.dom.minidom import parse dom=parse('data/reg.xml') root=dom.documentElement user_nodes=root.getElementsByTagName("user") user_node.getAttribute('title') user_node.hasAttribute('title') name_node=user_node.getElementsByTagName('name')[0] name=name_node.childNodes[0].data
案例1: 自動執行excel用例並將結果回寫excel
數據文件: test_user.xlsx
TestCase | Url | Method | DataType | Data | Excepted | Resp.text | Status |
---|---|---|---|---|---|---|---|
test_user_reg_normal | /api/user/reg/ | POST | JSON | {"name":"九小1","passwd": "123456"} | resp.json()["code"]=="100000" | ||
test_user_login_normal | /api/user/login/ | POST | FORM | {"name":"九小1","passwd": "123456"} | "成功" in resp.text |
import xlrd from xlutils.copy import copy import json import requests import sys base_url = "http://127.0.0.1:5000" def run_excel(file_name, save_file="result.xls"): wb=xlrd.open_workbook(file_name) sh=wb.sheet_by_index(0) wb2 = copy(wb) sh2 = wb2.get_sheet(0) for i in range(1,sh.nrows): url = base_url + sh.cell(i,1).value data = json.loads(sh.cell(i,4).value) headers = {} method = sh.cell(i,2).value data_type = sh.cell(i,3).value excepted = sh.cell(i,5).value if data_type.lower() == 'json': data = json.dumps(data) headers = {"Content-Type":"application/json"} if method.lower() == "get": resp = requests.get(url=url,headers=headers) else: resp = requests.post(url=url,headers=headers,data=data) if eval(excepted): status = "PASS" else: status = "FAIL" sh2.write(i,6, resp.text) sh2.write(i,7, status) wb2.save(save_file) print("保存成功") if __name__ == "__main__": if len(sys.argv)==2: run_excel(sys.argv[1]) elif len(sys.argv)>2: run_excel(sys.argv[1],sys.argv[2]) else: print("調用格式: python run_excel.py 用例文件 輸出文件")
保存腳本爲run_excel.py, (最好和數據文件test_user.xlsx放在同一目錄下), 在腳本所在目錄打開命令行,運行
python run_excel.py test_user.xlsx
生成的result.xls預覽
TestCase | Url | Method | DataType | Data | Excepted | Resp.text | Status |
---|---|---|---|---|---|---|---|
test_user_reg_normal | /api/user/reg/ | POST | JSON | {"name":"九小1","passwd": "123456"} | resp.json()["code"]=="100000" | {"code":"100001","data":{"name":"\u4e5d\u5c0f1","passwod":"e10adc3949ba59abbe56e057f20f883e"},"msg":"\u5931\u8d25\uff0c\u7528\u6237\u5df2\u5b58\u5728"} | FAIL |
test_user_login_normal | /api/user/login/ | POST | FORM | {"name":"九小1","passwd": "123456"} | "成功" in resp.text | <h1>登陸成功</h1> |
PASS |
import random
隨機姓名的實現: #隨機漢字: chr(random.randint(0x4e00, 0x9fbf)) name=random.choice(['趙','錢','孫','李','周'])+chr(random.randint(0x4e00, 0x9fbf)
隨機2個字符拼接: ''.join(random.sample('abcdefg',2)
斷言/檢查點是用來自動驗證接口返回數據/業務操做的結果是否知足預期
re.complie()
從數據庫讀取數據,驗證數據庫數據是否符合預期
MySQL斷言
pip install pymysql
import pymysql
conn = pymysql.connect(host='',port=3306,db='',user='',passwd='',charset='utf8')
cur=conn.cursor()
cur.execute("select * from user")
cur.close();conn.close()
PostgreSQL
pip install pyscopg2
import pyscopg2 conn=pyscopg2.connect(host='',port='',dbname='',user='',password='') # 注意是dbname, password cur=conn.curser() cur.execute("...") cur.fetchall()
Oracle
pip install cx_Oracle
...
Mongodb
pip install pymongo
from pymongo import MongoClient conn = MongoClient('', 27017) db = conn.mydb my_set = db.test_set for i in my_set.find({"name": "張三"}): print(i) print(my_set.findone({"name": "張三"}))
Redis斷言
pip install redis
import redis r = redis.Redis(host='192.168.100.198', port=6379, password='!QE%^E2sdf23RGF@ml239', db=0) print(r.dbsize()) print(r.get('package')) print(r.hget('event_order_advance_008aea6a62115ec4923829ee09f76a9c18243f5d', 'user'))
pip install paramiko
import paramiko client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect('192.168.100.241', 22, username='root', password='1234567', timeout=4) stdin, stdout, stderr = client.exec_command('cat /proc/meminfo') print(stdout.read()) client.close()
完整用例:
import requests import pytest import json import hashlib import re def md5(string): m = hashlib.md5() m.update(string.encode('utf8')) return m.hexdigest() def makeSign(data): sign='' for key in sorted(data.keys()): sign += key + '=' + str(data[key]) + '&' sign += 'appsecret=NTA3ZTU2ZWM5ZmVkYTVmMDBkMDM3YTBi' data['sign'] = md5(sign) return data class DB(): def __init__(self): # 創建鏈接 self.conn = pymysql.connect(host='localhost',port=3307,user='root',passwd='',db='api',charset='utf8') # 創建一個遊標 self.cur = self.conn.cursor() def __del__(self): self.cur.close() self.conn.close() def getUserByName(self, name): self.cur.execute("select * from user where name='%s'" % name) return self.cur.fetchone() def checkUser(self, name, passwd): user = self.getUserByName(name) if user: if user[2] == md5(passwd): return True else: return False else: return None class TestUser(): # pytest識別不能用__init__方法 base_url = 'http://127.0.0.1:5000' db = DB() def test_login(self): url = self.base_url + '/api/user/login/' data = {"name": "張三", "passwd": "123456"} resp = requests.post(url=url, data=data) #斷言 assert resp.status_code == 200 assert '登陸成功' in resp.text def test_reg(self): url = self.base_url + '/api/user/reg/' headers = {"Content-Type": "application/json"} data = {'name': '張10', 'passwd': '123456'} resp = requests.post(url=url, headers=headers, data=json.dumps(data)) #斷言 assert resp.json()['code'] == '100000' assert resp.json()['msg'] == '成功' assert self.db.getUserByName('張10') def test_uploadImage(self): url = self.base_url + '/api/user/uploadImage/' files = {'file': open("複習.txt")} resp = requests.post(url=url, files=files) #斷言 assert resp.status_code == 200 assert '成功' in resp.text # todo 服務器斷言 def test_getUserList(self): session = requests.session() login_url = self.base_url + '/api/user/login/' login_data = {"name": "張三", "passwd": "123456"} session.post(url=login_url, data=login_data) url = self.base_url + '/api/user/getUserList/' resp = session.get(url=url) #斷言 assert resp.status_code == 200 assert '用戶列表' in resp.text assert re.findall('\w{32}',t2) != [] def test_updateUser(self): session = requests.session() # 接口依賴的接口須要用session get_token_url = self.base_url + '/api/user/getToken/' params = {"appid": '136425'} token_resp = session.get(url=get_token_url, params=params) assert re.findall('token=\w{32}$') token = token_resp.text.split('=')[1] url = self.base_url + '/api/user/updateUser/?token=' + token data = {'name': '張三', 'passwd': '234567'} headers = {"Content-Type": "application/json"} resp = session.post(url=url, headers=headers, data=json.dumps(data)) #斷言 assert resp.status_code == 200 assert resp.json()['code'] == '100000' assert resp.json()['msg'] == '成功' assert self.db.checkUser('張三', '234567') def test_delUser(self): url = self.base_url + '/api/user/delUser/' headers = {"Content-Type": "application/json"} data = {'name': '張10', 'passwd': '123456'} data = makeSign(data) resp = requests.post(url=url, headers=headers, data=json.dumps(data)) #斷言 assert resp.status_code == 200 assert resp.json()['code'] == '100000' assert resp.json()['msg'] == '成功' assert not self.db.getUserByName('張10') if __name__ == '__main__': t = TestUser() # t.test_updateUser() # t.test_updateUser() t.test_delUser() # pytest.main("-q test_user2.py")
- case: 測試用例目錄 - user: (用戶模塊) - test_user.py: 測試用例 - case.py: 用例公共方法 - data: 數據文件目錄 - test_user_data.xlsx: 測試用例數據文件 - conf: 配置文件目錄 - default.conf: 默認配置文件 - report: pytest生成的報告保存路徑 - log: log保存路徑,按天生成log - common: 公共方法目錄 - config.py: 配置文件讀取 - data.py: 數據文件讀取 - db.py: 數據庫鏈接 - log.py: 日誌配置 - send_email.py: 發送郵件配置
[runtime] log_level=debug report_dir=report log_dir=log timeout=10 [server] test = http://127.0.0.1:5000 stage = http://127.0.0.1:6000 prod = http://127.0.0.1:7000 [db_test] host = localhost port = 3307 db = api user = root passwd = [db_stage] [db_prod] [email] server = smtp.sina.com user = test_results@sina.com pwd = ****** subject = Api Test Ressult receiver = superhin@126.com
TestCase | Url | Method | DataType | Data | Code | Msg |
---|---|---|---|---|---|---|
test_reg_normal | /api/user/reg/ | POST | JSON | {"name": "{NAME}", "passwd": "123456"} | 100000 | 成功 |
TestCase | Url | Method | DataType | Data | ResponseText |
---|---|---|---|---|---|
test_login_normal | /api/user/login/ | POST | FORM | {"name": "張三", "passwd": "123456"} | 登陸成功 |
checkUser | select * from user where name={NAME} |
---|---|
checkUserPasswd | select * from user where name={NAME} and passwd={PASSWD} |
""" 1. 從配置文件中獲取各個段信息 2. 返回一個項目的絕對路徑 """ import os import configparser # 相對導入包的問題 pro_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) class Config(object): def __init__(self, filename="default.conf"): self.cf = configparser.ConfigParser() self.cf.read(os.path.join(pro_path,"conf",filename)) def get_runtime(self, option): return self.cf.get("runtime", option) def get_server(self, option): return self.cf.get("server", option) def get_db_test(self, option): return self.cf.get("db_test", option) def get_email(self, option): return self.cf.get("email",option) if __name__ == "__main__": c = Config() print(c.get_runtime("log_level")) print(c.get_server("test"))
""" 1. 從Excel中讀取接口的數據 2. 讀取Sql命令 """ import xlrd import sys import os sys.path.append("..") from common.config import pro_path class Data(object): def __init__(self, filename): data_file_path = os.path.join(pro_path,"data",filename) self.wb = xlrd.open_workbook("../data/test_user_data.xlsx") def get_case(self,sheet_name, case_name): sh = self.wb.sheet_by_name(sheet_name) for i in range(1, sh.nrows): if sh.cell(i,0).value == case_name: return sh.row_values(i) print("用例名未找到") return None def get_sql(self, sql_name): sh = self.wb.sheet_by_name("SQL") for i in range(sh.nrows): if sh.cell(i,0).value == sql_name: return sh.cell(i,1).value print("sql未找到") return None if __name__ == "__main__": d = Data("test_user_data.xlsx") print(d.get_case("reg","test_reg_normal")) print(d.get_sql("checkUser"))
""" 1. 從配置文件中讀取數據庫配置 2. 鏈接數據庫 3. 執行sql並返回全部結果 """ import sys import pymysql sys.path.append("..") from common.config import Config class DB(object): def __init__(self): c = Config() self.conn = pymysql.connect(host=c.get_db_test("host"), port=int(c.get_db_test("port")), db=c.get_db_test("db"), user=c.get_db_test("user"), passwd=c.get_db_test("passwd"), charset="utf8") self.cur = self.conn.cursor() def do_sql(self, sql): self.cur.execute(sql) return self.cur.fetchall() def __del__(self): self.cur.close() self.conn.close() if __name__ == "__main__": db = DB() print(db.do_sql("select * from user"))
""" 1. 配置log輸出格式 time - loglevel - file - func - line - msg 2. 支持輸出到log文件及屏幕 3. 支持返回一個logger,讓其餘模塊調用 """ import sys sys.path.append("..") from common.config import Config, pro_path import time import logging import os class Log(): @classmethod def config_log(cls): cf = Config() log_dir = os.path.join(pro_path, cf.get_runtime("log_dir")) today = time.strftime("%Y%m%d", time.localtime(time.time())) log_file = os.path.join(log_dir, today+".log") # 獲取一個標準的logger, 配置loglevel cls.logger = logging.getLogger() cls.logger.setLevel(eval("logging." + cf.get_runtime("log_level").upper())) # 創建不一樣handler fh = logging.FileHandler(log_file, mode="a",encoding=‘utf-8’) ch = logging.StreamHandler() # 定義輸出格式 ft = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s") fh.setFormatter(ft) ch.setFormatter(ft) # 把定製handler 添加到咱們logger cls.logger.addHandler(fh) cls.logger.addHandler(ch) @classmethod def get_logger(cls): cls.config_log() return cls.logger if __name__ == "__main__": l= Log.get_logger() l.info("abc") l.debug("hello, debug")
""" 1. 從配置文件中讀取stmp配置 2. 從report文件夾下打開report.html,發送郵件 """ import smtplib from email.mime.text import MIMEText import os import sys sys.path.append("..") from common.config import Config, pro_path from common.log import Log def send_email(report_name): cf = Config() logger = Log.get_logger() report_file = os.path.join(pro_path, cf.get_runtime("report_dir"),report_name) with open(report_file, "rb") as f: body = f.read() # 格式化email正文 msg = MIMEText(body, "html", "utf-8") # 配置email頭 msg["Subject"] = cf.get_email("subject") msg["From"] = cf.get_email("user") msg["To"] = cf.get_email("receiver") # 鏈接smtp服務器,發送郵件 smtp = smtplib.SMTP() smtp.connect(cf.get_email("server")) smtp.login(cf.get_email("user"),cf.get_email("pwd")) smtp.sendmail(cf.get_email("user"), cf.get_email("receiver"), msg.as_string()) print("郵件發送成功") if __name__ == "__main__": send_email("report.html")
""" 1. 加載數據 2. 發送接口 3. 爲用例封裝一些方法 """ import sys sys.path.append("..") from common.log import Log from common.config import Config from common.db import DB from common.data import Data import json import requests class Case(object): def __init__(self): self.logger = Log.get_logger() self.cf = Config() def load_data(self, data_file): self.data = Data(data_file) def set_env(self, env): self.env = env def run_case(self, sheet_name, case_name, var={}): case_data = self.data.get_case(sheet_name, case_name) url = self.cf.get_server(self.env) + case_data[1] data = case_data[4].format(**var) if case_data[3].lower() == "form": data = json.loads(data) headers = {} else: headers = {"content-type": "application/json"} if case_data[2].lower() == "get": resp = requests.get(url=url) else: resp = requests.post(url=url, headers=headers, data=data) return resp.text def check_response(self): pass def check_db(self, sql_name, vars={}): sql = self.data.get_sql(sql_name).format(**vars) return self.db.exec_sql(sql) if __name__ == "__main__": c = Case() c.set_env("test") c.load_data("test_user_data.xlsx") r = c.run_case("login", "test_login_normal") print(r)
import sys import random import pytest sys.path.append("../..") from case.case import Case case = Case() def setup_module(module): case.set_env('dev') case.load_data('test_user_data.xlsx') def test_login_normal(): result case.run("login", "test_login_normal") if __name__ == '__main__': pytest.main(["-q", "test_user.py"])
import os import time from util.config import Config from util.e_mail import send_email import pytest def main(): cf = Config() report_dir = cf.get_report_dir() now = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time())) report_name = os.path.join(report_dir, 'report_' + now + '.html') pytest.main(["-q", "case", "--html=" + report_name]) send_email(report_name) if __name__ == '__main__': main()