Python的併發實現有三種方法。python
串行:同時只能執行單個任務
並行:同時執行多個任務數據庫
在Python中,雖然嚴格說來多線程與協程都是串行的,但其效率高,在遇到阻塞時會將阻塞任務交給系統執行,經過合理調度任務,使得程序高效。編程
最高效的固然是多進程了,但因爲多進程依賴硬件配置,而且當任務量超過CPU核心數時,多進程會有進程上下文切換開銷,而這個開銷很大,因此不是最佳解決方案。安全
多線程對比單線程,因爲GIL的存在,切換線程須要不斷加鎖、釋放鎖,效率反而更低;多進程至關於多個CPU同時工做,所以效率很高。網絡
IO密集型能夠是磁盤IO、網絡IO、數據庫IO等,都屬於計算量小,IO等待浪費高。越是IO等待時間長,則多線程的優點相比單線程越明顯,多進程效率高但依賴配置資源。多線程
單線程老是最慢的,多線程適合在IO密集型場景使用,多進程適合CPU計算要求高的場景下使用,多進程雖然老是最快的,但須要CPU資源支持。併發
Python建立多線程有兩種方法。app
from threading import Thread def func(): for i in range(2): print('Hello world!') sleep(1) th1 = Thread(target=func) th1.start() th2 = Thread(target=func) th2.start()
這個類必須繼承Thread,必須重載run()
方法框架
from threading import Thread class MyThread(Thread): def __init__(self): super().__init__() self.name = 'Bob' def run(self): for i in range(2): print('Hello world!') sleep(1) th1 = MyThread() th2 = MyThread() th1.start() th2.start()
import threading lock = threading.Lock() # 生成鎖,全局惟一 lock.acquire() # 加鎖 lock.release() # 釋放鎖
加鎖與解鎖必須成對出現,或者使用上下文管理器with
來管理鎖。異步
在Redis分佈式鎖中提到過,用於讓非阻塞線程重複得到鎖來發送或讀取數據,這裏的可重入鎖僅指讓同一線程能夠屢次獲取鎖。
import threading rlock = threading.RLock() # 生成可重入鎖
死鎖一般有兩種。
多進程是真正的並行,而多線程是僞並行,實際是多個線程交替執行。
遇到GIL影響性能的狀況,要麼考慮用多進程替代多線程,要麼更換Python解釋器。
經常使用線程通訊方法。
threading.Event
threading.Condition
queue.Queue
import threading event = threading.Event() event.clear() # 重置event,使全部該event事件都處於待命狀態 event.wait() # 等待接收event指令,決定是否阻塞程序執行 evnet.set() # 發送event指令,全部該event事件的線程開始執行
import time import threading class MyThread(threading.Thread): def __init__(self, name, event): super().__init__() self.name = name self.event = event def run(self): self.event.wait() # 等待event.set()才能執行下去 time.sleep(1) print('{} Done'.format(self.name)) threads = [] event = threading.Event() for i in range(5): threads.append(MyThread(event)) event.clear() # 重置event,使event.wait()生效 for t in threads: t.start() print('Waiting 3s') time.sleep(3) print('Awake all threads') event.set() # 發送event指令,全部綁定了event的線程開始執行
全部線程在調用start()
方法後並不會執行完,而是在event.wait()
處停住了,須要發送event.set()
指令才能繼續執行。
import threading cond = threading.Condition() cond.acquire() cond.release() cond.wait() # 等待指令觸發,同時臨時釋放鎖,直到調用notify才從新佔有鎖 cond.notify() # 發送指令
Condition與Event很相似,不過因爲wait()
與notify()
能夠反覆調用,所以通常做爲編程人員可控調用鎖來使用,放在run()
方法下。
隊列是線程安全的,經過put()
和get()
方法來操做隊列。
from queue import Queue q = Queue(maxsize=0) # 設置0表示無限長隊列 q.get(timeout=0.5) # 阻塞程序,等待隊列消息,能夠設置超時時間 q.put() # 發送消息 q.join() # 等待全部消息被消費完 # 不經常使用但要了解的方法 q.qsize() # 返回消息個數 q.empty() # 返回bool值,隊列是否空 q.full() # 返回bool值,隊列是否滿
Queue
是FIFO隊列,還有queue.LifoQueue
,queue.PriorityQueue
。
兩個線程的變量不能被相互訪問。
一般使用threading.local
類來實現,該類的實例是一個字典型對象,直接經過key-value形式存入變量,如threading.local().name = 'bob'
。
若是想要實現一個線程內的全局變量或實現線程間的信息隔離,就使用local類。
多線程並非越多越好,由於在切換線程時會切換上下文環境(固然相比多進程的開銷要小的多),在量大時依然會形成CPU的開銷。
所以出現了線程池的概念,即預先建立好合適數量的線程,使任務能馬上使用。
經過concurrent.futures
庫的ThreadPoolExecutor
類來實現。
import time import threading from concurrent.futures import ThreadPoolExecutor def target(): for i in range(5): print('{}-{}\n'.format(threading.get_ident(), i) time.sleep(1) pool = ThreadPoolExecutor(5) # 線程池數量限制爲5 for i in range(100): pool.submit(target) # 往線程中提交併運行
學習協程,要先理解生成器,由於Python的協程是從生成器中誕生並演變到如今這個樣子的。
可迭代對象,其類或元類都實現了__iter__()
方法,而該方法返回一個對象支持迭代,既能夠是string/list/tuple/dict等內置類型的對象,也能夠是本身寫的對象(這個對象的類實現了遍歷元素的__iter__
方法)。
迭代器對象,可迭代對象是迭代器的基礎,迭代器只是比可迭代對象多了一個__next__()
方法,這個方法讓咱們能夠再也不用for循環來獲取元素。
生成器對象,在迭代器的基礎上,實現了yield
,至關於函數中的return
,在每次for循環遍歷或調用next()時,都會返回一個值並阻塞等待下一次調用。
可迭代對象、迭代器都是將全部元素放在內存裏,而生成器則是須要時臨時生成元素,因此生成器節省時間、空間。
兩個方法。
next()
send(None)
這兩個方法是等價的,但因爲send方法能夠傳值進去,因此在協程中大有用處。
經過inspect
庫的getgeneratorstate
方法獲取狀態信息。
StopIteration
生成器引入了函數暫停執行(yield)功能,後來又引入了向暫停的生成器發送信息的功能(send),並以此催生了協程。
協程是爲非搶佔式多任務產生子程序的計算機程序組件,協程容許不一樣入口點在不一樣位置暫停或開始執行程序。
協程和線程有類似點,多個協程之間與線程同樣,只會交叉串行執行;也有不一樣點,線程之間要頻繁切換,加鎖、解鎖,協程不須要。
協程經過yield暫停生成器,將程序的執行流程交給其它子程序,從而實現不一樣子程序之間的交替執行。
經過例子演示如何向生成器發送信息。
def func(n): index = 0 while index < n: num = yield index # 這裏分紅兩部分,yield index將index return給外部程序, num = yield接受外部send的信息並賦值給num if num is None: num = 1 index += num f = func(5) print(next(f)) # 0 print(f.send(2)) # 2 print(next(f)) # 3 print(f.send(-1)) # 2
yield from
語法從Python3.3纔出現的語法。
yield from
後面須要添加可迭代對象(迭代器、生成器固然知足要求)。
# 拼接一個可迭代對象 # 使用yield astr = 'ABC' alist = [1, 2, 3] adict = dict(name='kct', age=18) agen = (i for i in range(5)) def gen(*args): for item in args: for i in item: yield i new_list = gen(astr, alist, adict, agen) print("use yield:", list(new_list)) # 使用yield from def gen(*args): for item in args: yield from item new_flist = fgen(astr, alist, adict, agen) print("use yield from:", list(new_flist))
能夠看出,使用yield from
能夠直接從可迭代對象中yield全部元素,減小了一個for循環,代碼更簡潔,固然yield from
不止作了這件事。
yield from
後能夠接生成器,以此造成生成器嵌套,yield from
就幫咱們處理了各類異常,讓咱們只需專心於業務代碼便可。
具體講解yield from
前先了解幾個概念:
yield from
表達式的生成器函數yield from
後接的生成器函數舉個例子,實時計算平均值
# 子生成器 def average_gen(): total = 0 count = 0 average = 0 while True: num = yield average count += 1 total += num average = total/count # 委託生成器 def proxy_gen(): while True: yield from average_gen() # 調用函數 def main(): get_average = proxy_gen() next(get_average) # 第一次調用不傳值,讓子生成器開始運行 print(get_average.send(10)) # 10 print(get_average.send(20)) # 15 print(get_average.send(30)) # 20
委託生成器的做用是在調用函數與子生成器之間創建一個雙向通訊通道,調用函數能夠send消息給子生成器,子生成器yield值也是直接返回給調用函數。
有時會在yield from
前做賦值操做,這是用於作結束操做,改造上面的例子。
# 子生成器 def average_gen(): total = 0 count = 0 average = 0 while True: num = yield average if num is None: break count += 1 total += num average = total/count return total, count, average # 當協程結束時,調用return # 委託生成器 def proxy_gen(): while True: total, count, average = yield from average_gen() # 只有子生成器的協程結束了纔會進行賦值,後面的語句纔會執行 print('Count for {} times, Total is {}, Average is {}'.format(count, total, average)) # 調用函數 def main(): get_average = proxy_gen() next(get_average) # 第一次調用不傳值,讓子生成器開始運行 print(get_average.send(10)) # 10 print(get_average.send(20)) # 15 print(get_average.send(30)) # 20 get_average.send(None) # 結束協程,若是後面再調用send,將會另起一協程
yield from
作了全面的異常處理。直接調用子生成器,首先就要處理StopIteration異常,其次若子生成器不是協程生成器而是迭代器,則會有其它異常拋出,所以須要知道,委託生成器在這之中扮演着重要角色,不可忽略。
asyncio
asyncio
是Python3.4引入的標準庫,直接內置對異步IO的支持。
雖然學了yield
和yield from
,但仍是不知如何入手去作併發,asyncio
則是爲了提供這麼個框架來精簡複雜的代碼操做。
經過前面學習,咱們知道調用函數/委託生成器/子生成器這三劍客中,子生成器就是協程,那麼asyncio
如何來定義建立協程呢?
asyncio
經過在函數定義前增長async
關鍵字來定義協程對象,經過isinstance(obj, Coroutine)
便可判斷是不是協程,這個協程類從collections.abc
包導入。
咱們也知道,生成器是協程的基礎,那麼有什麼辦法將生成器變成協程來使用?
經過@asyncio.coroutine
裝飾器能夠標記生成器函數爲協程對象,可是經過isinstance(obj, Generator)
、isinstance(obj, Coroutine)
仍然能夠看到,這個生成器函數只是被標記爲協程了,但其本質依然是生成器。
async
關鍵字定義的函數,調用它不會當即執行函數,而是返回一個協程對象,這個協程對象須要註冊到事件循環中,由事件循環調用;import asyncio async def hello(name): print('Hello, ', name) coroutine = hello('World') # 建立事件循環 loop = asyncio.get_event_loop() # 將協程轉換爲任務 task = loop.create_task(coroutine) # 將任務放入事件循環對象中觸發 loop.run_until_complete(task)
await
和yield
這二者都能實現暫停的效果,但功能是不兼容的,在生成器中不能用await
,在async
定義的協程中不能用yield
。
而且,yield from
後可接可迭代對象、迭代器、生成器、future對象、協程對象,await
後只能接future對象、協程對象。
前面咱們知道經過async
能夠定義一個協程對象,那麼如何建立一個future對象呢?
答案是經過task,只須要建立一個task對象便可。
# 在前一個例子中,咱們先建立了事件循環,而後經過事件循環建立了task,咱們來測試下 import asyncio from asyncio.futures import Future async def hello(name): print('Hello, ', name) coroutine = hello('World') # 建立事件循環 loop = asyncio.get_event_loop() # 將協程轉換爲任務 task = loop.create_task(coroutine) print(isinstance(task, Future)) # 結果是True # 不創建事件循環的方法 task = asyncio.ensure_future(coroutine) print(isinstance(task, Future)) # 結果也是True
知道了建立future對象(也便是建立task對象)的方法,那麼咱們驗證下await
和yield
後接coroutine和future對象。
import sys import asyncio async def f1(): await asyncio.sleep(2) return 'Hello, {}'.format(sys._getframe().f_code.co_name) @asyncio.coroutine def f2(): yield from asyncio.sleep(2) return 'Hello, {}'.format(sys._getframe().f_code.co_name) async def f3(): await asyncio.ensure_future(asyncio.sleep(2)) return 'Hello, {}'.format(sys._getframe().f_code.co_name) @asyncio.coroutine def f4(): yield from asyncio.ensure_future(asyncio.sleep(2)) return 'Hello, {}'.format(sys._getframe().f_code.co_name) tasks = [ asyncio.ensure_future(f1()), asyncio.ensure_future(f2()), asyncio.ensure_future(f3()), asyncio.ensure_future(f4()) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) for task in tasks: print(task.result()) loop.close()
異步IO都是在IO高的地方掛起,等IO操做結束後再繼續執行,大多數時候,咱們後續的代碼執行都是須要依賴IO的返回值的,此時就要用到回調了。
回調的實現有兩種方式。
這種方法要求咱們可以取得協程的await的返回值。經過task
對象的result()
方法能夠得到返回結果。
import time import asyncio async def _sleep(x): time.sleep(x) return 'Stopped {} seconds!'.format(x) coroutine = _sleep(2) loop = asyncio.get_event_loop() task = asyncio.ensure_future(coroutine) loop.run_until_complete(task) # 直接經過task獲取任務結果 print('Result: {}'.format(task.result()))
asyncio
自帶的添加回調函數功能實現import time import asyncio async def _sleep(x): time.sleep(x) return 'Stopped {} seconds!'.format(x) def callback(future): print('Result: {}'.format(future.result())) coroutine = _sleep(2) loop = asyncio.get_event_loop() task = asyncio.ensure_future(coroutine) # 添加回調函數 task.add_done_callback(callback) loop.run_until_complete(task)
asyncio
實現併發,就須要多個協程來完成任務,前面作await
和yield
的驗證時就用了併發。
每當有任務阻塞的時候就await
,而後其餘協程繼續工做。
# 協程函數 async def worker(n): print('Waiting: {}'.format(n)) await asyncio.sleep(n) return 'Done {}'.format(n) # 協程對象 c1 = worker(1) c2 = worker(2) c3 = worker(4) # 協程轉換爲task tasks = [ asyncio.ensure_future(c1), asyncio.ensure_future(c2), asyncio.ensure_future(c3) ] loop = asyncio.get_event_loop()
有兩種方法,這兩種方法的區別後面說。
return
的結果能夠經過task.result()
查看。
# asyncio.wait() loop.run_until_complete(asyncio.wait(tasks)) # asyncio.gather() loop.run_until_complete(asyncio.gather(*tasks)) # *不能省略 # 查看結果 for task in tasks: print('Result: {}'.format(task.result()))
使用async
能夠定義協程,協程用於耗時的IO操做,咱們也能夠封裝更多的IO操做過程,實現一個協程中await
另外一個協程,實現協程的嵌套。
# 內部協程函數 async def worker(n): print('Waiting: {}'.format(n)) await asyncio.sleep(n) return 'Done {}'.format(n) # 外部協程函數 async def main(): c1 = worker(1) c2 = worker(2) c3 = worker(4) tasks = [ asyncio.ensure_future(c1), asyncio.ensure_future(c2), asyncio.ensure_future(c3) ] dones, pendings = await asyncio.wait(tasks) for task in tasks: print('Result: {}'.format(task.result())) loop = asyncio.get_event_loop() loop.run_until_complete(main())
若是外部協程使用的asyncio.gather()
,那麼做以下替換。
results = await asyncio.gather(*tasks) for result in results: print('Result: {}'.format(result))
講生成器時提到了四種狀態,對協程咱們也瞭解一下其狀態(準確地說是future/task對象的狀態)。
wait接收的tasks,必須是一個list
對象,該list
對象中存放多個task
,既能夠經過asyncio.ensure_future
轉爲task
對象也能夠不轉。
gather也能夠接收list
對象,但*
不能省,也能夠直接將多個task
做爲可變長參數傳入,參數能夠是協程對象或future對象。
wait返回dones
和pendings
,前者表示已完成的任務,後者表示未完成的任務,須要經過task.result()
手工獲取結果。
gather直接將值返回。
# FIRST_COMPLETED:完成第一個任務就返回 # FIRST_EXCEPTION:產生第一個異常就返回 # ALL_COMPLETED:全部任務完成再返回(默認選項) dones, pendings = loop.run_until_complete( asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)) # 控制運行時間:1秒後返回 dones, pendings = loop.run_until_complete( asyncio.wait(tasks, timeout=1))
在asyncio
中如何動態添加協程到事件循環中?
兩種方法,一種是同步的,一種是異步的。
import time import asyncio from queue import Queue from threading import Thread # 在後臺永遠運行的事件循環 def start_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() def do_sleep(x, queue, msg=""): time.sleep(x) queue.put(msg) queue = Queue() new_loop = asyncio.new_event_loop() t = Thread(target=start_loop, args=(new_loop,)) t.start() print(time.ctime()) # 動態添加兩個協程 # 這種方法在主線程是同步的 new_loop.call_soon_threadsafe(do_sleep, 6, queue, 'First') new_loop.call_soon_threadsafe(do_sleep, 3, queue, 'Second') while True: msg = queue.get() print('{} is done'.format(msg)) print(time.ctime())
import time import asyncio from queue import Queue from threading import Thread # 在後臺永遠運行的事件循環 def start_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() async def do_sleep(x, queue, msg=""): await asyncio.sleep(x) queue.put(msg) queue = Queue() new_loop = asyncio.new_event_loop() t = Thread(target=start_loop, args=(new_loop,)) t.start() print(time.ctime()) # 動態添加兩個協程 # 這種方法在主線程是異步的 asyncio.run_coroutine_threadsafe(do_sleep(6, queue, 'First'), new_loop) asyncio.run_coroutine_threadsafe(do_sleep(3, queue, 'Second'), new_loop) while True: msg = queue.get() print('{} is done'.format(msg)) print(time.ctime())