目錄python
queue隊列:使用import queue
,用法與進程Queue同樣編程
import queue q=queue.Queue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結果(先進先出): first second third '''
import queue q=queue.LifoQueue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結果(後進先出): third second first '''
class queue.PriorityQueue(maxsize=0)數組
import queue q=queue.PriorityQueue() #put進入一個元組,元組的第一個元素是優先級(一般是數字,也能夠是非數字之間的比較),數字越小優先級越高 q.put((20,'a')) q.put((10,'b')) q.put((30,'c')) print(q.get()) print(q.get()) print(q.get()) ''' 結果(數字越小優先級越高,優先級高的優先出隊): (10, 'b') (20, 'a') (30, 'c') '''
exception queue.Empty:在Queue對象爲空時調用非阻塞get()(或get_nowait())時引起異常。併發
exception queue.Full:在已滿的Queue對象上調用非阻塞put()(或put_nowait())時引起異常。app
Queue.qsize()異步
Queue.empty():若是爲空則返回Trueasync
Queue.full():若是已滿,則返回True異步編程
Queue.put(item, block=True, timeout=None):將項目放入隊列。若是可選的args塊爲true且timeout爲None(默認值),則在必要時阻塞,直到有空閒插槽可用。若是timeout是一個正數,它會阻止最多超時秒,若是在該時間內沒有可用的空閒槽,則會引起Full異常。不然(塊爲假),若是空閒插槽當即可用,則將項目放入隊列,不然引起徹底異常(在這種狀況下忽略超時)。函數
Queue.put_nowait(item):至關於put(item,False)。學習
Queue.get(block=True, timeout=None):從隊列中刪除並返回一個項目。若是可選的args塊爲true且timeout爲None(默認值),則在必要時阻止,直到某個項可用爲止。若是timeout是一個正數,它會阻止最多超時秒,若是在該時間內沒有可用的項,則會引起Empty異常。不然(塊爲假),若是一個項當即可用則返回一個項,不然引起Empty異常(在這種狀況下忽略超時)。
Queue.get_nowait():至關於get(False)。
提供了兩種方法來支持跟蹤守護進程消費者線程是否已徹底處理入隊任務。
Queue.task_done():表示之前排隊的任務已完成。由隊列使用者線程使用。對於用於獲取任務的每一個get(),對task_done()的後續調用會告知隊列該任務的處理已完成。
若是join()當前正在阻塞,則它將在全部項目都已處理後恢復(這意味着已爲每一個已放入隊列的項目收到task_done()調用)。
若是調用的次數超過隊列中放置的項目,則引起ValueError。
Queue.join(): block直到queue被消費完畢。
from threading import Timer,current_thread def task(x): print('%s run....' %x) print(current_thread().name) if __name__ == '__main__': t=Timer(3,task,args=(10,)) # 3s後執行該線程 t.start() print('主')
concurrent.futures模塊提供了高度封裝的異步調用接口
ThreadPoolExecutor:線程池,提供異步調用
ProcessPoolExecutor:進程池,提供異步調用
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor from threading import currentThread from multiprocessing import current_process import time def task(i): print(f'{currentThread().name} 在執行任務 {i}') # print(f'進程 {current_process().name} 在執行任務 {i}') time.sleep(1) return i**2 if __name__ == '__main__': pool = ThreadPoolExecutor(4) # 池子裏只有4個線程 # pool = ProcessPoolExecutor(4) # 池子裏只有4個線程 fu_list = [] for i in range(20): # pool.submit(task,i) # task任務要作20次,4個線程負責作這個事 future = pool.submit(task,i) # task任務要作20次,4個進程負責作這個事 # print(future.result()) # 若是沒有結果一直等待拿到結果,致使了全部的任務都在串行 fu_list.append(future) pool.shutdown() # 關閉了池的入口,會等待全部的任務執行完,結束阻塞. for fu in fu_list: print(fu.result())
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor from threading import currentThread from multiprocessing import current_process import time def task(i): print(f'{currentThread().name} 在執行任務 {i}') # print(f'進程 {current_process().name} 在執行任務 {i}') time.sleep(1) return i**2 def parse(future): # 處理拿到的結果 print(future.result()) if __name__ == '__main__': pool = ThreadPoolExecutor(4) # 池子裏只有4個線程 # pool = ProcessPoolExecutor(4) # 池子裏只有4個線程 fu_list = [] for i in range(20): # pool.submit(task,i) # task任務要作20次,4個線程負責作這個事 future = pool.submit(task,i) # task任務要作20次,4個進程負責作這個事 future.add_done_callback(parse) # 爲當前任務綁定了一個函數,在當前任務執行結束的時候會觸發這個函數, # 會把future對象做爲參數傳給函數 # 這個稱之爲回調函數,處理完了回來就調用這個函數. # print(future.result()) # 若是沒有結果一直等待拿到結果,致使了全部的任務都在串行 # pool.shutdown() # 關閉了池的入口,會等待全部的任務執行完,結束阻塞. # for fu in fu_list: # print(fu.result())
協程:是單線程下的併發,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是協程:協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的。
須要強調的是:
對比操做系統控制線程的切換,用戶在單線程內控制協程的切換。
優勢以下:
缺點以下:
總結協程特色:
g1=gevent.spawn(func,1,,2,3,x=4,y=5):建立一個協程對象g1,spawn括號內第一個參數是函數名,如eat,後面能夠有多個參數,能夠是位置實參或關鍵字實參,都是傳給函數eat的
g2=gevent.spawn(func2)
g.join():等待g1結束
g2.join():等待g2結束
上述兩步合做一步:gevent.joinall([g1,g2])
g1.value:拿到func1的返回值
Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet,它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。
from gevent import spawn,joinall,monkey;monkey.patch_all() import time def task(pid): """ Some non-deterministic task """ time.sleep(0.5) print('Task %s done' % pid) def synchronous(): # 同步 for i in range(10): task(i) def asynchronous(): # 異步 g_l=[spawn(task,i) for i in range(10)] joinall(g_l) print('DONE') if __name__ == '__main__': print('Synchronous:') synchronous() print('Asynchronous:') asynchronous() # 上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。 # 初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數, # 後者阻塞當前流程,並執行全部給定的greenlet任務。執行流程只會在 全部greenlet執行完後纔會繼續向下走。
from gevent import monkey;monkey.patch_all() # gevent識別不了 time.sleep等io操做 須要打monkey補丁才能識別 import gevent import time def eat(): print('eat food 1') time.sleep(2) print('eat food 2') def play(): print('play 1') time.sleep(1) print('play 2') g1=gevent.spawn(eat) g2=gevent.spawn(play) gevent.joinall([g1,g2]) print('主')