Python中的併發

Python併發

併發三種層次

我的理解,併發是計算機在邏輯上能處理多任務的能力。通常分類三種類型:編程

  1. 異步,異步本質上是單線程的,由於 IO 操做在不少時候會存在阻塞,異步就是在這種阻塞的時候,經過控制權的交換來實現多任務的。即異步本質上是運行過程當中的控制權的交換。最典型的例子就是生產者消費者模型。
    • 異步這個概念在不一樣的地方有不一樣的說法,好比 python 裏面叫作協程,內部經過生成器來實現控制權的交換。可是不管怎麼稱呼,異步這種併發方式都脫離不了控制權的交換這麼一個事實。
  2. 多進程,進程是一個程序具體的實例,擁有本身獨立的內存單元。
  3. 多線程,線程依附於進程,共享存儲空間。
    • 因爲 Python 官方的解釋器 Cython 對多線程有一個全局的鎖(GIL),因此 Cython 中的線程侷限性會比較大。這裏很少解釋。

這裏還有一個概念須要注意,在使用併發的時候弄清楚須要併發的任務是計算密集仍是IO密集
由於異步對於計算密集的任務是無效的。由於異步的本質是 IO 操做過程當中阻塞時的控制權交換。在計算密集的任務中是沒有這樣的阻塞的。segmentfault

協程

前面說了異步的本質是控制權的交換,這裏經過一個生產者消費者模型的例子來體會一下這麼個過程。網絡

生成者消費者

def consumer():         # 定義消費者,因爲有yeild關鍵詞,此消費者爲一個生成器
    print("[Consumer] Init Consumer ......")
    r = "init ok"       # 初始化返回結果,並在啓動消費者時,返回給生產者
    while True:
        n = yield r     # 消費者經過yield接收生產者的消息,同時返給其結果
        print("[Consumer] conusme n = %s, r = %s" % (n, r))
        r = "consume %s OK" % n     # 消費者消費結果,下個循環返回給生產者

def produce(c):         # 定義生產者,此時的 c 爲一個生成器
    print("[Producer] Init Producer ......")
    r = c.send(None)    # 啓動消費者生成器,同時第一次接收返回結果
    print("[Producer] Start Consumer, return %s" % r)
    n = 0
    while n < 5:
        n += 1
        print("[Producer] While, Producing %s ......" % n)
        r = c.send(n)   # 向消費者發送消息並準備接收結果。此時會切換到消費者執行
        print("[Producer] Consumer return: %s" % r)
    c.close()           # 關閉消費者生成器
    print("[Producer] Close Producer ......")

produce(consumer())

新關鍵字

# 異步IO例子:適配Python3.5,使用async和await關鍵字
async def hello(index):       # 經過關鍵字async定義協程
    print('Hello world! index=%s, thread=%s' % (index, threading.currentThread()))
    await asyncio.sleep(1)     # 模擬IO任務
    print('Hello again! index=%s, thread=%s' % (index, threading.currentThread()))

loop = asyncio.get_event_loop()     # 獲得一個事件循環模型
tasks = [hello(1), hello(2)]        # 初始化任務列表
loop.run_until_complete(asyncio.wait(tasks))    # 執行任務
loop.close()                        # 關閉事件循環列表

網絡io

async def get(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
print(url, resp.status)
print(url, await resp.text())

loop = asyncio.get_event_loop()     # 獲得一個事件循環模型
tasks = [                           # 初始化任務列表
    get("http://zhushou.360.cn/detail/index/soft_id/3283370"),
    get("http://zhushou.360.cn/detail/index/soft_id/3264775"),
    get("http://zhushou.360.cn/detail/index/soft_id/705490")
]
loop.run_until_complete(asyncio.wait(tasks))    # 執行任務
loop.close()                        # 關閉事件循環列表

線/進程

這裏以進程的multiprocessing模塊舉例,線程可使用multiprocessing.dummy,全部的API均相同。session

例子

import multiprocessing as mp

############## 直接實例化 ############

def func(number):
    result = number * 2

p = mp.Process(target=func, args=(3, )) #實例化進程對象
p.start() #運行進程

############ 類封裝 #############

class MyProcess(mp.Process):
    def __init__(self, interval):
        mp.Process.__init__(self)

    # 須要重載的函數
    def run(self):
        print('I'm running)

p = MyProcess(1)
p.start()

#################################

p.terminal() # 主動結束進程
p.join() #讓主進程等待子進程結束


# 一些經常使用的屬性
p.pid #得到進程的id號
p.name #得到進程名
p.is_alive() #判斷進程是否還存活
p.daemon = True #設置進程隨主進程一塊兒結束

mp.active_children() #得到當前進程的全部子進程
mp.current_process() #返回正在運行的進程
os.getpid() #得到當前進程的pid

線程池

from multiprocessing.dummy import Pool as ThreadPool 

tasks = list()

def do_task(item):
    return item

pool = ThreadPool(3)

################ 原始操做  #######################

for item in items:
    pool.apply_async(do_task, (item,)) #添加進程,非阻塞,返回執行結果
    pool.apply(do_task, (item,)) #阻塞

############## map操做  #####################3

results = pool.map(do_task, items)

################################

pool.close() #關閉進程池後不會有新的進程被建立
pool.join() #等到結束,必須在close後使用

進程通訊

# Lock(鎖)
# 限制對資源的訪問

def func(lock): #使用with
    with lock:
        print('I got lock')
def func(lock): #不使用with
    lock.acquire() #請求鎖
    try:
        print('I got lock')
    finally:
        lock.release() #釋放鎖

lock = mp.Lock() #申請鎖
p = mp.Process(target=func, args=(lock,))
p.start()

############################################

# Semaphore(信號量)
# 限制資源的最大鏈接數

def func(s):
    s.aquire() #請求鏈接
    s.release() #斷開鏈接

s = mp.Semaphore(2) #定義信號量的最大鏈接數
for i in range(5):
    p = mp.Process(target=func, arg=(s))
    p.start

############################################

# Event(事件)
# 進程間同步

def func(e):
    e.wait() #定義等待時間,默認等待到e.set()爲止,阻塞
    e.is_set() #判斷消息是否被髮出
    print('got')
    
e = mp.Event()
p = mp.Process(target=func, args=(e,))
p.start()
e.set() #發出消息

############################################

# Queue(隊列)
# 多進程之間的數據傳遞

import Queue

Queue.Queue(maxsize = 0)  # 先進先出, maxsize小於等於則不限大小
Queue.LifoQueue(maxsize = 0)  # 後進先出
Queue.PriorityQueue(maxsize = 0)  # 構造一個優先級隊列

#異常
Queue.Empty  #當調用非阻塞的get()獲取空隊列的元素時, 引起異常
Queue.Full  #當調用非阻塞的put()向滿隊列中添加元素時, 引起異常

# 生存者消費者模型

def produce(q):
    try:
        data = q.put(data, block=, timeout=) 
        # 若block爲False且隊列已滿,則當即拋出Queue.Full
        # 若block爲True進程會阻塞timeout指定時間,直到隊列有空間,不然拋出Queue.Full
    except:

def cosume(q):
    try:
        q.get(block=, timeout=) #與上同理
    except:

q = mp.Queue()
pro = mp.Process(target=produce, args=(q, ))
cos = mp.Process(target=cosume, args=(q, ))
pro.start()
cos.start()
pro.join()
cos.join()

############################################

# Pipe(管道)
# 多進程之間的數據傳遞

def func1(pipe):
    while True:
        pipe.send(1)
def func2(pipe):
    while True:
        pipe.recv() #若是管道內無消息可接受,則會阻塞
pipe = mp.Pipe(duplex=) #參數默認爲True即管道的兩邊都可收發
# 返回(conn1, conn2),當參數爲False時conn1只能收信息,conn2只能發消息
p1 = mp.Process(target=func1, args=(pipe[0], ))
p2 = mp.Process(target=func2, args=(pipe[1], ))
p1.start()
p2.start()
p1.join()
p2.join()

併發池

新的併發池模塊concurrent.futures再次封裝了併發操做,能夠用於量大但簡單併發操做。
進程線程通用關鍵字換一下就行。多線程

future對象

from concurrent.futures import ThreadPoolExecutor
import time

def working(message):
    time.sleep(2)
    return message

pool = ThreadPoolExecutor(max_workers=2)  # 建立一個最大可容納2個task的線程池

worker1 = pool.submit(working, ("hello"))  # 往線程池裏面加入一個task
worker2 = pool.submit(working, ("world"))  # 往線程池裏面加入一個task

# submit 返回了一個future對象,即未完成的操做,咱們能夠經過調用函數來查看其狀態

worker1.done()  # 判斷task1是否結束

worker1.result()  # 查看task1返回的結果
worker2.result()  # 查看task2返回的結果

executor對象

import concurrent.futures

items = list() # 任務對象

def do_task(item): # 處理函數
    return item

#################### submit #########################

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(do_task, item): item for item in items}
    for future in concurrent.futures.as_completed(futures):
        item = futures[future]
        result = future.result()
        print(item, result)

#################### map  #########################

# map跟submit的區別在於submit是無序的,而map是有序的

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for item, result in zip(items, executor.map(do_task, items)):
            print(item, result)

#################### wait  #########################

# wait返回一個元組,包含已完成任務的集合和未完成任務的集合。

pool = ThreadPoolExecutor(5)
futures = []
for item in items:
    futures.append(pool.submit(do_task, item))

concurrent.futures.wait(futures, timeout=None, return_when='FIRST_COMPLETED')

return_when參數可選FIRST_COMPLETED, FIRST_EXCEPTIONALL_COMPLETE
ALL_COMPLETE 會阻塞併發

參考

Python 並行任務技巧
Python併發編程之線程池/進程池app

相關文章
相關標籤/搜索