本文描述一個python實現的多進程壓測工具,這個壓測工具的特色以下:java
在大多數狀況下,壓測通常適用於IO密集型場景(如訪問接口並等待返回),在這種場景下多線程多進程的區分並不明顯(詳情請參見GIL相關)。不過一旦出現詞表參數加密、返回內容校驗等事情的話,多進程對發送效率的提高仍是很明顯的。python
能夠指定發壓的QPS,根據並行度和請求相應時間,能夠估算出可發送QPS峯值。例如並行度是10,響應時間是100ms,那麼QPS峯值應該是(1s/100ms * 10)=100,此工具能夠將QPS穩定的維持在小於峯值的一個量上。session
爲何要DIY壓測工具了?通常的服務端壓測工具,例如http_load和jmeter,不是http協議的,就是須要經過代碼進行擴展。例如在壓測thrift接口的時候,即便經過jmeter擴展java程序也很麻煩。可是當涉及到場景化壓測,或者是奇怪的SDK,例如本文要壓測的接口是經過java代碼自動生成的python消息類SDK,而且涉及到場景化的壓測,很難經過通常的服務端壓測工具搞定。數據結構
一、發壓代碼多線程
解耦併發
下面是壓測代碼的實現,能夠看到,我這裏使用abc包,作了一個抽象類。app
業務測試代碼,例如自動化case,只要繼承了這個抽象類,就得到壓測的能力,作到壓測和自動化測試的解耦。dom
這裏有兩個抽象方法函數
vocab() - 構造詞表工具
press() - 發壓邏輯
是被@abc.abstractmethod裝飾器裝飾,在子類中,是必定要被實現的。
run()方法是壓測執行的方法,實現子類的詞表方法和發壓邏輯以後,直接調用run()方法就能夠壓測了。
固定QPS
固定QPS是經過管理進程實現的。能夠看到有兩種進程:
一種是worker_process進程,調用了press()發壓邏輯函數,而且這個進程能夠指定併發度concurrent,是實際的發壓進程,值得注意的是在worker_process中使用了time.sleep(),是爲了控制發送速度。
另外一種是manager_process進程,這個進程每隔一段時間計算實際的qps,並和設置的qps比較,而後調整worker_process中的sleep時間,例如實際qps小於設定qps,那麼就少睡一下子。
這裏不得不提到的是,多進程如何共享變量?
這裏使用的是multiprocessing中的Manager包,這個包提供了多進程共享變量的能力,我這裏用到的是Namespace數據結構來存儲多進程的計數。在使用過程當中我懷疑Manager Namespace是經過讀寫文件的形式進行進程間共享變量的,這個我沒有深刻的研究。
# -*- coding:utf-8 -*- import abc import time from multiprocessing import Lock, Process, Manager class Press(object): __metaclass__ = abc.ABCMeta def __init__(self, qps=100, concurrent=10): self.qps = qps self.concurrent = concurrent self.mutex = Lock() self.local = Manager().Namespace() self.local.count = 0 self.local.sleep = 0.1 self.manager_gap = 0.5 self.precision = 0.1 self.vocab_list = list() self.vocab() def manager_process(self): while True: with self.mutex: current_qps = self.local.count / self.manager_gap self.local.count = 0 print self.local.sleep, current_qps if current_qps < self.qps: self.local.sleep = self.local.sleep * (1.0 - self.precision) else: self.local.sleep = self.local.sleep * (1.0 + self.precision) time.sleep(self.manager_gap) def worker_process(self): while True: with self.mutex: self.local.count += 1 time.sleep(self.local.sleep) self.press() @abc.abstractmethod def vocab(self): return @abc.abstractmethod def press(self): return def run(self): processes = [Process(target=self.worker_process) for index in range(self.concurrent)] processes.append(Process(target=self.manager_process)) for process in processes: process.start() for process in processes: process.join()
二、實際壓測
給出一個發壓的例子。分三步~
QueryVmPress繼承了Press類,得到了發壓能力。
而後實現了vocab方法,構造了詞表。
實現了press方法,這裏是發壓邏輯,能夠看到QueryVmScenario.press_vm(vocab),QueryVmScenario放的是自動化case。發壓只是調用了其中的一個接口。這個接口的編寫很複雜,也是爲何要本身作一個壓測工具的緣由。
# -*- coding:utf-8 -*- import random from query.query_vm_scenario import QueryVmScenario from db.vm_dao import Dao as vm_dao from db.account_dao import Dao as account_dao from press import Press from lib import common from vocab import Vocab class QueryVmVocab(Vocab): def __init__(self): Vocab.__init__(self) class QueryVmPress(Press): def __init__(self, qps=100, concurrent=10): Press.__init__(self, qps, concurrent) def vocab(self): for account in account_dao.query_all_account(limit=10): account_name = account[1] account_password = account[2] res = common.login_by_account(account_name, account_password) for item in vm_dao.query_vm_by_account(account_name, limit=100): vm_uuid = item[1] vocab = QueryVmVocab() vocab.add('session_uuid', res.inventory.uuid) vocab.add('vm_uuid', vm_uuid) self.vocab_list.append(vocab) return self.vocab_list def press(self): vocab = self.vocab_list[random.randint(0, len(self.vocab_list)-1)] QueryVmScenario.press_vm(vocab) if __name__ == '__main__': QueryVmPress(qps=100, concurrent=10).run()
QueryVmPress(qps=100, concurrent=10).run(),就按照100QPS進行壓測了。
0.1 20.0 0.09 40.0 0.081 60.0 0.0729 80.0 0.06561 60.0 0.059049 80.0 0.0531441 60.0 0.04782969 80.0 0.043046721 80.0 0.0387420489 80.0 0.03486784401 80.0 0.031381059609 100.0 0.0345191655699 80.0 0.0310672490129 88.0 0.0279605241116 92.0 0.0251644717005 100.0 0.0276809188705 80.0 0.0249128269835 100.0 0.0274041096818 100.0 0.03014452065 80.0 0.027130068585 100.0 0.0298430754435 80.0 0.0268587678991 100.0 0.029544644689 92.0
第一列是sleep時間,第二列是實際QPS,能夠看到,qps會被動態的穩定在設置的值上。
三、混壓
當要作多個接口混壓的時候,能夠這樣作。
先寫好單壓的python類,在單壓的代碼裏,能夠看到我實現了QueryVmVocab類,表名了詞表的類型,這個類集成自Vocab,Vocab就是一個字典的封裝。
混壓的時候,先將詞表彙總,而且shuffle,而後彈出詞表的時候,使用isinstance判斷詞表的類型,調用不一樣的發壓函數進行壓測。
vocab的實現
# -*- coding:utf-8 -*- import abc class Vocab(object): __metaclass__ = abc.ABCMeta def __init__(self): self.vocab = dict() def add(self, key, value): self.vocab[key] = value def get(self, key): return self.vocab.get(key) def remove(self, key): del self.vocab[key]
混壓的實現
# -*- coding:utf-8 -*- import random from press import Press from query_eip_press import QueryEipPress, QueryEipVocab from query_image_press import QueryImagePress, QueryImageVocab from query_snapshot_press import QuerySnapshotPress, QuerySnapshotVocab from query_vm_press import QueryVmPress, QueryVmVocab from query.query_eip_scenario import QueryEipScenario from query.query_image_scenario import QueryImageScenario from query.query_snapshot_scenario import QuerySnapshotScenario from query.query_vm_scenario import QueryVmScenario class MixedPress(Press): def __init__(self, qps=100, concurrent=10): Press.__init__(self, qps, concurrent) def vocab(self): self.vocab_list.extend(QueryEipPress().vocab()) self.vocab_list.extend(QueryImagePress().vocab()) self.vocab_list.extend(QuerySnapshotPress().vocab()) self.vocab_list.extend(QueryVmPress().vocab()) def press(self): vocab = self.vocab_list[random.randint(0, len(self.vocab_list)-1)] if isinstance(vocab, QueryEipVocab): QueryEipScenario.press_eip(vocab) elif isinstance(vocab, QueryImageVocab): QueryImageScenario.press_image(vocab) elif isinstance(vocab, QuerySnapshotVocab): QuerySnapshotScenario.press_snapshot(vocab) elif isinstance(vocab, QueryVmVocab): QueryVmScenario.press_vm(vocab) if __name__ == '__main__': MixedPress(200, 50).run()
後記
這只是一個很小的功能實現,提供給你們參考。若是有不對的地方,但願獲得你們指正。