我的理解,併發是計算機在邏輯上能處理多任務的能力。通常分類三種類型:編程
這裏還有一個概念須要注意,在使用併發的時候弄清楚須要併發的任務是計算密集仍是IO密集。
由於異步對於計算密集的任務是無效的。由於異步的本質是 IO 操做過程當中阻塞時的控制權交換。在計算密集的任務中是沒有這樣的阻塞的。segmentfault
前面說了異步的本質是控制權的交換,這裏經過一個生產者消費者模型的例子來體會一下這麼個過程。網絡
def consumer(): # 定義消費者,因爲有yeild關鍵詞,此消費者爲一個生成器 print("[Consumer] Init Consumer ......") r = "init ok" # 初始化返回結果,並在啓動消費者時,返回給生產者 while True: n = yield r # 消費者經過yield接收生產者的消息,同時返給其結果 print("[Consumer] conusme n = %s, r = %s" % (n, r)) r = "consume %s OK" % n # 消費者消費結果,下個循環返回給生產者 def produce(c): # 定義生產者,此時的 c 爲一個生成器 print("[Producer] Init Producer ......") r = c.send(None) # 啓動消費者生成器,同時第一次接收返回結果 print("[Producer] Start Consumer, return %s" % r) n = 0 while n < 5: n += 1 print("[Producer] While, Producing %s ......" % n) r = c.send(n) # 向消費者發送消息並準備接收結果。此時會切換到消費者執行 print("[Producer] Consumer return: %s" % r) c.close() # 關閉消費者生成器 print("[Producer] Close Producer ......") produce(consumer())
# 異步IO例子:適配Python3.5,使用async和await關鍵字 async def hello(index): # 經過關鍵字async定義協程 print('Hello world! index=%s, thread=%s' % (index, threading.currentThread())) await asyncio.sleep(1) # 模擬IO任務 print('Hello again! index=%s, thread=%s' % (index, threading.currentThread())) loop = asyncio.get_event_loop() # 獲得一個事件循環模型 tasks = [hello(1), hello(2)] # 初始化任務列表 loop.run_until_complete(asyncio.wait(tasks)) # 執行任務 loop.close() # 關閉事件循環列表
async def get(url): async with aiohttp.ClientSession() as session: async with session.get(url) as resp: print(url, resp.status) print(url, await resp.text()) loop = asyncio.get_event_loop() # 獲得一個事件循環模型 tasks = [ # 初始化任務列表 get("http://zhushou.360.cn/detail/index/soft_id/3283370"), get("http://zhushou.360.cn/detail/index/soft_id/3264775"), get("http://zhushou.360.cn/detail/index/soft_id/705490") ] loop.run_until_complete(asyncio.wait(tasks)) # 執行任務 loop.close() # 關閉事件循環列表
這裏以進程的multiprocessing
模塊舉例,線程可使用multiprocessing.dummy
,全部的API均相同。session
import multiprocessing as mp ############## 直接實例化 ############ def func(number): result = number * 2 p = mp.Process(target=func, args=(3, )) #實例化進程對象 p.start() #運行進程 ############ 類封裝 ############# class MyProcess(mp.Process): def __init__(self, interval): mp.Process.__init__(self) # 須要重載的函數 def run(self): print('I'm running) p = MyProcess(1) p.start() ################################# p.terminal() # 主動結束進程 p.join() #讓主進程等待子進程結束 # 一些經常使用的屬性 p.pid #得到進程的id號 p.name #得到進程名 p.is_alive() #判斷進程是否還存活 p.daemon = True #設置進程隨主進程一塊兒結束 mp.active_children() #得到當前進程的全部子進程 mp.current_process() #返回正在運行的進程 os.getpid() #得到當前進程的pid
from multiprocessing.dummy import Pool as ThreadPool tasks = list() def do_task(item): return item pool = ThreadPool(3) ################ 原始操做 ####################### for item in items: pool.apply_async(do_task, (item,)) #添加進程,非阻塞,返回執行結果 pool.apply(do_task, (item,)) #阻塞 ############## map操做 #####################3 results = pool.map(do_task, items) ################################ pool.close() #關閉進程池後不會有新的進程被建立 pool.join() #等到結束,必須在close後使用
# Lock(鎖) # 限制對資源的訪問 def func(lock): #使用with with lock: print('I got lock') def func(lock): #不使用with lock.acquire() #請求鎖 try: print('I got lock') finally: lock.release() #釋放鎖 lock = mp.Lock() #申請鎖 p = mp.Process(target=func, args=(lock,)) p.start() ############################################ # Semaphore(信號量) # 限制資源的最大鏈接數 def func(s): s.aquire() #請求鏈接 s.release() #斷開鏈接 s = mp.Semaphore(2) #定義信號量的最大鏈接數 for i in range(5): p = mp.Process(target=func, arg=(s)) p.start ############################################ # Event(事件) # 進程間同步 def func(e): e.wait() #定義等待時間,默認等待到e.set()爲止,阻塞 e.is_set() #判斷消息是否被髮出 print('got') e = mp.Event() p = mp.Process(target=func, args=(e,)) p.start() e.set() #發出消息 ############################################ # Queue(隊列) # 多進程之間的數據傳遞 import Queue Queue.Queue(maxsize = 0) # 先進先出, maxsize小於等於則不限大小 Queue.LifoQueue(maxsize = 0) # 後進先出 Queue.PriorityQueue(maxsize = 0) # 構造一個優先級隊列 #異常 Queue.Empty #當調用非阻塞的get()獲取空隊列的元素時, 引起異常 Queue.Full #當調用非阻塞的put()向滿隊列中添加元素時, 引起異常 # 生存者消費者模型 def produce(q): try: data = q.put(data, block=, timeout=) # 若block爲False且隊列已滿,則當即拋出Queue.Full # 若block爲True進程會阻塞timeout指定時間,直到隊列有空間,不然拋出Queue.Full except: def cosume(q): try: q.get(block=, timeout=) #與上同理 except: q = mp.Queue() pro = mp.Process(target=produce, args=(q, )) cos = mp.Process(target=cosume, args=(q, )) pro.start() cos.start() pro.join() cos.join() ############################################ # Pipe(管道) # 多進程之間的數據傳遞 def func1(pipe): while True: pipe.send(1) def func2(pipe): while True: pipe.recv() #若是管道內無消息可接受,則會阻塞 pipe = mp.Pipe(duplex=) #參數默認爲True即管道的兩邊都可收發 # 返回(conn1, conn2),當參數爲False時conn1只能收信息,conn2只能發消息 p1 = mp.Process(target=func1, args=(pipe[0], )) p2 = mp.Process(target=func2, args=(pipe[1], )) p1.start() p2.start() p1.join() p2.join()
新的併發池模塊concurrent.futures
再次封裝了併發操做,能夠用於量大但簡單併發操做。
且進程線程通用關鍵字換一下就行。多線程
from concurrent.futures import ThreadPoolExecutor import time def working(message): time.sleep(2) return message pool = ThreadPoolExecutor(max_workers=2) # 建立一個最大可容納2個task的線程池 worker1 = pool.submit(working, ("hello")) # 往線程池裏面加入一個task worker2 = pool.submit(working, ("world")) # 往線程池裏面加入一個task # submit 返回了一個future對象,即未完成的操做,咱們能夠經過調用函數來查看其狀態 worker1.done() # 判斷task1是否結束 worker1.result() # 查看task1返回的結果 worker2.result() # 查看task2返回的結果
import concurrent.futures items = list() # 任務對象 def do_task(item): # 處理函數 return item #################### submit ######################### with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: futures = {executor.submit(do_task, item): item for item in items} for future in concurrent.futures.as_completed(futures): item = futures[future] result = future.result() print(item, result) #################### map ######################### # map跟submit的區別在於submit是無序的,而map是有序的 with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: for item, result in zip(items, executor.map(do_task, items)): print(item, result) #################### wait ######################### # wait返回一個元組,包含已完成任務的集合和未完成任務的集合。 pool = ThreadPoolExecutor(5) futures = [] for item in items: futures.append(pool.submit(do_task, item)) concurrent.futures.wait(futures, timeout=None, return_when='FIRST_COMPLETED')
return_when
參數可選FIRST_COMPLETED
, FIRST_EXCEPTION
和ALL_COMPLETE
ALL_COMPLETE
會阻塞併發