第23節課 接口實戰-項目結合數據庫驗證 (註冊接口 動態生成手機號~對比數據庫)

接口實戰---註冊之操做數據庫mysql

1、註冊手機號 是否已存在數據庫sql

背景:註冊成功的用例中,須要一個沒有被註冊過的手機號,那麼,就須要查詢數據,該手機號是否存在在數據庫裏數據庫

數據庫沒有:能夠註冊json

數據庫有:沒法註冊  再次動態生成手機號   while循環dom

 

其次,註冊成功後,要再次查詢數據庫,註冊的用戶信息是否存在測試

 

1.動態生成手機號url

動態生成的手機號,在數據庫中進行查詢。spa

若是能查到該條手機號的用戶信息,怎繼續循環生成手機號;excel

若是查不到該條手機號的用戶信息,則返回該手機號。------->將excel中的#phone#替換成該phone,就能夠進行requests請求了。code

    def random_phone(self):
        """隨機生成動態手機號"""
        import random
        while True:
            phone = '1' + random.choice(['3','5','7','8','9'])

            #動態生成手機號後9位
            for i in range(9):
                num = random.randint(0,9)
                phone += str(num)
                print(phone)

            db = mysql_handler.MysqlHandler(
                host= "120.78.128.25",
                port= 3306,
                user= "future",
                password="123456",
                charset= "utf8",
            )
            sql_code = "SELECT * FROM futureloan.member WHERR mobile_phone={}".format(phone)
            phone_code = db.query(sql_code)
            if not phone_code:
                db.close()
                return phone
            db.close()

 

2.判斷動態生成的手機號是否在數據庫中

1)對於註冊成功的case,將mobile_phone設置爲特定字符,好比:#phone#。判斷讀取的case_info的data參數中是否有#phone#,若是有,則替換成動態生成的手機號phone。

        if "#phone#" in case_info["data"]:
            phone = self.random_phone()
            #phone 去替換 #phone#
            case_info["data"] = case_info["data"].replace("#phone#",phone)

 

上面1,2 完整的代碼以下(register.py用例):

import json
import os
import unittest

from pymysql.cursors import DictCursor

from common.mysql_handler import MysqlHandler
from config import config
from common.excel_handler import ExcelHandler
from common import requests_handler
from common import yaml_handler
from common import logging_handler
import ddt

#讀取excel
#獲取yaml中excel文件名
data_yaml =yaml_handler.read_yaml(os.path.join(config.CONFIG_PATH,'config.yaml'))
excel_name = data_yaml['excel']['file']

excel_file = os.path.join(config.EXCEL_PATH,excel_name)
#獲取一個excel對象
exl=ExcelHandler(excel_file)
excel_data = exl.read_data('register_v1')

#獲取logger
logger_config = data_yaml["logger"]
logger = logging_handler.get_logger(
    name = logger_config['name'],
    file_name=os.path.join(config.LOG_PATH,logger_config['file']),
    log_level=logger_config['logger_level'],
    stream_level=logger_config['stream_level'],
    file_level=logger_config['file_level']
)

@ddt.ddt
class TestRegister(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        print('執行一次前置條件。')

    @classmethod
    def tearDownClass(cls):
        print('執行一次後置條件。')

    @ddt.data(*excel_data)
    def test_register(self,case_info):

        if "#phone#" in case_info["data"]:
            phone = self.random_phone()
            #phone 去替換 #phone#
            case_info["data"] = case_info["data"].replace("#phone#",phone)

        logger.info('讀取case成功。')
        resp = requests_handler.visit_simple(
            url = case_info["url"],
            method=case_info["method"],
            json = json.loads(case_info["data"]),
            headers =json.loads(case_info["headers"])
        )
        print(resp)

        #斷言
        try:
            for k,v in eval(case_info['expected']).items():#items循環字典
                self.assertEqual(v,resp[k])
                logger.info('斷言成功,測試經過。')

        except AttributeError as e:
            logger.error('斷言失敗,測試不經過。{}'.format(e))
            raise e

    #生成手機號的實例方法
    def random_phone(self):
        """隨機生成動態手機號"""
        import random
        while True:
            phone = '1'+random.choice(['3','5','7','8','9'])

            #動態生成手機號後9位
            for i in range(0,9):
                num = random.randint(0,9)
                phone += str(num)
            print(phone)

            db = MysqlHandler(
                host= "120.78.128.25",
                port= 3306,
                user= "future",
                password="123456",
                charset= "utf8",
                cursorclass=DictCursor
            )
            sql_code = "SELECT * FROM futureloan.member WHERE mobile_phone={}".format(phone)
            #查詢生成的phone是否在數據庫,phone_record是該手機號的用戶信息
            phone_recode = db.query(sql_code)
            #返回的用戶信息不存在,返回該phone,去替換excel的#phone#
            if not phone_recode:
                db.close()
                return phone
            db.close()


if __name__ == '__main__':
    pass

 

3.二次封裝------middleware---handler.py

上面的test_register.py中,獲取yaml,excel,logger數據能夠進行單獨封裝成handler,簡化test_register, 在test_register中調用handler便可。

♥封裝的handler應該放在哪一個目錄?------>common目錄是一些通用的模塊,不一樣的項目均可以直接拿來用,而handler中有依賴別的模塊,不能放在common目錄。

在項目中新建一個middleware文件夾,裏面存放封裝的handler.py

注意:在讀取excel表單時,每一個表單的名字不同,在handler.py中,只是封裝了exel對象,在test_register.py中,excel對象訪問excel_handler中的read_data()方法便可

import os
from common import yaml_handler,excel_handler,logging_handler
from config import config

class Handler:

    """將yaml,excel,logger定義爲類屬性"""
    #獲取conf屬性
    conf = config  #一切皆對象,能夠將模塊直接賦值給conf這個屬性

    #獲取yaml數據
    yaml_data = yaml_handler.read_yaml(os.path.join(config.CONFIG_PATH,'config.yaml'))

    #獲取excel對象
    __excel_file = yaml_data["excel"]["file"]
    __excel_path  = os.path.join(config.EXCEL_PATH,__excel_file)
    excel = excel_handler.ExcelHandler(__excel_path)

    #獲取logger
    logger_config = yaml_data["logger"]
    logger = logging_handler.get_logger(
        name = logger_config["name"],
        file_name = os.path.join(config.LOG_PATH,logger_config["file"]),
        log_level=logger_config["logger_level"],
        stream_level=logger_config["stream_level"],
        file_level=logger_config["file_level"]
    )

上面的雙下劃線+屬性名,表示私有屬性,該屬性只能在這個類裏面使用,出了這個類,別的對象/類是訪問不了的。

 

簡化後的test_register.py

import json
import unittest
from pymysql.cursors import DictCursor
from common.mysql_handler import MysqlHandler
from common import requests_handler
from middleware import handler
import ddt

#獲取excel數據
excel_data = handler.Handler.excel.read_data('register_v1') #獲取logger
logger = handler.Handler.logger

@ddt.ddt
class TestRegister(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        print('執行一次前置條件。')

    @classmethod
    def tearDownClass(cls):
        print('執行一次後置條件。')

    @ddt.data(*excel_data)
    def test_register(self,case_info):

        if "#phone#" in case_info["data"]:
            phone = self.random_phone()
            #phone 去替換 #phone#
            case_info["data"] = case_info["data"].replace("#phone#",phone)

        logger.info('讀取case成功。')
        resp = requests_handler.visit_simple(
            url = case_info["url"],
            method=case_info["method"],
            json = json.loads(case_info["data"]),
            headers =json.loads(case_info["headers"])
        )
        print(resp)

        #斷言
        try:
            for k,v in eval(case_info['expected']).items():#item循環字典
                self.assertEqual(v,resp[k])
                logger.info('斷言成功,測試經過。')

        except AttributeError as e:
            logger.error('斷言失敗,測試不經過。{}'.format(e))
            raise e

    #生成手機號的實例方法
    def random_phone(self):
        """隨機生成動態手機號"""
        import random
        while True:
            phone = '1'+random.choice(['3','5','7','8','9'])

            #動態生成手機號後9位
            for i in range(0,9):
                num = random.randint(0,9)
                phone += str(num)
            print(phone)

            db = MysqlHandler(
                host= "120.78.128.25",
                port= 3306,
                user= "future",
                password="123456",
                charset= "utf8",
                cursorclass=DictCursor
            )
            sql_code = "SELECT * FROM futureloan.member WHERE mobile_phone={}".format(phone)
            #查詢生成的phone是否在數據庫,phone_record是該手機號的用戶信息
            phone_recode = db.query(sql_code)
            #返回的用戶信息不存在,返回該phone,去替換excel的#phone#
            if not phone_recode:
                db.close()
                return phone
            db.close()


if __name__ == '__main__':
    pass

 

4.mysqlhandler二次封裝

在讀取數據庫數據時,每次讀取都要傳不少參數,因此能夠把讀取數據庫的操做進行再次封裝

在middlerware的handler.py模塊中,從新定義一個MysqlhandlerMid的類,並繼承MysqlHandler類,進行封裝

import os
from pymysql.cursors import DictCursor
from common import yaml_handler,excel_handler,logging_handler
from config import config
from common.mysql_handler import MysqlHandler

class Handler:

    """將yaml,excel,logger定義爲類屬性"""
    #獲取conf屬性
    conf = config  #一切皆對象,能夠將模塊直接賦值給conf這個屬性

    #獲取yaml數據
    yaml_data = yaml_handler.read_yaml(os.path.join(config.CONFIG_PATH,'config.yaml'))

    #獲取excel對象
    excel_file = yaml_data["excel"]["file"]
    excel_path  = os.path.join(config.EXCEL_PATH,excel_file)
    excel = excel_handler.ExcelHandler(excel_path)

    #獲取logger
    logger_config = yaml_data["logger"]
    logger = logging_handler.get_logger(
        name = logger_config["name"],
        file_name = os.path.join(config.LOG_PATH,logger_config["file"]),
        log_level=logger_config["logger_level"],
        stream_level=logger_config["stream_level"],
        file_level=logger_config["file_level"]
    )


class MysqlHandlerMid(MysqlHandler): #讀取配置文件的選項 MysqlHandler

    """yaml中db的數據
    db:
      host: "120.78.128.25"
      port: 3306
      user: "future"
      password: "123456"
      charset: "utf8"
    """
    def __init__(self):
        #初始化全部的配置項,從yaml中讀取
        db_config = Handler.yaml_data["db"]
        super().__init__(
            host=db_config["host"],
            port = db_config["port"],
            user = db_config["user"],
            password=db_config["password"],
            charset=db_config["charset"],
            cursorclass=DictCursor

        )

 

2、註冊成功,檢查數據庫是否有註冊的用戶信息

@ddt.ddt
class TestRegister(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        print('執行一次前置條件。')

    @classmethod
    def tearDownClass(cls):
        print('執行一次後置條件。')

    @ddt.data(*excel_data)
    def test_register(self,case_info):

        if "#phone#" in case_info["data"]:
            phone = self.random_phone()
            case_info["data"] = case_info["data"].replace("#phone#",phone)
        data = json.loads(case_info["data"])
        logger.info("#####################正在執行register接口測試#####################")
        logger.info("正在執行第{}條的{}用例!".format(case_info["case_id"], case_info["title"]))
        logger.info("請求的數據是:{}".format(case_info))
        resp = requests_handler.visit_simple(
            url = case_info["url"],
            method=case_info["method"],
            json = data,
            headers =json.loads(case_info["headers"])
        )
        print(resp)

        try:
            for k,v in eval(case_info['expected']).items():#item循環字典
                self.assertEqual(v,resp[k])
                logger.info('斷言成功,測試經過。')

            # 判斷註冊成功的手機號,是否存在數據庫中
                logger.info("#####################正在查詢數據庫#####################") if resp["code"] == 0: db = handler.MysqlHandlerMiddle() sql_code = "select * from futureloan.member where mobile_phone = {}".format(data["mobile_phone"]) user_info = db.query(sql_code) db.close() self.assertTrue(user_info) test_result = "PASS" logger.info("第{}用例執行結果爲:{}".format(case_info["case_id"],test_result)) except AttributeError as e:
            logger.error('斷言失敗,測試不經過,錯誤爲:{}'.format(e))
            test_result = "Failed"
            logger.error("第{}用例執行結果爲:{}".format(case_info["case_id"],test_result))
            raise e
相關文章
相關標籤/搜索