Python Concurrency and the Web

Python's main concurrency mechanisms are:

  • Thread (threads)
  • Process (processes)
  • Coroutines

Because of the GIL, CPU-bound Python threads cannot run in parallel across multiple cores, which limits performance. The examples below walk through each of Python's concurrency options.
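As a minimal illustration of the GIL's effect (the fib(25) workload and variable names here are our own, not part of the server code below): running two CPU-bound calls in two threads takes roughly as long as running them back to back.

```python
import threading
import time

def fib(n):
    # Deliberately slow recursive Fibonacci, used as a CPU-bound workload
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)

results = []

def work():
    results.append(fib(25))

# Sequential: two calls, one after the other
start = time.time()
work(); work()
sequential = time.time() - start

# Threaded: two calls in two threads. The GIL serializes bytecode execution,
# so this usually takes about as long as the sequential version.
results.clear()
t1 = threading.Thread(target=work)
t2 = threading.Thread(target=work)
start = time.time()
t1.start(); t2.start()
t1.join(); t2.join()
threaded = time.time() - start

print(f"sequential: {sequential:.2f}s, threaded: {threaded:.2f}s")
```

The two elapsed times come out close, unlike in languages where two threads on two cores would roughly halve the wall-clock time.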

Threads

We will use a simple web server to observe Python threads. First, a small CPU-intensive function:

def fib(n):
    if n <= 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

Next, a fib web server. The program is straightforward, so we won't explain it line by line:

from socket import *
from fib import fib

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        client, addr = sock.accept()
        print('Connection', addr)
        fib_handler(client)
    
def fib_handler(client):
    while True:
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        client.send(resp)
    print('Closed')

fib_server(('', 25002))

Running the following shell commands shows the computed result:

nc localhost 25002

10

55

Because the server is single-threaded, a second connection gets no result back:

nc localhost 25002

10

To let the server handle multiple requests, we add thread support to the server code:

# server.py
# server code: one thread per connection
from socket import *
from fib import fib
from threading import Thread

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        client, addr = sock.accept()
        print('Connection', addr)
        #fib_handler(client)
        Thread(target=fib_handler, args=(client,), daemon=True).start()  # the daemon keyword argument requires Python 3

def fib_handler(client):
    while True:
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        client.send(resp)
    print('Closed')
    
fib_server(('', 25002))  # start the server on port 25002

Running the shell commands again shows the result:

nc localhost 25002

10

55

Because the server is now multithreaded, a new connection also gets its result:

nc localhost 25002

10

55
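The whole round trip can be exercised from Python as well. The sketch below is self-contained and is our own, not from the article's code: it embeds the threaded fib server on an OS-assigned port (port 0) instead of 25002, so it won't collide with a running instance.

```python
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from threading import Thread

def fib(n):
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)

def fib_handler(client):
    while True:
        req = client.recv(100)
        if not req:
            break
        resp = str(fib(int(req))).encode('ascii') + b'\n'
        client.send(resp)
    client.close()

# Server bound to port 0: the OS picks a free port for us
server = socket(AF_INET, SOCK_STREAM)
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('localhost', 0))
server.listen(5)
port = server.getsockname()[1]

def serve():
    while True:
        client, _ = server.accept()
        Thread(target=fib_handler, args=(client,), daemon=True).start()

Thread(target=serve, daemon=True).start()

# Client side: the same exchange as `nc localhost 25002` followed by typing 10
conn = socket(AF_INET, SOCK_STREAM)
conn.connect(('localhost', port))
conn.send(b'10')
reply = conn.recv(100)
conn.close()
print(reply)  # b'55\n'
```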

Performance Testing

Let's add a small benchmarking script:

#perf1.py
from socket import *
from threading import Thread
import time

sock = socket(AF_INET, SOCK_STREAM)
sock.connect(('localhost', 25002))

n = 0

def monitor():
    global n
    while True:
        time.sleep(1)
        print(n, 'reqs/sec')
        n = 0
Thread(target=monitor).start()


while True:
    start = time.time()
    sock.send(b'1')
    resp = sock.recv(100)
    end = time.time()
    n += 1

# The code is very simple: the global variable n counts qps (req/sec, requests per second)

Running perf1.py in a shell prints something like:

  • 106025 reqs/sec
  • 109382 reqs/sec
  • 98211 reqs/sec
  • 105391 reqs/sec
  • 108875 reqs/sec

That averages out to roughly 100k requests per second.

If we start a second benchmarking process, the GIL's effect on threads becomes visible:

python3 perf1.py

  • 74677 reqs/sec
  • 78284 reqs/sec
  • 72029 reqs/sec
  • 81719 reqs/sec
  • 82392 reqs/sec
  • 84261 reqs/sec

and the qps in the original shell drops to a similar level:

  • 96488 reqs/sec
  • 99380 reqs/sec
  • 84918 reqs/sec
  • 87485 reqs/sec
  • 85118 reqs/sec
  • 78211 reqs/sec

If we then run

nc localhost 25002

40

to monopolize the server's CPU for a while, the qps in the benchmark windows plummets to

  • 99 reqs/sec
  • 99 reqs/sec

This reflects a quirk of Python's GIL: CPU-heavy tasks tend to win the GIL at the expense of everything else.

I don't know the exact cause; pinning it down would probably require reading the GIL's implementation.

Thread Pools in Web Programming

Python has a library called CherryPy, which I used recently. Skimming its source shows that its core uses a thread pool.

CherryPy maintains the pool with Python's thread-safe queue. The implementation looks like this:

class ThreadPool(object):

    """A Request Queue for an HTTPServer which pools threads.

    ThreadPool objects must provide min, get(), put(obj), start()
    and stop(timeout) attributes.
    """

    def __init__(self, server, min=10, max=-1,
        accepted_queue_size=-1, accepted_queue_timeout=10):
        self.server = server
        self.min = min
        self.max = max
        self._threads = []
        self._queue = queue.Queue(maxsize=accepted_queue_size)
        self._queue_put_timeout = accepted_queue_timeout
        self.get = self._queue.get

    def start(self):
        """Start the pool of threads."""
        for i in range(self.min):
            self._threads.append(WorkerThread(self.server))
        for worker in self._threads:
            worker.setName('CP Server ' + worker.getName())
            worker.start()
        for worker in self._threads:
            while not worker.ready:
                time.sleep(.1)
         ....
  
    def put(self, obj):
        self._queue.put(obj, block=True, timeout=self._queue_put_timeout)
        if obj is _SHUTDOWNREQUEST:
            return

    def grow(self, amount):
        """Spawn new worker threads (not above self.max)."""
        if self.max > 0:
            budget = max(self.max - len(self._threads), 0)
        else:
            # self.max <= 0 indicates no maximum
            budget = float('inf')

        n_new = min(amount, budget)

        workers = [self._spawn_worker() for i in range(n_new)]
        while not all(worker.ready for worker in workers):
            time.sleep(.1)
        self._threads.extend(workers)

        ....
        
    def shrink(self, amount):
        """Kill off worker threads (not below self.min)."""
        [...]

    def stop(self, timeout=5):
        # Must shut down threads here so the code that calls
        # this method can know when all threads are stopped.
        [...]

As you can see, CherryPy initializes the pool at size 10. Each incoming HTTP connection is put on the task queue, and WorkerThread instances keep pulling tasks off the queue and executing them: a textbook thread-pool model.
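The same pattern can be sketched in a few lines with the standard library. This is a simplified illustration of the queue-plus-workers model, not CherryPy's actual code:

```python
import queue
import threading

task_queue = queue.Queue()
results = []
lock = threading.Lock()

def worker():
    # Each worker blocks on the queue and runs tasks until it sees the
    # None shutdown marker (CherryPy uses _SHUTDOWNREQUEST similarly).
    while True:
        task = task_queue.get()
        if task is None:
            break
        with lock:
            results.append(task())

# A pool of 4 workers, analogous to ThreadPool's initial self.min threads
threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()

# Submit ten tasks, then one shutdown marker per worker
for i in range(10):
    task_queue.put(lambda i=i: i * i)
for _ in threads:
    task_queue.put(None)
for t in threads:
    t.join()

print(sorted(results))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```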

Processes

Because Python threads cannot exploit multiple cores, Python falls back on multiple processes to get true CPU parallelism. A Python process is comparatively expensive: think of it as launching another Python interpreter.

#server_pool.py

from socket import *
from fib import fib
from threading import Thread
from concurrent.futures import ProcessPoolExecutor as Pool  # Python 3 process pool from concurrent.futures

pool = Pool(4)  # start a process pool of size 4

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        client, addr = sock.accept()
        print('Connection', addr)
        Thread(target=fib_handler, args=(client,), daemon=True).start()
    
def fib_handler(client):
    while True:
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        future = pool.submit(fib, n)
        result = future.result()
        resp = str(result).encode('ascii') + b'\n'
        client.send(resp)
    print('Closed')

fib_server(('', 25002))

Performance Testing

The new server's qps:

  • 4613 reqs/sec
  • 4764 reqs/sec
  • 4619 reqs/sec
  • 4393 reqs/sec
  • 4768 reqs/sec
  • 4846 reqs/sec

This is far below the earlier ~100k qps. The main costs are slow process startup and the pool's internal machinery: arguments and results must be serialized and shuttled through queues between processes.

In exchange, multiple processes keep each connection relatively isolated, so one request cannot heavily affect the others.

Even issuing the expensive request below does not hurt the benchmark numbers:

nc localhost 25002

40

Coroutines

A Brief Introduction to Coroutines

Coroutines are an old concept that appeared in early operating systems, predating even threads and processes.

Coroutines are also a relatively hard concurrency style to understand and apply; coroutine-based code can be difficult to follow.

Python uses yield and next to drive coroutine control flow:

def count(n):
    while n > 0:
        yield n   # yield suspends the function here, turning it into a generator; next() resumes it
        n -= 1

for i in count(5):
    print(i)

# Output:
5
4
3
2
1
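The for loop above is just sugar over next(). Driving the generator by hand makes the suspension points explicit, which is exactly what the scheduler will do later:

```python
def count(n):
    while n > 0:
        yield n  # suspend here; the caller decides when to resume
        n -= 1

gen = count(3)
print(next(gen))  # 3 -- runs until the first yield
print(next(gen))  # 2 -- resumes after the yield, loops, yields again
print(next(gen))  # 1
try:
    next(gen)     # the generator is exhausted
except StopIteration:
    print("done")
```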

The examples below show how to write coroutine-based code, returning to the earlier server. First, recall why we used threads at all: to avoid blocking. The blocking here comes from two sources, socket I/O and CPU usage. Coroutines are likewise introduced to avoid blocking, so we start by marking the blocking points in the code.

# server.py
# server code, with blocking points marked
from socket import *
from fib import fib

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        client, addr = sock.accept()  #blocking
        print('Connection', addr)
        fib_handler(client)

def fib_handler(client):
    while True:
        req = client.recv(100)    #blocking
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        client.send(resp)    #blocking
    print('Closed')
    
fib_server(('', 25002))  # start the server on port 25002

The code above marks three socket I/O blocking points; we ignore CPU usage for now.

  • First we insert a yield statement at each blocking point. The yield labels the blocking point and the reason for blocking, so a scheduler can avoid actually blocking there: when a task would block, the scheduler can simply go run another request instead. We will implement such a simple non-blocking scheduler below.
# server.py
# server code, with yields marking the blocking points
from socket import *
from fib import fib

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        yield 'recv', sock
        client, addr = sock.accept()  #blocking
        print('Connection', addr)
        fib_handler(client)

def fib_handler(client):
    while True:
        yield 'recv', client
        req = client.recv(100)    #blocking
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        yield 'send', client
        client.send(resp)    #blocking
    print('Closed')
    
fib_server(('', 25002))  # start the server on port 25002
  • The program above cannot run yet, because there is no scheduler for the yields; execution would simply stop at the points yield marks. That is also an advantage of coroutines: scheduling is explicit and under our control, rather than the arbitrary interleaving of threads. Below is the code with the scheduler included.
from socket import *
from fib import fib
from threading import Thread
from collections import deque
from concurrent.futures import ProcessPoolExecutor as Pool
from select import select

tasks = deque()  
recv_wait = {}
send_wait = {}
def run():
    while any([tasks, recv_wait, send_wait]):
        while not tasks:
            can_recv, can_send, _ = select(recv_wait, send_wait, [])
            for s in can_recv:
                tasks.append(recv_wait.pop(s))
            for s in can_send:
                tasks.append(send_wait.pop(s))         
        task = tasks.popleft()
        try:
            why, what = next(task)
            if why == 'recv':
                recv_wait[what] = task
            elif why == 'send':
                send_wait[what] = task
            else:
                raise RuntimeError("ARG!")
        except StopIteration:
            print("task done")

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        yield 'recv', sock
        client, addr = sock.accept()
        print('Connection', addr)
        tasks.append(fib_handler(client))
    
def fib_handler(client):
    while True:
        yield 'recv', client
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        yield 'send', client
        client.send(resp)
    print('Closed')

tasks.append(fib_server(('', 25003)))
run()
  • We drive the coroutines with a polling loop built on select. The core is a task queue holding runnable generators, plus two dictionaries, recv_wait and send_wait, that map sockets to the tasks waiting on them and so dispatch work as sockets become ready.
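Stripped of sockets and select, the scheduler's core is just a deque of generators driven with next(). A minimal socket-free version (task names and step counts invented here for illustration):

```python
from collections import deque

log = []

def task(name, steps):
    for i in range(steps):
        yield  # cooperative "blocking point": hand control back to the scheduler
        log.append((name, i))

tasks = deque([task('A', 3), task('B', 2)])

# Round-robin: pop a task, advance it one step, requeue it until it finishes
while tasks:
    t = tasks.popleft()
    try:
        next(t)
        tasks.append(t)
    except StopIteration:
        pass  # task done, drop it

print(log)  # [('A', 0), ('B', 0), ('A', 1), ('B', 1), ('A', 2)]
```

The interleaved log shows both tasks making progress one step at a time, exactly the behavior the select-based scheduler gives the two socket handlers.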

Performance Testing

The new server's qps:

  • 82262 reqs/sec
  • 82915 reqs/sec
  • 82128 reqs/sec
  • 82867 reqs/sec
  • 82284 reqs/sec
  • 82363 reqs/sec
  • 82954 reqs/sec

Performance is close to the earlier thread model. Coroutines give us asynchrony, but they still run on only one CPU.

When we make the server compute fib(40) and saturate the CPU, qps quickly drops to 0.

Tornado: a Coroutine-Based Python Web Framework

Tornado is an asynchronous web framework originally built at FriendFeed (later acquired by Facebook). Using coroutines in Tornado is simple: the gen.coroutine decorator registers your async function with Tornado's IOLoop. A typical Tornado async method looks like:

@gen.coroutine
def post(self):
    resp = yield GetUser()
    self.write(resp)

How Tornado's Async Works

def start(self):
    """Starts the I/O loop.
    The loop will run until one of the I/O handlers calls stop(), which
    will make the loop stop after the current event iteration completes.
    """
    self._running = True
    while True:
    [ ... ]
        if not self._running:
            break
        [ ... ]
        try:
            event_pairs = self._impl.poll(poll_timeout)
        except Exception, e:
            if e.args == (4, "Interrupted system call"):
                logging.warning("Interrupted system call", exc_info=1)
                continue
            else:
                raise
        # Pop one fd at a time from the set of pending fds and run
        # its handler. Since that handler may perform actions on
        # other file descriptors, there may be reentrant calls to
        # this IOLoop that update self._events
        self._events.update(event_pairs)
        while self._events:
            fd, events = self._events.popitem()
            try:
                self._handlers[fd](fd, events)
            except KeyboardInterrupt:
                raise
            except OSError, e:
                if e[0] == errno.EPIPE:
                    # Happens when the client closes the connection
                    pass
                else:
                    logging.error("Exception in I/O handler for fd %d",
                                  fd, exc_info=True)
            except:
                logging.error("Exception in I/O handler for fd %d",fd, exc_info=True)

This is the main loop at the heart of Tornado's async scheduling. poll() returns (fd, events) pairs, which are assigned to event_pairs; the inner while loop pops them out one at a time and invokes the matching handler. Tornado registers sockets with the poller through the function below. On Linux it defaults to epoll; on Windows it defaults to select (the only option there).

def add_handler(self, fd, handler, events):
    """Registers the given handler to receive the given events for fd."""
    self._handlers[fd] = handler
    self._impl.register(fd, events | self.ERROR)
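The standard library's selectors module exposes the same register-an-fd-with-a-handler idea (it picks epoll, kqueue, or select automatically). A minimal self-contained sketch of the pattern, using a socketpair instead of a network socket; the handler-in-`data` convention is our own, not Tornado's API:

```python
import selectors
import socket

sel = selectors.DefaultSelector()
a, b = socket.socketpair()  # two connected sockets, no network needed

received = []

def on_readable(sock):
    # Handler invoked when the registered fd becomes readable,
    # analogous to Tornado's self._handlers[fd](fd, events)
    received.append(sock.recv(100))

# Register fd -> handler, like IOLoop.add_handler
sel.register(b, selectors.EVENT_READ, data=on_readable)

a.send(b'hello')
for key, events in sel.select(timeout=1):
    key.data(key.fileobj)  # dispatch to the stored handler

sel.unregister(b)
a.close(); b.close()
print(received)  # [b'hello']
```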

Comparing CherryPy's Thread Pool with Tornado's Coroutines

We compare the simplest possible app in each framework, running on a single machine.

The benchmark command:

ab -c 100 -n 1000 -k localhost:8080/ | grep "Time taken for tests:"

CherryPy's results:

  • Completed 100 requests
  • Completed 200 requests
  • Completed 300 requests
  • Completed 400 requests
  • Completed 500 requests
  • Completed 600 requests
  • Completed 700 requests
  • Completed 800 requests
  • Completed 900 requests
  • Completed 1000 requests
  • Finished 1000 requests

Time taken for tests: 10.773 seconds

Tornado's results:

  • Completed 100 requests
  • Completed 200 requests
  • Completed 300 requests
  • Completed 400 requests
  • Completed 500 requests
  • Completed 600 requests
  • Completed 700 requests
  • Completed 800 requests
  • Completed 900 requests
  • Completed 1000 requests
  • Finished 1000 requests

Time taken for tests: 0.377 seconds

Tornado's numbers are striking. When an application involves asynchronous I/O, Tornado is well worth reaching for.

Summary

This article introduced Python's threads, processes, and coroutines and their applications, with simple performance comparisons of each model. Because of the GIL, neither threads nor coroutines can use multiple cores.

  • For a CPU-bound web app, the thread and coroutine models perform about the same; threads, being scheduled by the OS, do slightly better.
  • For an I/O-bound web app, the coroutine model has a large advantage.
