最近公司項目週期比較趕, 項目是軟硬結合,在缺乏硬件的狀況下,經過接口模擬設備上下架和購買狀況,並進行壓力測試,html
本次主要使用三個接口 分別是3個場景: 生成商品IP, 對商品進行上架, 消費者購買商品python
最大問題:是數據庫是用ssh,只能用127.0.0.1去連接數據庫,mysql
試過用ssh連接數據庫, 用requests 去跑腳本沒有問題,換上locust 就有問題web
最後使用putty做爲代理連接 ,有個缺陷就是 連接時效性不強,常常要從新連接puttysql
環境:數據庫
win10 mysql locust python3.7flask
Locust是一個用於可擴展的,分佈式的,性能測試的,開源的,用Python編寫框架/工具,它很是容易使用,也很是好學。
它的主要思想就是模擬一羣用戶將訪問你的網站。每一個用戶的行爲由你編寫的python代碼定義,同時能夠從Web界面中實時觀察到用戶的行爲。
Locust徹底是事件驅動的,所以在單臺機器上可以支持幾千併發用戶訪問。
與其它許多基於事件的應用相比,Locust並不使用回調,而是使用gevent,而gevent是基於協程的,能夠用同步的方式來編寫異步執行的代碼。
每一個用戶實際上運行在本身的greenlet中。
api
pip install locust
安裝 pyzmq網絡
If you intend to run Locust distributed across multiple processes/machines, we recommend you to also install pyzmq.併發
若是打算運行Locust 分佈在多個進程/機器,須要安裝pyzmq.
經過pip命令安裝。
pip install pyzmq
1) gevent
gevent是一種基於協程的Python網絡庫,它用到Greenlet提供的,封裝了libevent事件循環的高層同步API。
2) flask
Python編寫的輕量級Web應用框架。
3) requests
Python Http庫
4) msgpack-python
MessagePack是一種快速、緊湊的二進制序列化格式,適用於相似JSON的數據格式。msgpack-python主要提供MessagePack數據序列化及反序列化的方法。
5) six
Python2和3兼容庫,用來封裝Python2和Python3之間的差別性
6) pyzmq
pyzmq是zeromq(一種通訊隊列)的Python綁定,主要用來實現Locust的分佈式模式運行
當咱們在安裝 Locust 時,它會檢測咱們當前的 Python 環境是否已經安裝了這些庫,若是沒有安裝,它會先把這些庫一一裝上。而且對這些庫版本有要求,有些是必須等於某版本,有些是大於某版本。咱們也能夠事先把這些庫所有按要求裝好,再安裝Locust時就會快上許多
4. 腳本解讀:
1、建立ScriptTasks()類繼承TaskSet類: 用於定義測試業務。 二、建立index()、about()、demo()方法分別表示一個行爲,訪問http://example.com。用@task() 裝飾該方法爲一個任務。一、2表示一個Locust實例被挑選執行的權重,數值越大,執行頻率越高。在當前ScriptTasks()行爲下的三個方法得執行比例爲2:1:1 3、WebsiteUser()類: 用於定義模擬用戶。 4、task_set : 指向一個定義了的用戶行爲類。 5、host: 指定被測試應用的URL的地址 6、min_wait : 用戶執行任務之間等待時間的下界,單位:毫秒。 七、max_wait : 用戶執行任務之間等待時間的上界,單位:毫秒。
5.執行腳本:
web/UI 界面:
if __name__ == '__main__': import os os.system("locust -f godemo.py --host=http://xx.api.xxxxx.net")
或者在命令行:
locust -f godemo.py --host=http:xxx.xxx.xx.net
或者:
locust -f godemo.py --H=http:xxx.xxx.xx.net
no-web:
dos進入Scripts目錄下,執行 locust -f ****.py --csv=onetest --host=http://0.0.0.0:0000 --no-web -c10 -r10 -t2
(PS:-f 指定運行的py文件的名字,--csv 生成報告的名字,--host 測試的http服務的ip和port,--no-web 不用web啓動,-c 設置虛擬用戶數, -r 設置每秒啓動虛擬用戶數, -t 設置運行時間)
下面是腳本:
建立一個方法保存在goconn,從數據庫讀取數據傳入請求:
import pymysql from sshtunnel import SSHTunnelForwarder import random def conn(): #本地經過putty連接數據庫,在連接跳板機 lcDB = pymysql.connect(host="127.0.0.1",port=8807, user="rt",passwd="qwqwqw12",db="go") cur = lcDB.cursor() #隨機生成一個數字,做爲查詢的結果的結果數 num = random.randint(0,10) sql = "select no from goods where operator_id =%s order by rand() limit %s"%(11, num) print(sql) rfid = [] try: cur.execute(sql) data = cur.fetchall() for row in data: good_no = row[0] rfid.append(good_no) # print(good_no) return rfid except: print("Error") finally: #關閉鏈接 lcDB.close()
請求:
import goconnfrom locust import HttpLocust,TaskSet,task class goDemo(TaskSet): def getRfid(self): gID = goconn.conn() data = { "goods_no[]": gID, "num": 1, } print("2") r = self.client.post("/test/api/XXX", data=data) rfid = eval(r.content)["result"] print(rfid) return rfid def stock(self,did, oid, rfid): data = { "device_id": did, "operator_id": oid, "data[add][]": rfid, "data[remove][]": "", } result = self.client.post("/test/api/XX2", data= data) print(result.content) def purchase(self,did,uid,rfid): data = { "device_id":did, "user_id":uid, "data[]":rfid, } req = self.client.post("/test/api/XXX3", data = data) res =eval(req.content)["result"] print(req.content) @task(1) def test_getrfid(self): self.getRfid() @task(3) def test_stock(self): rfid = self.getRfid() self.stock(28,11,rfid) @task(7) def test_purchase(self): rfid = self.getRfid() self.stock(28,11,rfid) self.purchase(28,172,rfid) class WebsiteUser(HttpLocust): task_set = goDemo min_wait = 3000 max_wait = 5000
測試結果:
Number of users to simulate:設置模擬的用戶總數,
Hatch rate (users spawned/second):每秒啓動的虛擬用戶數 ,
Start swarming:執行locust腳本
UI界面:運行200個, 每秒啓動2個用戶:
參考文章:
http://www.pianshen.com/article/6404330705/
關於性能好文章:
https://www.cnblogs.com/botoo/p/7410283.html
設計locust:
參數化 ,關聯等:
http://www.cnblogs.com/ailiailan/p/9474973.html