The Evolution of a Crude High-Concurrency Request Script

A friend of mine recently wanted to do interface load testing, so I recommended jmeter: it is open source and has a command-line launch mode, which makes it easy to wrap. While I was at it, I hacked together my own simple take on a high-concurrency script in Python.

The first approach I reached for was multiprocessing + multithreading + coroutines. The idea went like this: multiple processes make effective use of multiple cores, and in theory one process per core is best. So why add threads on top? Wasn't I worried about the GIL? My thinking at the time was that since I was using gevent, and requests (a blocking library) for the HTTP calls, I could just throw the requests operations into coroutines and everything would be fine. Here is the script, which targets a concurrency of 2000 (10 processes × 20 threads × 10 coroutines each); it's rough, so don't mind the details.

# coding:utf-8
import multiprocessing
import requests
import threading

import gevent


process_num = 10    # number of processes
gevent_num = 10     # coroutines per thread
threading_num = 20  # threads per process


def asynchronous(url):
    threads = []
    for i in range(gevent_num):
        threads.append(gevent.spawn(request_url, url))
    gevent.joinall(threads)


def request_url(url):
    code = requests.get(url).status_code
    if code != 200:
        print "request failed, status code: " + str(code)
    else:
        print "request ok"


def run_in_thread(url):
    threadings = []

    for i in xrange(threading_num):
        t = threading.Thread(target=asynchronous, args=(url,))
        t.daemon = True
        t.start()
        threadings.append(t)
    for t in threadings:
        t.join()


if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=process_num)
    for i in range(process_num):
        pool.apply_async(run_in_thread, ("https://www.jd.com",))
    pool.close()
    pool.join()

But this script has a problem: requests is a blocking call, so the coroutines are effectively useless; each spawned task blocks the whole gevent hub until its request finishes, and the requests end up running one after another. requests has to be swapped for an asynchronous method; one common route, sketched right below, is gevent's monkey-patching, and the other, shown after it, is an async HTTP client.
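As an aside, gevent can make requests cooperative without switching HTTP libraries at all: monkey-patching swaps the blocking socket primitives for gevent-aware ones. A minimal sketch of that route (my addition, not part of the original scripts; the URL is just a placeholder):

# coding:utf-8
# the patch must run before requests (and socket) are imported
from gevent import monkey
monkey.patch_all()

import gevent
import requests

gevent_num = 10  # coroutines


def request_url(url):
    code = requests.get(url).status_code
    print "request done, status code: " + str(code)


if __name__ == '__main__':
    # with patched sockets these fetches overlap instead of running one by one
    gevent.joinall([gevent.spawn(request_url, "https://www.jd.com/") for i in range(gevent_num)])

The version below takes the other route and swaps requests out for tornado's AsyncHTTPClient.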

# coding:utf-8
import multiprocessing
import threading

import gevent

import tornado.ioloop
from tornado.httpclient import AsyncHTTPClient

process_num = 10    # number of processes
gevent_num = 10     # coroutines per thread
threading_num = 2   # threads per process


def asynchronous(url):
    threads = []
    for i in range(gevent_num):
        threads.append(gevent.spawn(request_url, url))
    gevent.joinall(threads)


def request_url(url):
    http_client = AsyncHTTPClient()
    http_client.fetch(url, callback=handle_request)
    # only the first coroutine starts the loop; _running is a private tornado attribute
    loop = tornado.ioloop.IOLoop.instance()
    if loop._running is False:
        loop.start()


def run_in_thread(url):
    threadings = []

    for i in xrange(threading_num):
        t = threading.Thread(target=asynchronous, args=(url,))
        t.daemon = True
        t.start()
        threadings.append(t)
    for t in threadings:
        t.join()


def handle_request(response):
    print response.code

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=process_num)
    for i in range(process_num):
        pool.apply_async(run_in_thread, ("https://www.jd.com/",))
    pool.close()
    pool.join()
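Stepping back, the tidier tornado pattern is to start the IOLoop once, fire all the fetches, and stop the loop when the last callback arrives, instead of letting every coroutine race to start it through a private flag. A minimal sketch of that, assuming tornado 4.x's callback-style API (the request count and URL are placeholders):

# coding:utf-8
import tornado.ioloop
from tornado.httpclient import AsyncHTTPClient

request_num = 10  # placeholder request count
pending = [request_num]


def handle_request(response):
    print response.code
    pending[0] -= 1
    if pending[0] == 0:  # last response has arrived, shut the loop down
        tornado.ioloop.IOLoop.instance().stop()


if __name__ == '__main__':
    client = AsyncHTTPClient()
    for i in range(request_num):
        client.fetch("https://www.jd.com/", callback=handle_request)
    tornado.ioloop.IOLoop.instance().start()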

There is still a problem, though: the on-screen output lags, and printing inside the code drags down the async throughput. To collect statistics instead, I used multiprocessing's Manager to accumulate the request count across processes. At that point I noticed that adding threads actually reduced concurrency; that is the GIL at work. So in python2, if you are using coroutines, don't use multithreading on top. Here is the code with the thread count set to 1 and a counter added.

# coding:utf-8
import multiprocessing

import gevent

import time
import tornado.ioloop
from threadpool import ThreadPool, makeRequests
from tornado.httpclient import AsyncHTTPClient
from multiprocessing import Manager

process_num = 20    # number of processes
gevent_num = 200    # coroutines per thread
threading_num = 1   # number of threads

url = "http://www.baidu.com"

sum = Manager().Value('count', 0)  # process-shared request counter


def asynchronous(url):
    try:
        threads = []
        for i in range(gevent_num):
            threads.append(gevent.spawn(request_url, url))
        gevent.joinall(threads)
    except Exception as e:
        pass  # ignore failures so one bad batch doesn't kill the worker process


def request_url(url):
    http_client = AsyncHTTPClient()
    sum.value += 1  # note: this read-modify-write on a Manager value is not atomic
    http_client.fetch(url, callback=handle_request)
    global loop
    loop = tornado.ioloop.IOLoop.instance()
    if loop._running is False:
        loop.start()


def run_in_thread(url):
    pool = ThreadPool(threading_num)
    requests = makeRequests(asynchronous, [url])
    [pool.putRequest(req) for req in requests]
    pool.wait()


def handle_request(response):
    # print "current site: " + str(response.effective_url) + " , request  time: " + str(
    #     getattr(response, "request_time", "000"))
    loop.stop()


def main():
    starttime = time.time()
    pool = multiprocessing.Pool(processes=process_num)
    for i in range(process_num):
        pool.apply_async(run_in_thread, (url,))
    pool.close()
    pool.join()
    print sum.value
    print "cost time: " + str(time.time() - starttime)


if __name__ == '__main__':
    main()
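One caveat on the counter: sum.value += 1 goes through a Manager proxy as a separate read and write, so concurrent increments from different processes can be lost and the printed total may undercount. A minimal sketch of a locked increment, assuming a Manager-backed lock (the names are mine, not from the original script):

from multiprocessing import Manager

manager = Manager()
counter = manager.Value('i', 0)
counter_lock = manager.Lock()


def incr():
    # serialize the read-modify-write so no increment is lost
    with counter_lock:
        counter.value += 1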

Here is the output (Python 2.7):

/usr/bin/python2.7 /home/shufeng/workspace/private_project/jobscrawler/center/sample.py
3244
cost time: 2.23202705383

Process finished with exit code 0
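That works out to roughly 3244 requests fired in 2.23 seconds, about 1450 per second; keep in mind the counter is bumped when a fetch is started, not when its response arrives, so this measures dispatch rate rather than completed requests.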