day40學習整理-併發編程

2019/09/19 學習整理

併發編程

線程queue

queue隊列:使用import queue,用法與進程Queue同樣編程

1、先進先出

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(先進先出):
first
second
third
'''

2、後進先出

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
結果(後進先出):
third
second
first
'''

3、存儲數據時可設置優先級的隊列

class queue.PriorityQueue(maxsize=0)數組

import queue

q=queue.PriorityQueue()
#put進入一個元組,元組的第一個元素是優先級(一般是數字,也能夠是非數字之間的比較),數字越小優先級越高
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))

print(q.get())
print(q.get())
print(q.get())
'''
結果(數字越小優先級越高,優先級高的優先出隊):
(10, 'b')
(20, 'a')
(30, 'c')
'''

4、其餘用法

exception queue.Empty:在Queue對象爲空時調用非阻塞get()(或get_nowait())時引起異常。併發

exception queue.Full:在已滿的Queue對象上調用非阻塞put()(或put_nowait())時引起異常。app

Queue.qsize()異步

Queue.empty():若是爲空則返回Trueasync

Queue.full():若是已滿,則返回True異步編程

Queue.put(item, block=True, timeout=None):將項目放入隊列。若是可選的args塊爲true且timeout爲None(默認值),則在必要時阻塞,直到有空閒插槽可用。若是timeout是一個正數,它會阻止最多超時秒,若是在該時間內沒有可用的空閒槽,則會引起Full異常。不然(塊爲假),若是空閒插槽當即可用,則將項目放入隊列,不然引起徹底異常(在這種狀況下忽略超時)。函數

Queue.put_nowait(item):至關於put(item,False)。學習

Queue.get(block=True, timeout=None):從隊列中刪除並返回一個項目。若是可選的args塊爲true且timeout爲None(默認值),則在必要時阻止,直到某個項可用爲止。若是timeout是一個正數,它會阻止最多超時秒,若是在該時間內沒有可用的項,則會引起Empty異常。不然(塊爲假),若是一個項當即可用則返回一個項,不然引起Empty異常(在這種狀況下忽略超時)。

Queue.get_nowait():至關於get(False)。

提供了兩種方法來支持跟蹤守護進程消費者線程是否已徹底處理入隊任務。

Queue.task_done():表示之前排隊的任務已完成。由隊列使用者線程使用。對於用於獲取任務的每一個get(),對task_done()的後續調用會告知隊列該任務的處理已完成。

若是join()當前正在阻塞,則它將在全部項目都已處理後恢復(這意味着已爲每一個已放入隊列的項目收到task_done()調用)。

若是調用的次數超過隊列中放置的項目,則引起ValueError。

Queue.join(): block直到queue被消費完畢。

線程定時器

from threading import Timer,current_thread

def task(x):
    print('%s run....' %x)
    print(current_thread().name)

if __name__ == '__main__':
    t=Timer(3,task,args=(10,)) # 3s後執行該線程
    t.start()
    print('主')

進程池和線程池

Python標準模塊——concurrent.futures

concurrent.futures模塊提供了高度封裝的異步調用接口

ThreadPoolExecutor:線程池,提供異步調用

ProcessPoolExecutor:進程池,提供異步調用

1、ProcessPoolExecutor 與 ThreadPoolExecutor

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import currentThread
from multiprocessing import current_process
import time

def task(i):
    print(f'{currentThread().name} 在執行任務 {i}')
    # print(f'進程 {current_process().name} 在執行任務 {i}')
    time.sleep(1)
    return i**2

if __name__ == '__main__':
    pool = ThreadPoolExecutor(4) # 池子裏只有4個線程
    # pool = ProcessPoolExecutor(4) # 池子裏只有4個線程
    fu_list = []
    for i in range(20):
        # pool.submit(task,i) # task任務要作20次,4個線程負責作這個事
        future = pool.submit(task,i) # task任務要作20次,4個進程負責作這個事
        # print(future.result()) # 若是沒有結果一直等待拿到結果,致使了全部的任務都在串行
        fu_list.append(future)
    pool.shutdown() # 關閉了池的入口,會等待全部的任務執行完,結束阻塞.
    for fu in fu_list:
        print(fu.result())

2、回調函數

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import currentThread
from multiprocessing import current_process
import time

def task(i):
    print(f'{currentThread().name} 在執行任務 {i}')
    # print(f'進程 {current_process().name} 在執行任務 {i}')
    time.sleep(1)
    return i**2


def parse(future):
    # 處理拿到的結果
    print(future.result())



if __name__ == '__main__':
    pool = ThreadPoolExecutor(4) # 池子裏只有4個線程
    # pool = ProcessPoolExecutor(4) # 池子裏只有4個線程
    fu_list = []
    for i in range(20):
        # pool.submit(task,i) # task任務要作20次,4個線程負責作這個事
        future = pool.submit(task,i) # task任務要作20次,4個進程負責作這個事
        future.add_done_callback(parse)
        # 爲當前任務綁定了一個函數,在當前任務執行結束的時候會觸發這個函數,
        # 會把future對象做爲參數傳給函數
        # 這個稱之爲回調函數,處理完了回來就調用這個函數.


        # print(future.result()) # 若是沒有結果一直等待拿到結果,致使了全部的任務都在串行

    # pool.shutdown() # 關閉了池的入口,會等待全部的任務執行完,結束阻塞.
    # for fu in fu_list:
    #     print(fu.result())

協程

1、什麼是協程

協程:是單線程下的併發,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是協程:協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的。

須要強調的是:

  1. python的線程屬於內核級別的,即由操做系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其餘線程運行)
  2. 單線程內開啓協程,一旦遇到io,就會從應用程序級別(而非操做系統)控制切換,以此來提高效率(!!!非io操做的切換與效率無關)

對比操做系統控制線程的切換,用戶在單線程內控制協程的切換。

優勢以下:

  1. 協程的切換開銷更小,屬於程序級別的切換,操做系統徹底感知不到,於是更加輕量級
  2. 單線程內就能夠實現併發的效果,最大限度地利用cpu

缺點以下:

  1. 協程的本質是單線程下,沒法利用多核,能夠是一個程序開啓多個進程,每一個進程內開啓多個線程,每一個線程內開啓協程
  2. 協程指的是單個線程,於是一旦協程出現阻塞,將會阻塞整個線程

總結協程特色:

  1. 必須在只有一個單線程裏實現併發
  2. 修改共享數據不需加鎖
  3. 用戶程序裏本身保存多個控制流的上下文棧
  4. 附加:一個協程遇到IO操做自動切換到其它協程(如何實現檢測IO,yield、greenlet都沒法實現,就用到了gevent模塊(select機制))

用法介紹

g1=gevent.spawn(func,1,,2,3,x=4,y=5):建立一個協程對象g1,spawn括號內第一個參數是函數名,如eat,後面能夠有多個參數,能夠是位置實參或關鍵字實參,都是傳給函數eat的

g2=gevent.spawn(func2)

g.join():等待g1結束

g2.join():等待g2結束

上述兩步合做一步:gevent.joinall([g1,g2])

g1.value:拿到func1的返回值

2、協程操做-gevent模塊

Gevent 是一個第三方庫,能夠輕鬆經過gevent實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet,它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet所有運行在主程序操做系統進程的內部,但它們被協做式地調度。

gevent的同步與異步

from gevent import spawn,joinall,monkey;monkey.patch_all()

import time
def task(pid):
    """
    Some non-deterministic task
    """
    time.sleep(0.5)
    print('Task %s done' % pid)


def synchronous():  # 同步
    for i in range(10):
        task(i)

def asynchronous(): # 異步
    g_l=[spawn(task,i) for i in range(10)]
    joinall(g_l)
    print('DONE')
    
if __name__ == '__main__':
    print('Synchronous:')
    synchronous()
    print('Asynchronous:')
    asynchronous()
#  上面程序的重要部分是將task函數封裝到Greenlet內部線程的gevent.spawn。
#  初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數,
#  後者阻塞當前流程,並執行全部給定的greenlet任務。執行流程只會在 全部greenlet執行完後纔會繼續向下走。
from gevent import monkey;monkey.patch_all()
# gevent識別不了 time.sleep等io操做 須要打monkey補丁才能識別

import gevent
import time
def eat():
    print('eat food 1')
    time.sleep(2)
    print('eat food 2')

def play():
    print('play 1')
    time.sleep(1)
    print('play 2')

g1=gevent.spawn(eat)
g2=gevent.spawn(play)
gevent.joinall([g1,g2])

print('主')
相關文章
相關標籤/搜索