A friend of mine recently wanted to do API load testing, and I recommended JMeter: it is open source and has a command-line launch mode, which makes it easy to wrap in scripts. While I was at it, I also put together a simple high-concurrency script of my own.
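For reference (my addition; the file names are placeholders), JMeter's non-GUI mode is what makes it scriptable. A typical invocation looks like this, where `-n` means non-GUI, `-t` names the test plan, and `-l` names the results log:

```
jmeter -n -t testplan.jmx -l results.jtl
```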
My first approach was multiprocessing + multithreading + coroutines. The idea was this: multiple processes make effective use of multiple cores, and in theory one process per core is ideal. So why add threads on top? Wasn't I worried about the GIL? My thinking at the time was that since I was using gevent, and requests is a blocking library, I could just push the requests calls into coroutines and that would take care of it. Below is the script, which targets 2000 concurrent requests (10 processes × 20 threads × 10 coroutines = 2000). It's rough; don't mind the details.
```python
# coding:utf-8
import multiprocessing
import requests
import threading
import gevent

process_num = 10    # number of processes
gevent_num = 10     # coroutines per thread
threading_num = 20  # threads per process

def asynchronous(url):
    # Spawn a batch of greenlets that each issue one request.
    threads = []
    for i in range(gevent_num):
        threads.append(gevent.spawn(request_url, url))
    gevent.joinall(threads)

def request_url(url):
    code = requests.get(url).status_code
    if code != 200:
        print "the time request failed: " + str(code)
    else:
        print "the time request ok"

def run_in_thread(url):
    # Each OS thread runs its own batch of greenlets.
    threadings = []
    for i in xrange(threading_num):
        t = threading.Thread(target=asynchronous, args=(url,))
        t.daemon = True
        t.start()
        threadings.append(t)
    for t in threadings:
        t.join()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=process_num)
    for i in range(process_num):
        pool.apply_async(run_in_thread, ("https://www.jd.com",))
    pool.close()
    pool.join()
```
This script has a problem, though: requests is a blocking method, so the coroutines are effectively useless. Each greenlet's requests.get call blocks the whole thread until it returns, so the greenlets simply run one after another. The requests call needs to be replaced with an asynchronous method; the replaced version is below.
```python
# coding:utf-8
import multiprocessing
import threading
import gevent
import time
import tornado.ioloop
from tornado.httpclient import AsyncHTTPClient

process_num = 10   # number of processes
gevent_num = 10    # coroutines per thread
threading_num = 2  # threads per process

def asynchronous(url):
    threads = []
    for i in range(gevent_num):
        threads.append(gevent.spawn(request_url, url))
    gevent.joinall(threads)

def request_url(url):
    # Queue a non-blocking fetch, then make sure the IOLoop is
    # running so the callback actually fires.
    http_client = AsyncHTTPClient()
    http_client.fetch(url, callback=handle_request)
    loop = tornado.ioloop.IOLoop.instance()
    if loop._running is False:
        loop.start()

def run_in_thread(url):
    threadings = []
    for i in xrange(threading_num):
        t = threading.Thread(target=asynchronous, args=(url,))
        t.daemon = True
        t.start()
        threadings.append(t)
    for t in threadings:
        t.join()

def handle_request(response):
    print response.code

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=process_num)
    for i in range(process_num):
        pool.apply_async(run_in_thread, ("https://www.jd.com/",))
    pool.close()
    pool.join()
```
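As a side note (my addition, not part of the original post): instead of switching HTTP clients, the first script could also have been fixed by letting gevent monkey-patch the standard library's sockets, which makes requests yield cooperatively while it waits on I/O. A minimal sketch, reusing the target URL from the original script:

```python
# coding:utf-8
# Sketch: gevent monkey-patching makes blocking libraries like
# requests cooperative, so greenlets overlap their network waits.
from gevent import monkey
monkey.patch_all()  # must run before requests creates any sockets

import gevent
import requests

def request_url(url):
    # With patched sockets, this call yields to other greenlets while waiting.
    print requests.get(url).status_code

if __name__ == '__main__':
    jobs = [gevent.spawn(request_url, "https://www.jd.com") for _ in range(10)]
    gevent.joinall(jobs)
```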
There were still problems, though. The on-screen output lagged behind, and printing from inside the code slows the asynchronous path down. To gather statistics instead, I used multiprocessing's Manager to accumulate a request counter across processes. At that point I noticed that as the thread count increased, concurrency actually went down: that is the GIL's limit at work. So in Python 2, if you are already using coroutines, don't mix in multithreading. Below is the code with the thread count reduced to 1 and the counter added.
```python
# coding:utf-8
import multiprocessing
import gevent
import time
import tornado.ioloop
from threadpool import ThreadPool, makeRequests
from tornado.httpclient import AsyncHTTPClient
from multiprocessing import Process, Manager

process_num = 20   # number of processes
gevent_num = 200   # coroutines per thread
threading_num = 1  # threads per process

url = "http://www.baidu.com"
sum = Manager().Value('count', 0)  # counter shared across processes

def asynchronous(url):
    try:
        threads = []
        for i in range(gevent_num):
            threads.append(gevent.spawn(request_url, url))
        gevent.joinall(threads)
    except Exception as e:
        pass

def request_url(url):
    http_client = AsyncHTTPClient()
    sum.value += 1  # count each fetch as it is issued
    http_client.fetch(url, callback=handle_request)
    # time.sleep(1)
    # print " count: " + str(sum.value) + " cur process: " + str(os.getpid()) + " cur thread: " + str(threading.current_thread)
    global loop
    loop = tornado.ioloop.IOLoop.instance()
    if loop._running is False:
        loop.start()

def run_in_thread(url):
    pool = ThreadPool(threading_num)
    requests = makeRequests(asynchronous, [url])
    [pool.putRequest(req) for req in requests]
    pool.wait()

def handle_request(response):
    # print "current site: " + str(response.effective_url) + " , request time: " + str(
    #     getattr(response, "request_time", "000"))
    loop.stop()

def main():
    starttime = time.time()
    pool = multiprocessing.Pool(processes=process_num)
    for i in range(process_num):
        pool.apply_async(run_in_thread, (url,))
    pool.close()
    pool.join()
    print sum.value
    print "cost time: " + str(time.time() - starttime)

if __name__ == '__main__':
    main()
```
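One caveat worth flagging (my addition, not from the original post): `sum.value += 1` is a read-modify-write on a Manager proxy and is not atomic across processes, so the counter can undercount under load. A minimal sketch of a lock-guarded counter, with illustrative names:

```python
# coding:utf-8
# Sketch: serialize increments with a Manager lock so concurrent
# read-modify-write cycles from different processes cannot interleave.
from multiprocessing import Manager

manager = Manager()
counter = manager.Value('i', 0)  # 'i' = signed-int typecode
counter_lock = manager.Lock()

def incr_counter():
    with counter_lock:
        counter.value += 1
```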
The output looks like this:
```
/usr/bin/python2.7 /home/shufeng/workspace/private_project/jobscrawler/center/sample.py
3244
cost time: 2.23202705383

Process finished with exit code 0
```
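For a rough sense of scale: 3244 requests issued in about 2.23 seconds works out to roughly 1450 requests started per second on this machine. Note that the counter is incremented when a fetch is queued, not when its response arrives.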