接口實戰---註冊之操做數據庫mysql
1、註冊手機號 是否已存在數據庫sql
背景:註冊成功的用例中,須要一個沒有被註冊過的手機號,那麼,就須要查詢數據,該手機號是否存在在數據庫裏數據庫
數據庫沒有:能夠註冊json
數據庫有:沒法註冊 再次動態生成手機號 while循環dom
其次,註冊成功後,要再次查詢數據庫,註冊的用戶信息是否存在測試
1.動態生成手機號url
動態生成的手機號,在數據庫中進行查詢。spa
若是能查到該條手機號的用戶信息,怎繼續循環生成手機號;excel
若是查不到該條手機號的用戶信息,則返回該手機號。------->將excel中的#phone#替換成該phone,就能夠進行requests請求了。code
def random_phone(self): """隨機生成動態手機號""" import random while True: phone = '1' + random.choice(['3','5','7','8','9']) #動態生成手機號後9位 for i in range(9): num = random.randint(0,9) phone += str(num) print(phone) db = mysql_handler.MysqlHandler( host= "120.78.128.25", port= 3306, user= "future", password="123456", charset= "utf8", ) sql_code = "SELECT * FROM futureloan.member WHERR mobile_phone={}".format(phone) phone_code = db.query(sql_code) if not phone_code: db.close() return phone db.close()
2.判斷動態生成的手機號是否在數據庫中
1)對於註冊成功的case,將mobile_phone設置爲特定字符,好比:#phone#。判斷讀取的case_info的data參數中是否有#phone#,若是有,則替換成動態生成的手機號phone。
if "#phone#" in case_info["data"]: phone = self.random_phone() #phone 去替換 #phone# case_info["data"] = case_info["data"].replace("#phone#",phone)
上面1,2 完整的代碼以下(register.py用例):
import json import os import unittest from pymysql.cursors import DictCursor from common.mysql_handler import MysqlHandler from config import config from common.excel_handler import ExcelHandler from common import requests_handler from common import yaml_handler from common import logging_handler import ddt #讀取excel #獲取yaml中excel文件名 data_yaml =yaml_handler.read_yaml(os.path.join(config.CONFIG_PATH,'config.yaml')) excel_name = data_yaml['excel']['file'] excel_file = os.path.join(config.EXCEL_PATH,excel_name) #獲取一個excel對象 exl=ExcelHandler(excel_file) excel_data = exl.read_data('register_v1') #獲取logger logger_config = data_yaml["logger"] logger = logging_handler.get_logger( name = logger_config['name'], file_name=os.path.join(config.LOG_PATH,logger_config['file']), log_level=logger_config['logger_level'], stream_level=logger_config['stream_level'], file_level=logger_config['file_level'] ) @ddt.ddt class TestRegister(unittest.TestCase): @classmethod def setUpClass(cls): print('執行一次前置條件。') @classmethod def tearDownClass(cls): print('執行一次後置條件。') @ddt.data(*excel_data) def test_register(self,case_info): if "#phone#" in case_info["data"]: phone = self.random_phone() #phone 去替換 #phone# case_info["data"] = case_info["data"].replace("#phone#",phone) logger.info('讀取case成功。') resp = requests_handler.visit_simple( url = case_info["url"], method=case_info["method"], json = json.loads(case_info["data"]), headers =json.loads(case_info["headers"]) ) print(resp) #斷言 try: for k,v in eval(case_info['expected']).items():#items循環字典 self.assertEqual(v,resp[k]) logger.info('斷言成功,測試經過。') except AttributeError as e: logger.error('斷言失敗,測試不經過。{}'.format(e)) raise e #生成手機號的實例方法 def random_phone(self): """隨機生成動態手機號""" import random while True: phone = '1'+random.choice(['3','5','7','8','9']) #動態生成手機號後9位 for i in range(0,9): num = random.randint(0,9) phone += str(num) print(phone) db = MysqlHandler( host= "120.78.128.25", port= 3306, user= "future", password="123456", charset= "utf8", cursorclass=DictCursor ) sql_code = "SELECT * FROM futureloan.member WHERE mobile_phone={}".format(phone) #查詢生成的phone是否在數據庫,phone_record是該手機號的用戶信息 phone_recode = db.query(sql_code) #返回的用戶信息不存在,返回該phone,去替換excel的#phone# if not phone_recode: db.close() return phone db.close() if __name__ == '__main__': pass
3.二次封裝------middleware---handler.py
上面的test_register.py中,獲取yaml,excel,logger數據能夠進行單獨封裝成handler,簡化test_register, 在test_register中調用handler便可。
♥封裝的handler應該放在哪一個目錄?------>common目錄是一些通用的模塊,不一樣的項目均可以直接拿來用,而handler中有依賴別的模塊,不能放在common目錄。
在項目中新建一個middleware文件夾,裏面存放封裝的handler.py
注意:在讀取excel表單時,每一個表單的名字不同,在handler.py中,只是封裝了exel對象,在test_register.py中,excel對象訪問excel_handler中的read_data()方法便可
import os from common import yaml_handler,excel_handler,logging_handler from config import config class Handler: """將yaml,excel,logger定義爲類屬性""" #獲取conf屬性 conf = config #一切皆對象,能夠將模塊直接賦值給conf這個屬性 #獲取yaml數據 yaml_data = yaml_handler.read_yaml(os.path.join(config.CONFIG_PATH,'config.yaml')) #獲取excel對象 __excel_file = yaml_data["excel"]["file"] __excel_path = os.path.join(config.EXCEL_PATH,__excel_file) excel = excel_handler.ExcelHandler(__excel_path) #獲取logger logger_config = yaml_data["logger"] logger = logging_handler.get_logger( name = logger_config["name"], file_name = os.path.join(config.LOG_PATH,logger_config["file"]), log_level=logger_config["logger_level"], stream_level=logger_config["stream_level"], file_level=logger_config["file_level"] )
上面的雙下劃線+屬性名,表示私有屬性,該屬性只能在這個類裏面使用,出了這個類,別的對象/類是訪問不了的。
簡化後的test_register.py
import json import unittest from pymysql.cursors import DictCursor from common.mysql_handler import MysqlHandler from common import requests_handler from middleware import handler import ddt #獲取excel數據 excel_data = handler.Handler.excel.read_data('register_v1') #獲取logger logger = handler.Handler.logger @ddt.ddt class TestRegister(unittest.TestCase): @classmethod def setUpClass(cls): print('執行一次前置條件。') @classmethod def tearDownClass(cls): print('執行一次後置條件。') @ddt.data(*excel_data) def test_register(self,case_info): if "#phone#" in case_info["data"]: phone = self.random_phone() #phone 去替換 #phone# case_info["data"] = case_info["data"].replace("#phone#",phone) logger.info('讀取case成功。') resp = requests_handler.visit_simple( url = case_info["url"], method=case_info["method"], json = json.loads(case_info["data"]), headers =json.loads(case_info["headers"]) ) print(resp) #斷言 try: for k,v in eval(case_info['expected']).items():#item循環字典 self.assertEqual(v,resp[k]) logger.info('斷言成功,測試經過。') except AttributeError as e: logger.error('斷言失敗,測試不經過。{}'.format(e)) raise e #生成手機號的實例方法 def random_phone(self): """隨機生成動態手機號""" import random while True: phone = '1'+random.choice(['3','5','7','8','9']) #動態生成手機號後9位 for i in range(0,9): num = random.randint(0,9) phone += str(num) print(phone) db = MysqlHandler( host= "120.78.128.25", port= 3306, user= "future", password="123456", charset= "utf8", cursorclass=DictCursor ) sql_code = "SELECT * FROM futureloan.member WHERE mobile_phone={}".format(phone) #查詢生成的phone是否在數據庫,phone_record是該手機號的用戶信息 phone_recode = db.query(sql_code) #返回的用戶信息不存在,返回該phone,去替換excel的#phone# if not phone_recode: db.close() return phone db.close() if __name__ == '__main__': pass
4.mysqlhandler二次封裝
在讀取數據庫數據時,每次讀取都要傳不少參數,因此能夠把讀取數據庫的操做進行再次封裝
在middlerware的handler.py模塊中,從新定義一個MysqlhandlerMid的類,並繼承MysqlHandler類,進行封裝
import os from pymysql.cursors import DictCursor from common import yaml_handler,excel_handler,logging_handler from config import config from common.mysql_handler import MysqlHandler class Handler: """將yaml,excel,logger定義爲類屬性""" #獲取conf屬性 conf = config #一切皆對象,能夠將模塊直接賦值給conf這個屬性 #獲取yaml數據 yaml_data = yaml_handler.read_yaml(os.path.join(config.CONFIG_PATH,'config.yaml')) #獲取excel對象 excel_file = yaml_data["excel"]["file"] excel_path = os.path.join(config.EXCEL_PATH,excel_file) excel = excel_handler.ExcelHandler(excel_path) #獲取logger logger_config = yaml_data["logger"] logger = logging_handler.get_logger( name = logger_config["name"], file_name = os.path.join(config.LOG_PATH,logger_config["file"]), log_level=logger_config["logger_level"], stream_level=logger_config["stream_level"], file_level=logger_config["file_level"] ) class MysqlHandlerMid(MysqlHandler): #讀取配置文件的選項 MysqlHandler """yaml中db的數據 db: host: "120.78.128.25" port: 3306 user: "future" password: "123456" charset: "utf8" """ def __init__(self): #初始化全部的配置項,從yaml中讀取 db_config = Handler.yaml_data["db"] super().__init__( host=db_config["host"], port = db_config["port"], user = db_config["user"], password=db_config["password"], charset=db_config["charset"], cursorclass=DictCursor )
2、註冊成功,檢查數據庫是否有註冊的用戶信息
@ddt.ddt class TestRegister(unittest.TestCase): @classmethod def setUpClass(cls): print('執行一次前置條件。') @classmethod def tearDownClass(cls): print('執行一次後置條件。') @ddt.data(*excel_data) def test_register(self,case_info): if "#phone#" in case_info["data"]: phone = self.random_phone() case_info["data"] = case_info["data"].replace("#phone#",phone) data = json.loads(case_info["data"]) logger.info("#####################正在執行register接口測試#####################") logger.info("正在執行第{}條的{}用例!".format(case_info["case_id"], case_info["title"])) logger.info("請求的數據是:{}".format(case_info)) resp = requests_handler.visit_simple( url = case_info["url"], method=case_info["method"], json = data, headers =json.loads(case_info["headers"]) ) print(resp) try: for k,v in eval(case_info['expected']).items():#item循環字典 self.assertEqual(v,resp[k]) logger.info('斷言成功,測試經過。') # 判斷註冊成功的手機號,是否存在數據庫中 logger.info("#####################正在查詢數據庫#####################") if resp["code"] == 0: db = handler.MysqlHandlerMiddle() sql_code = "select * from futureloan.member where mobile_phone = {}".format(data["mobile_phone"]) user_info = db.query(sql_code) db.close() self.assertTrue(user_info) test_result = "PASS" logger.info("第{}用例執行結果爲:{}".format(case_info["case_id"],test_result)) except AttributeError as e: logger.error('斷言失敗,測試不經過,錯誤爲:{}'.format(e)) test_result = "Failed" logger.error("第{}用例執行結果爲:{}".format(case_info["case_id"],test_result)) raise e