脫敏後腳本git
projectapi.py: 項目接口類sql
# -*- coding:utf-8 -*- """ xx項目接口類 2018-11 dinghanhua """ import requests import re import pymssql #region 工具類函數 def findinfo_from_apiquerystockdetailinfo(str1,str2): """ 從str1中找第一個"str2":...後面的值 :param str1: :param str2: :return: str2對應的值 """ pattern1 = '"'+str2 + '":(.*?),"' #左右邊界 result = re.search(pattern1, str1) #正則匹配 if result: result = result.group(1).replace('"','') return result def get_last_value_of_key(resultlist,key): ''' 從二維數組取第一行的元素對應的最後一行的值 :param resultlist: :param key: :return: value ''' for i in range(0,len(resultlist[0])): if key == resultlist[0][i]: #第一行中找到對應字段名的索引 result = resultlist[-1][i] return result #返回數組最後一行對應的值 def round_test(data,i=0): ''' 四捨五入,解決round(7.35)=7.3的問題 :param data: :param i: 保留的位數,默認保留一位小數 :return: ''' if isinstance(i,int): #i是整數 raise Exception('the second param must be int') else: mi = 10**i f = data*mi - int(data*mi) if f >=0.5: res = (int(data*mi)+1)/mi elif f <=-0.5: res = (int(data*mi-1))/mi else: res = int(data*mi)/mi if i<=0: res = int(res) return res # endregion class ProjectApi: def api_querystockdetailinfo(self,stockcode): """ 請求並提取股票基本信息接口數據 :param stockcode: :return: 截取信息dict """ api = 'http://testdomain/querystockdetailinfo?stockcode={stockcode}'.format(stockcode = stockcode) response = requests.get(api) result = response.text.replace(r'\n','').replace('\\', '') # 去掉特殊字符\n,\ result_dict = {'stockcode': stockcode} #股票名稱 result_dict['StockName'] = findinfo_from_apiquerystockdetailinfo(result, 'StockName') if result_dict['StockName']: #股票名稱存在繼續處理其餘字段,不然報錯並返回 # 公司概要 #剔除公司概要中「公司」「公司是」、「公司是一家」高度重複的內容 overviewvalue = result_dict['OverviewValue'] = findinfo_from_apiquerystockdetailinfo(result, 'OverviewValue') if overviewvalue.startswith('公司是一家'): result_dict['OverviewValue'] = overviewvalue[5:] elif overviewvalue.startswith('公司是'): result_dict['OverviewValue'] = overviewvalue[3:] elif overviewvalue.startswith('公司'): result_dict['OverviewValue'] = overviewvalue[2:] if not overviewvalue.endswith('。'): #判斷最後是否有句號,沒有加一個 result_dict['OverviewValue'] += '。' # 市值 typecap = findinfo_from_apiquerystockdetailinfo(result, 'TypeCap') dictcap = {'1': '巨盤', '2': '大盤', '3': '中盤', '4': '小盤', '5': '微盤'} result_dict['TypeCap'] = dictcap[typecap] # 風格 TypeStyle = result_dict['TypeStyle'] = findinfo_from_apiquerystockdetailinfo(result, 'TypeStyle') dictstyle = {'1': '成長', '2': '價值', '3': '週期', '4': '題材', '5': '高價值'} if len(TypeStyle) == 1: result_dict['TypeStyle'] = dictstyle[TypeStyle] elif len(TypeStyle) >1: typestylelist = TypeStyle.split(',') #風格可能有多個 for t in range(len(typestylelist)): typestylelist[t] = dictstyle[typestylelist[t]] result_dict['TypeStyle'] = '、'.join(typestylelist) # 生命週期 LifecycleValue 生命週期(單選,例:1);(1初創期、2成長期、3成熟期、4衰退期)") LifecycleValue = findinfo_from_apiquerystockdetailinfo(result, 'LifecycleValue') dictlifecycle = {'1': '初創期', '2': '成長期', '3': '成熟期', '4': '衰退期', '5': '週期底部', '6': '週期頂部', '7': '週期向下', '8': '週期向上'} if LifecycleValue: result_dict['LifecycleValue'] = dictlifecycle[LifecycleValue] # 估值 ScoreTTM 估值(分值1~5,>=4 偏低, <=2 偏高,其餘適中)") ScoreTTM = findinfo_from_apiquerystockdetailinfo(result, 'ScoreTTM') if ScoreTTM: if float(ScoreTTM) >= 4: result_dict['ScoreTTM'] = '偏低' elif ScoreTTM and float(ScoreTTM) <= 2: result_dict['ScoreTTM'] = '偏高' else: result_dict['ScoreTTM'] = '適中' # 成長指數 ScoreGrowing 成長指數(分值1~5,>=4 高, <=2 低,其餘中)') ScoreGrowing = findinfo_from_apiquerystockdetailinfo(result, 'ScoreGrowing') if ScoreGrowing: if float(ScoreGrowing) >= 4: result_dict['ScoreGrowing'] = '高' elif float(ScoreGrowing) <= 2: result_dict['ScoreGrowing'] = '低' else: result_dict['ScoreGrowing'] = '中' else: result_dict['ScoreGrowing']='' # 盈利能力 ScoreProfit = findinfo_from_apiquerystockdetailinfo(result, 'ScoreProfit') # ' ScoreProfit 盈利能力(分值1~5,>=4 高, <=2 低,其餘中)' ) if ScoreProfit: if float(ScoreProfit) >= 4: result_dict['ScoreProfit'] = '高' elif float(ScoreProfit) <= 2: result_dict['ScoreProfit'] = '低' else: result_dict['ScoreProfit'] = '中' else: result_dict['ScoreProfit']='' return result_dict def api_finance(self,stockcode): """ 請求並提取財務數據 :param stockcode: :return: dict """ api = 'http://testdomain/finance?stockcode={stockcode}'.format(stockcode = stockcode) response = requests.get(api) response.encoding = 'utf-8-sig' result = response.json()['data'][0]['result'] # 轉化爲二位數組 result_dict = {'stockcode': stockcode} #存儲返回結果 if len(result) <3: #說明股票沒數據 return result_dict # 當期報告期 result_dict['EndDate'] = get_last_value_of_key(result, 'EndDate') # 預測收益報告期 ReportPeriod = get_last_value_of_key(result, 'ReportPeriod') dictreportperoid = {'03/31': '一季度', '06/30': '上半年', '09/30': '前三季度', '12/31': '本年度'} if ReportPeriod and ReportPeriod != result_dict['EndDate']: #預測收益報告期不等於當期報告期 ReportPeriod = get_last_value_of_key(result, 'ReportPeriod')[5:10] result_dict['ReportPeriod'] = dictreportperoid[ReportPeriod] else: result_dict['ReportPeriod'] = '' # 預測業績狀況 PerformanceType = get_last_value_of_key(result, 'PerformanceType') result_dict['PerformanceType'] = PerformanceType # 預測業績比例 result_dict['PerformanceTypeRange'] = get_last_value_of_key(result, 'PerformanceTypeRange') # 營業收入增加率 result_dict['OperatingRevenueYoY'] = get_last_value_of_key(result, 'OperatingRevenueYoY') # 營業利潤增加率 result_dict['NetProfitYoY'] = get_last_value_of_key(result, 'NetProfitYoY') # 淨資產收益率 result_dict['ROE'] = get_last_value_of_key(result, 'ROE') # 毛利率 result_dict['SalesGrossMargin'] = get_last_value_of_key(result, 'SalesGrossMargin') return result_dict def api_quote(self,stockcode): """ 請求並提取PETTM :param stockcode: :return: dict """ api = 'http://testdomain/quote?stockcode={stockcode}'.format(stockcode=stockcode) response = requests.get(api) response.encoding = 'utf-8-sig' result = response.json()['data'][0]['result'] # 轉化爲二位數組 result_dict = {'stockcode':stockcode} if len(result) <3: #說明股票沒數據 return result_dict result_dict['PETTM'] = get_last_value_of_key(result, 'PE1') return result_dict def result_of_3sourceapi(self,stockcode): """ 拼接3個接口取出的字串 :param stockcode: :return: """ result_stockdetailinfo = self.api_querystockdetailinfo(stockcode) if result_stockdetailinfo['StockName']: #若是股票名稱存在,執行後續步驟 result_finance = self.api_finance(stockcode) result_quote = self.api_quote(stockcode) #顯示三個接口結果 #print(result_stockdetailinfo) #print(result_finance) #print(result_quote) #空值、精度處理 OperatingRevenueYoY = round_test(float(result_finance['OperatingRevenueYoY']),1) NetProfitYoY = round_test(float(result_finance['NetProfitYoY']),1) if result_finance['ReportPeriod'] ==''\ or result_finance['PerformanceType'] == ''\ or result_finance['PerformanceTypeRange'] == '': ReportPeriod = PerformanceType = PerformanceTypeRange = '' else: ReportPeriod = ',預計' + result_finance['ReportPeriod'] PerformanceType = '業績' + result_finance['PerformanceType'] PerformanceTypeRange = result_finance['PerformanceTypeRange'] if result_finance['ROE']: ROE = ',淨資產收益率:{0}%'.format(round_test(float(result_finance['ROE']),1)) else: ROE = '' if result_finance['SalesGrossMargin']: SalesGrossMargin = ',毛利率:{0}%'.format(round_test(float(result_finance['SalesGrossMargin']),1)) else: SalesGrossMargin = '' result = '{OverviewValue} {TypeCap}{TypeStyle}股,處於{LifecycleValue}。' \ '估值{ScoreTTM},PE(TTM):{PETTM};' \ '成長性{ScoreGrowing},當期營收增加:{OperatingRevenueYoY}%,當期利潤增加:{NetProfitYoY}%;' \ '盈利能力{ScoreProfit}{ROE}{SalesGrossMargin}{ReportPeriod}{PerformanceType}{PerformanceTypeRange}。'\ .format(OverviewValue = result_stockdetailinfo['OverviewValue'], TypeCap = result_stockdetailinfo['TypeCap'], TypeStyle = result_stockdetailinfo['TypeStyle'], LifecycleValue = result_stockdetailinfo['LifecycleValue'], ScoreTTM = result_stockdetailinfo['ScoreTTM'], PETTM = round_test(float(result_quote['PETTM'])), ScoreGrowing = result_stockdetailinfo['ScoreGrowing'], OperatingRevenueYoY = OperatingRevenueYoY, NetProfitYoY = NetProfitYoY, ScoreProfit = result_stockdetailinfo['ScoreProfit'], ROE = ROE, SalesGrossMargin=SalesGrossMargin, ReportPeriod = ReportPeriod, PerformanceType = PerformanceType, PerformanceTypeRange = PerformanceTypeRange) return result else: return '不存在該股票數據' def api_of_dev(self,stockcodelist,cookie,page=0,domain='testdomain.cn'): """ 獲取開發接口數據 :param 股票列表;cookie;domain默認線上 :return: 股票代碼及數據 """ headers = {'Cookie':cookie} url = 'http://{domain}/getstockbypage?StockCodeList={stockcodelist}&PageIndex={page}&PageSize=10'.format(stockcodelist = stockcodelist,domain = domain,page=page) response = requests.get(url, headers=headers) jsonstr = response.json()# 轉成json,取出message message = jsonstr['Message'] dict_result = {} if message: for ele in message: stockcode = ele['StockCode'] content = ele['Content'] # 實際結果 nickname = ele['NickName'] #發佈者 if nickname == 'project000': dict_result[stockcode] = content return dict_result def compare_vs_devapi(self,stockcodelist,cookie,page=0,domain='testdomain.cn'): """ 開發接口數據與接口拼接數據對比 :return: bool """ diff_list = [] # 存儲不一致的股票代碼 resultofdev = self.api_of_dev(stockcodelist,cookie,page,domain) #請求開發接口 if resultofdev: #若是開發結果不爲空 for stock,actual in resultofdev.items(): expected = self.result_of_3sourceapi(stock) #數據源拼接結果 '''去掉pe(ttm)對比''' beginindex = actual.find('PE(TTM)') endindex = actual.find(';', beginindex) actual_result = actual[:beginindex] + actual[endindex:] beginindex = expected.find('PE(TTM)') endindex = expected.find(';', beginindex) expected_result = expected[:beginindex] + expected[endindex:] if actual_result != expected_result: #預期實際對比 print(stock) print('開發:',actual_result) print('預期:',expected_result) diff_list.append(stock) else: print(stock,'一致(不含PETTM)') if diff_list: #異常股票列表不爲空則輸出;空則提示所有一致 print('不一致的股票列表:', diff_list) return False else: print('對比結果:數據一致') return True else: print('接口沒有數據') return True def compare_vs_database(self,count=10): """ 比較數據庫數據與數據源拼接字串 :param count:對比股票數量,default=10 :return:True 一致;False 不一致 """ diff_list = [] # 存儲不一致的股票代碼 with pymssql.connect(server='192.168.1.1', user='sa', password='sa', database='test_db') as myconnect: with myconnect.cursor(as_dict=True) as cursor: cursor.execute("""SELECT top {count} StockCode,StockName,content FROM [test_db].[dbo].[table] where NickName ='project000' and isvalid = 1 and IsDeleted =0 order by createtime desc""".format(count=count)) for row in cursor: stockcode = row['StockCode'] actual = row['content'] expected = self.result_of_3sourceapi(stockcode) # 數據源拼接結果 '''去掉pe(ttm)對比''' beginindex = actual.find('PE(TTM)') endindex = actual.find(';', beginindex) actual_result = actual[:beginindex] + actual[endindex:] beginindex = expected.find('PE(TTM)') endindex = expected.find(';', beginindex) expected_result = expected[:beginindex] + expected[endindex:] if actual_result != expected_result: # 預期實際對比 print(stockcode) print('開發:', actual_result) print('預期:', expected_result) diff_list.append(stockcode) else: print(stockcode, '一致(不含PETTM)') if diff_list: print('不一致的股票列表:', diff_list) return False else: print('對比結果:數據所有一致') return True
run_test.py 執行腳本:數據庫
# coding:utf-8 """ 接口測試執行腳本 """ import projectapi import unittest class ApiTest(unittest.TestCase): def setUp(self): self.projectapi1 = projectapi.ProjectApi() # 建立接口對象 def testcase1(self): """與開發接口比對""" stockcodelist = '600000%2C600128%2C600146%2C600165%2C600186' #經過抓包獲取cookie cookie = 'globalid=24A85DEC-AF25-36DD-C774-ED092F705767' testresult = self.projectapi1.compare_vs_devapi(stockcodelist,cookie) self.assertTrue(testresult) def testcase2(self): # 與數據庫對比 testresult = self.projectapi1.compare_vs_database(count=10) self.assertTrue(testresult) def testcase3(self): """手工查詢原數據和拼接字串""" while True: stockcode = input('輸入股票代碼: ') if stockcode.isdigit() and len(stockcode)==6 and stockcode[:2] in ('00','60','30'): print('數據請求中.....') print(self.projectapi1.api_quote(stockcode)) print(self.projectapi1.api_finance(stockcode)) print(self.projectapi1.api_querystockdetailinfo(stockcode)) print(self.projectapi1.result_of_3sourceapi(stockcode)) else: print('股票代碼有誤')