Let's observe Python threads through a simple web server. First, write a small CPU-intensive function:
```python
def fib(n):
    if n <= 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)
```
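As an aside, this naive fib is exponential in n, which is exactly what makes it useful here: small inputs return instantly while larger ones pin a CPU core. A quick sanity check (the timing will of course vary by machine):

```python
import time

def fib(n):
    # same exponential implementation as above
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)

print(fib(10))   # 55, the value the nc session below returns

# each +1 on n roughly multiplies the work by the golden ratio,
# so something like fib(40) keeps one core busy for seconds
start = time.time()
fib(25)
print('fib(25) took %.4f seconds' % (time.time() - start))
```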
Next, write a fib web server. The program is simple enough that it needs no further explanation.
```python
from socket import *
from fib import fib

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        client, addr = sock.accept()
        print('Connection', addr)
        fib_handler(client)

def fib_handler(client):
    while True:
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        client.send(resp)
    print('Closed')

fib_server(('', 25002))
```
Running the following shell command shows the computed result:

```
nc localhost 25002
10
55
```
Because the server is single-threaded, a second connection gets no result:

```
nc localhost 25002
10
```
To let our server handle multiple requests, we add multithreading support to the server code.
```python
# server.py -- server code
from socket import *
from fib import fib
from threading import Thread

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        client, addr = sock.accept()
        print('Connection', addr)
        # fib_handler(client)
        Thread(target=fib_handler, args=(client,), daemon=True).start()  # requires Python 3

def fib_handler(client):
    while True:
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        client.send(resp)
    print('Closed')

fib_server(('', 25002))  # serve on port 25002
```
Running the shell command again shows the result:

```
nc localhost 25002
10
55
```
Because the server is now multithreaded, a new connection also gets a result:

```
nc localhost 25002
10
55
```
Now let's add some performance-testing code:
```python
# perf1.py
from socket import *
from threading import Thread
import time

sock = socket(AF_INET, SOCK_STREAM)
sock.connect(('localhost', 25002))

n = 0

def monitor():
    global n
    while True:
        time.sleep(1)
        print(n, 'reqs/sec')
        n = 0

Thread(target=monitor).start()

while True:
    start = time.time()
    sock.send(b'1')
    resp = sock.recv(100)
    end = time.time()
    n += 1

# The code is very simple: the global counter n tracks qps (requests per second)
```
Running perf1.py in a shell shows the result: roughly 100,000 requests per second on average.
If we start a second performance-test process, we can see the effect Python's GIL has on threads:

```
python3 perf1.py
```

The qps in the original shell drops to a similar figure.
If we then run

```
nc localhost 25002
40
```

to fully occupy the server's CPU for a while, the reqs/sec shown in the test windows drops sharply.
This also reflects a quirk of Python's GIL: it tends to favor the task hogging the CPU. I don't know the exact reason; understanding it would probably require reading the GIL's implementation source.
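To see the GIL's serialization concretely, here is a minimal experiment, independent of the server above: running the CPU-bound fib in two threads takes about as long as running it twice in a row, because only one thread executes Python bytecode at a time. The timings below are machine-dependent; N is just chosen to keep the demo short.

```python
import time
from threading import Thread

def fib(n):
    if n <= 2:
        return 1
    return fib(n - 1) + fib(n - 2)

def timed(fn):
    start = time.time()
    fn()
    return time.time() - start

N = 25

# two CPU-bound calls, one after the other
seq = timed(lambda: (fib(N), fib(N)))

# the same two calls in two threads; the GIL still serializes the bytecode
def threaded():
    threads = [Thread(target=fib, args=(N,)) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

par = timed(threaded)
print('sequential: %.3fs, threaded: %.3fs' % (seq, par))
```

On a multi-core machine with real OS threads you would expect the threaded version to take roughly half the time; under CPython it does not.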
I recently used the Python library CherryPy and skimmed its source code; its core uses a thread pool. CherryPy maintains the pool with Python's thread-safe queue. The implementation looks like this:
```python
class ThreadPool(object):
    """A Request Queue for an HTTPServer which pools threads.

    ThreadPool objects must provide min, get(), put(obj), start()
    and stop(timeout) attributes.
    """

    def __init__(self, server, min=10, max=-1,
                 accepted_queue_size=-1, accepted_queue_timeout=10):
        self.server = server
        self.min = min
        self.max = max
        self._threads = []
        self._queue = queue.Queue(maxsize=accepted_queue_size)
        self._queue_put_timeout = accepted_queue_timeout
        self.get = self._queue.get

    def start(self):
        """Start the pool of threads."""
        for i in range(self.min):
            self._threads.append(WorkerThread(self.server))
        for worker in self._threads:
            worker.setName('CP Server ' + worker.getName())
            worker.start()
        for worker in self._threads:
            while not worker.ready:
                time.sleep(.1)

    # ....

    def put(self, obj):
        self._queue.put(obj, block=True, timeout=self._queue_put_timeout)
        if obj is _SHUTDOWNREQUEST:
            return

    def grow(self, amount):
        """Spawn new worker threads (not above self.max)."""
        if self.max > 0:
            budget = max(self.max - len(self._threads), 0)
        else:
            # self.max <= 0 indicates no maximum
            budget = float('inf')
        n_new = min(amount, budget)
        workers = [self._spawn_worker() for i in range(n_new)]
        while not all(worker.ready for worker in workers):
            time.sleep(.1)
        self._threads.extend(workers)

    # ....

    def shrink(self, amount):
        """Kill off worker threads (not below self.min)."""
        [...]

    def stop(self, timeout=5):
        # Must shut down threads here so the code that calls
        # this method can know when all threads are stopped.
        [...]
```
As you can see, CherryPy initializes its thread pool with a size of 10; every incoming HTTP connection is put into the task queue, and the WorkerThreads continuously pull tasks off the queue and execute them. This is a very standard thread-pool model.
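The pattern boils down to a few lines: a thread-safe queue.Queue holding tasks, a fixed set of worker threads draining it, and a sentinel object for shutdown. Here is a simplified sketch of that model (my own toy code, not CherryPy's):

```python
import queue
import threading

class MiniThreadPool:
    """Toy thread pool: N workers pull callables off a shared queue."""
    _SHUTDOWN = object()  # sentinel, like CherryPy's _SHUTDOWNREQUEST

    def __init__(self, size=4):
        self._queue = queue.Queue()
        self._threads = []
        for _ in range(size):
            t = threading.Thread(target=self._worker, daemon=True)
            t.start()
            self._threads.append(t)

    def _worker(self):
        while True:
            task = self._queue.get()   # blocks until a task arrives
            if task is self._SHUTDOWN:
                break
            task()

    def put(self, task):
        self._queue.put(task)

    def stop(self):
        for _ in self._threads:        # one sentinel per worker
            self._queue.put(self._SHUTDOWN)
        for t in self._threads:
            t.join()

# usage: push 10 jobs, collect results in a list (appends are GIL-atomic)
results = []
pool = MiniThreadPool(size=4)
for i in range(10):
    pool.put(lambda i=i: results.append(i * i))
pool.stop()
print(sorted(results))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```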
Because Python threads cannot use multiple cores, Python can instead use multiple processes to improve concurrent performance on a multi-core CPU. A Python process is relatively expensive: each one amounts to starting another Python interpreter.
```python
# server_pool.py
from socket import *
from fib import fib
from threading import Thread
from concurrent.futures import ProcessPoolExecutor as Pool  # process pool; Python 3 (the futures backport provides it on Python 2)

pool = Pool(4)  # start a pool of 4 worker processes

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        client, addr = sock.accept()
        print('Connection', addr)
        Thread(target=fib_handler, args=(client,), daemon=True).start()

def fib_handler(client):
    while True:
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        future = pool.submit(fib, n)  # run fib in a worker process
        result = future.result()
        resp = str(result).encode('ascii') + b'\n'
        client.send(resp)
    print('Closed')

fib_server(('', 25002))
```
The new server's qps is far lower than the earlier 100,000, mainly because processes are slow to start and the process pool's internals are complex, involving data transfer, queues, and so on.
However, with multiple processes each connection is relatively independent and largely shielded from other requests. Even a command as time-consuming as the following will not affect the performance test:

```
nc localhost 25002
40
```
Coroutines are an old concept, first appearing in early operating systems, even before threads and processes. Coroutines are also a relatively hard concurrency style to understand and apply, and coroutine code can be difficult to read. In Python, yield and next() provide the control mechanism for coroutines.
```python
def count(n):
    while n > 0:
        yield n  # suspend here; calling count() builds a generator, which is advanced via next()
        n -= 1

for i in count(5):
    print(i)

# Output:
# 5
# 4
# 3
# 2
# 1
```
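The same yield mechanism lets us interleave several generators by hand, which is the essence of what a coroutine scheduler does. A minimal round-robin sketch (my own illustration, not from the server code):

```python
from collections import deque

def count(name, n):
    while n > 0:
        yield (name, n)  # suspend here; the scheduler decides who runs next
        n -= 1

# round-robin scheduler: keep resuming tasks until all are exhausted
tasks = deque([count('a', 3), count('b', 2)])
order = []
while tasks:
    task = tasks.popleft()
    try:
        order.append(next(task))
        tasks.append(task)    # not finished: requeue at the back
    except StopIteration:
        pass                  # finished: drop it

print(order)
# [('a', 3), ('b', 2), ('a', 2), ('b', 1), ('a', 1)]
```

Two "tasks" make progress in alternation on a single thread, with no OS scheduling involved; that is the property the coroutine server below exploits.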
Now let's walk through writing coroutine code, starting from the earlier server. First, recall why we used threads at all: to avoid blocking. The blocking here comes from two sources, socket IO and CPU usage. Coroutines are likewise introduced to avoid blocking, so we first mark the blocking points in the code.
```python
# server.py -- server code
from socket import *
from fib import fib

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        client, addr = sock.accept()  # blocking
        print('Connection', addr)
        fib_handler(client)

def fib_handler(client):
    while True:
        req = client.recv(100)  # blocking
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        client.send(resp)  # blocking
    print('Closed')

fib_server(('', 25002))  # serve on port 25002
```
Three socket IO blocking points are marked above; CPU usage is ignored for now. We turn each function into a generator that yields just before it would block:
```python
# server.py -- server code
from socket import *
from fib import fib

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        yield 'recv', sock
        client, addr = sock.accept()  # blocking
        print('Connection', addr)
        fib_handler(client)

def fib_handler(client):
    while True:
        yield 'recv', client
        req = client.recv(100)  # blocking
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        yield 'send', client
        client.send(resp)  # blocking
    print('Closed')

fib_server(('', 25002))  # serve on port 25002
```
Then we add a small scheduler that uses select() to resume whichever task's socket is ready:

```python
from socket import *
from fib import fib
from collections import deque
from select import select

tasks = deque()
recv_wait = {}
send_wait = {}

def run():
    while any([tasks, recv_wait, send_wait]):
        while not tasks:
            can_recv, can_send, _ = select(recv_wait, send_wait, [])
            for s in can_recv:
                tasks.append(recv_wait.pop(s))
            for s in can_send:
                tasks.append(send_wait.pop(s))
        task = tasks.popleft()
        try:
            why, what = next(task)
            if why == 'recv':
                recv_wait[what] = task
            elif why == 'send':
                send_wait[what] = task
            else:
                raise RuntimeError("ARG!")
        except StopIteration:
            print("task done")

def fib_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(address)
    sock.listen(5)
    while True:
        yield 'recv', sock
        client, addr = sock.accept()
        print('Connection', addr)
        tasks.append(fib_handler(client))

def fib_handler(client):
    while True:
        yield 'recv', client
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        yield 'send', client
        client.send(resp)
    print('Closed')

tasks.append(fib_server(('', 25003)))
run()
```
The new server's qps is close to the earlier thread model. The advantage of coroutines is that they are asynchronous, but a coroutine server can still only use one CPU: when we make the server compute fib(40) and saturate the CPU, qps quickly drops to 0.
Tornado is an asynchronous web framework open-sourced by Facebook (it originated at FriendFeed). Using coroutines in Tornado is fairly simple: the gen.coroutine decorator registers your asynchronous function with Tornado's IOLoop. A Tornado asynchronous method is typically written as:
```python
@gen.coroutine
def post(self):
    resp = yield GetUser()
    self.write(resp)
```
```python
def start(self):
    """Starts the I/O loop.

    The loop will run until one of the I/O handlers calls stop(),
    which will make the loop stop after the current event iteration
    completes.
    """
    self._running = True
    while True:
        [ ... ]
        if not self._running:
            break
        [ ... ]
        try:
            event_pairs = self._impl.poll(poll_timeout)
        except Exception, e:
            if e.args == (4, "Interrupted system call"):
                logging.warning("Interrupted system call", exc_info=1)
                continue
            else:
                raise

        # Pop one fd at a time from the set of pending fds and run
        # its handler. Since that handler may perform actions on
        # other file descriptors, there may be reentrant calls to
        # this IOLoop that update self._events
        self._events.update(event_pairs)
        while self._events:
            fd, events = self._events.popitem()
            try:
                self._handlers[fd](fd, events)
            except KeyboardInterrupt:
                raise
            except OSError, e:
                if e[0] == errno.EPIPE:
                    # Happens when the client closes the connection
                    pass
                else:
                    logging.error("Exception in I/O handler for fd %d",
                                  fd, exc_info=True)
            except:
                logging.error("Exception in I/O handler for fd %d",
                              fd, exc_info=True)
```
This is the core main loop of Tornado's asynchronous scheduling. poll() returns key-value pairs of the form (fd: events), which are assigned to event_pairs; in the inner while loop, entries are popped one by one and the corresponding handler is invoked. Tornado registers sockets with the poller via the function below; on Linux it defaults to epoll, and on Windows it defaults to select (the only option there).
```python
def add_handler(self, fd, handler, events):
    """Registers the given handler to receive the given events for fd."""
    self._handlers[fd] = handler
    self._impl.register(fd, events | self.ERROR)
```
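The add_handler/poll pair above maps directly onto Python's standard selectors module, which picks epoll on Linux and a fallback elsewhere. A self-contained sketch of the same pattern, dispatching one readiness event over a socketpair rather than a real server socket:

```python
import selectors
import socket

sel = selectors.DefaultSelector()  # epoll on Linux, kqueue/select elsewhere

a, b = socket.socketpair()
a.setblocking(False)
b.setblocking(False)

received = []

def on_readable(sock):
    # handler invoked when the fd becomes readable
    received.append(sock.recv(100))
    sel.unregister(sock)

# register a handler for fd readiness, like tornado's add_handler
sel.register(b, selectors.EVENT_READ, on_readable)
a.send(b'hello')

# one iteration of the main loop: poll, then dispatch handlers
for key, events in sel.select(timeout=1):
    key.data(key.fileobj)

print(received)  # [b'hello']
a.close()
b.close()
```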
We compare performance with the simplest possible program on a single machine. The test command is:

```
ab -c 100 -n 1000 -k localhost:8080/ | grep "Time taken for tests:"
```
CherryPy's result:

```
Time taken for tests: 10.773 seconds
```

Tornado's result:

```
Time taken for tests: 0.377 seconds
```
Tornado's performance is quite striking; when an application involves asynchronous IO, prefer Tornado where possible.
This article introduced Python's threads, processes, and coroutines and their applications, with a simple performance analysis of each model. Because of the GIL, neither threads nor coroutines in Python can take advantage of multiple cores.