python locust 進行壓力測試

最近公司項目週期比較趕, 項目是軟硬結合,在缺乏硬件的狀況下,經過接口模擬設備上下架和購買狀況,並進行壓力測試,html

本次主要使用三個接口 分別是3個場景: 生成商品IP, 對商品進行上架, 消費者購買商品python

最大問題:是數據庫是用ssh,只能用127.0.0.1去連接數據庫,mysql

試過用ssh連接數據庫, 用requests 去跑腳本沒有問題,換上locust 就有問題web

最後使用putty做爲代理連接 ,有個缺陷就是 連接時效性不強,常常要從新連接puttysql

 

環境:數據庫

win10  mysql  locust python3.7flask

 

1.locust:

Locust是一個用於可擴展的,分佈式的,性能測試的,開源的,用Python編寫框架/工具,它很是容易使用,也很是好學。

它的主要思想就是模擬一羣用戶將訪問你的網站。每一個用戶的行爲由你編寫的python代碼定義,同時能夠從Web界面中實時觀察到用戶的行爲。

Locust徹底是事件驅動的,所以在單臺機器上可以支持幾千併發用戶訪問。

與其它許多基於事件的應用相比,Locust並不使用回調,而是使用gevent,而gevent是基於協程的,能夠用同步的方式來編寫異步執行的代碼。

每一個用戶實際上運行在本身的greenlet中。

api

2. 安裝

pip  install locust

 安裝 pyzmq網絡

 If you intend to run Locust distributed across multiple processes/machines, we recommend you to also install pyzmq.併發

 若是打算運行Locust 分佈在多個進程/機器,須要安裝pyzmq.

 經過pip命令安裝。

 pip install pyzmq

3.Locust主要由下面的幾個庫構成:

 

1) gevent

gevent是一種基於協程的Python網絡庫,它用到Greenlet提供的,封裝了libevent事件循環的高層同步API。

2) flask

Python編寫的輕量級Web應用框架。

3) requests

Python Http庫

4) msgpack-python

MessagePack是一種快速、緊湊的二進制序列化格式,適用於相似JSON的數據格式。msgpack-python主要提供MessagePack數據序列化及反序列化的方法。

5) six

Python2和3兼容庫,用來封裝Python2和Python3之間的差別性

6) pyzmq

pyzmq是zeromq(一種通訊隊列)的Python綁定,主要用來實現Locust的分佈式模式運行

當咱們在安裝 Locust 時,它會檢測咱們當前的 Python 環境是否已經安裝了這些庫,若是沒有安裝,它會先把這些庫一一裝上。而且對這些庫版本有要求,有些是必須等於某版本,有些是大於某版本。咱們也能夠事先把這些庫所有按要求裝好,再安裝Locust時就會快上許多

 

 

4. 腳本解讀

1、建立ScriptTasks()類繼承TaskSet類:  用於定義測試業務。

二、建立index()、about()、demo()方法分別表示一個行爲,訪問http://example.com。用@task() 裝飾該方法爲一個任務。一、2表示一個Locust實例被挑選執行的權重,數值越大,執行頻率越高。在當前ScriptTasks()行爲下的三個方法得執行比例爲2:1:1

3、WebsiteUser()類: 用於定義模擬用戶。

4、task_set :  指向一個定義了的用戶行爲類。

5、host:   指定被測試應用的URL的地址

6、min_wait :   用戶執行任務之間等待時間的下界,單位:毫秒。

七、max_wait :   用戶執行任務之間等待時間的上界,單位:毫秒。

 

5.執行腳本:

web/UI 界面:

if __name__ == '__main__':
    import os
    os.system("locust -f godemo.py --host=http://xx.api.xxxxx.net")

或者在命令行:

locust -f godemo.py --host=http:xxx.xxx.xx.net

或者:
locust -f godemo.py --H=http:xxx.xxx.xx.net

 

no-web:

dos進入Scripts目錄下,執行  locust -f ****.py --csv=onetest --host=http://0.0.0.0:0000 --no-web -c10 -r10 -t2 

(PS:-f 指定運行的py文件的名字,--csv 生成報告的名字,--host 測試的http服務的ip和port,--no-web 不用web啓動,-c 設置虛擬用戶數,  -r 設置每秒啓動虛擬用戶數, -t  設置運行時間)

 

下面是腳本:

建立一個方法保存在goconn,從數據庫讀取數據傳入請求:

import  pymysql
from sshtunnel import SSHTunnelForwarder
import random


def conn():

    #本地經過putty連接數據庫,在連接跳板機
    lcDB = pymysql.connect(host="127.0.0.1",port=8807, user="rt",passwd="qwqwqw12",db="go")
    cur = lcDB.cursor()
    #隨機生成一個數字,做爲查詢的結果的結果數
    num = random.randint(0,10)

    sql = "select no from goods where operator_id =%s order by rand() limit %s"%(11, num)
    print(sql)
    rfid = []

    try:

        cur.execute(sql)
        data = cur.fetchall()

        for row in data:
            good_no = row[0]
            rfid.append(good_no)
            # print(good_no)
        return rfid


    except:
        print("Error")

    finally:
        #關閉鏈接
        lcDB.close()

 

請求:

import goconnfrom locust import HttpLocust,TaskSet,task

class goDemo(TaskSet):

    def getRfid(self):

        gID = goconn.conn()
        data = {
            "goods_no[]": gID,
            "num": 1,
        }
        print("2")
        r = self.client.post("/test/api/XXX", data=data)
        rfid = eval(r.content)["result"]
        print(rfid)
        return rfid

    def stock(self,did, oid, rfid):
        data = {
            "device_id": did,
            "operator_id": oid,
            "data[add][]": rfid,
            "data[remove][]": "",
        }
        result = self.client.post("/test/api/XX2", data= data)
        print(result.content)


    def purchase(self,did,uid,rfid):

        data = {
            "device_id":did,
            "user_id":uid,
            "data[]":rfid,
        }
        req = self.client.post("/test/api/XXX3", data = data)
        res =eval(req.content)["result"]
        print(req.content)

    @task(1)
    def test_getrfid(self):
        self.getRfid()

    @task(3)
    def test_stock(self):
        rfid = self.getRfid()
        self.stock(28,11,rfid)

    @task(7)
    def test_purchase(self):
        rfid = self.getRfid()
        self.stock(28,11,rfid)
        self.purchase(28,172,rfid)


class WebsiteUser(HttpLocust):
    task_set = goDemo
    min_wait = 3000
    max_wait = 5000

 

測試結果:

 Number of users to simulate:設置模擬的用戶總數,

Hatch rate (users spawned/second):每秒啓動的虛擬用戶數 ,

Start swarming:執行locust腳本

 

UI界面:運行200個, 每秒啓動2個用戶:

 參考文章:

http://www.pianshen.com/article/6404330705/

關於性能好文章:

https://www.cnblogs.com/botoo/p/7410283.html

設計locust:

參數化 ,關聯等:

http://www.cnblogs.com/ailiailan/p/9474973.html

相關文章
相關標籤/搜索