122 python程序中的線程操做-concurrent模塊

1、concurrent模塊的介紹

concurrent.futures模塊提供了高度封裝的異步調用接口python

ThreadPoolExecutor:線程池,提供異步調用編程

ProcessPoolExecutor:進程池,提供異步調用服務器

ProcessPoolExecutor 和 ThreadPoolExecutor:二者都實現相同的接口,該接口由抽象Executor類定義。併發

2、基本方法

submit(fn, *args, **kwargs):異步提交任務app

map(func, *iterables, timeout=None, chunksize=1):取代for循環submit的操做dom

shutdown(wait=True):至關於進程池的pool.close()+pool.join()操做異步

  • wait=True,等待池內全部任務執行完畢回收完資源後才繼續
  • wait=False,當即返回,並不會等待池內的任務執行完畢
  • 但無論wait參數爲什麼值,整個程序都會等到全部任務執行完畢
  • submit和map必須在shutdown以前

result(timeout=None):取得結果函數

add_done_callback(fn):回調函數操作系統

3、進程池和線程池

池的功能:限制進程數或線程數.線程

何時限制: 當併發的任務數量遠遠大於計算機所能承受的範圍,即沒法一次性開啓過多的任務數量 我就應該考慮去限制我進程數或線程數,從保證服務器不崩.

3.1 進程池

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Process,current_process
import time



def task(i):
    print(f'{current_process().name} 在執行任務{i}')
    time.sleep(1)


if __name__ == '__main__':
    pool = ProcessPoolExecutor(4) # 進程池裏又4個進程
    for i in range(20): # 20個任務
        pool.submit(task,i)# 進程池裏當前執行的任務i,池子裏的4個進程一次一次執行任務

3.2 線程池

from concurrent.futures import ThreadPoolExecutor
from threading import Thread,currentThread
import time


def task(i):
    print(f'{currentThread().name} 在執行任務{i}')
    time.sleep(1)

if __name__ == '__main__':
    pool = ThreadPoolExecutor(4) # 進程池裏又4個線程
    for i in range(20): # 20個任務
        pool.submit(task,i)# 線程池裏當前執行的任務i,池子裏的4個線程一次一次執行任務

4、Map的用法

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(20):
    #     future=executor.submit(task,i)

    executor.map(task,range(1,21)) #map取代了for+submit

5、同步和異步

理解爲提交任務的兩種方式

同步: 提交了一個任務,必須等任務執行完了(拿到返回值),才能執行下一行代碼

異步: 提交了一個任務,不要等執行完了,能夠直接執行下一行代碼.

同步:至關於執行任務的串行執行

異步

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Process,current_process
import time

n = 1

def task(i):
    global n
    print(f'{current_process().name} 在執行任務{i}')
    time.sleep(1)
    n += i
    return n

if __name__ == '__main__':
    pool = ProcessPoolExecutor(4) # 進程池裏又4個線程
    pool_lis = []
    for i in range(20): # 20個任務
        future = pool.submit(task,i)# 進程池裏當前執行的任務i,池子裏的4個線程一次一次執行任務
        # print(future.result()) # 這是在等待我執行任務獲得的結果,若是一直沒有結果,這裏會致使咱們全部任務編程了串行
                               # 在這裏就引出了下面的pool.shutdown()方法
        pool_lis.append(future)
    pool.shutdown(wait=True) # 關閉了池的入口,不容許在往裏面添加任務了,會等帶全部的任務執行完,結束阻塞
    for p in pool_lis:
        print(p.result())


    print(n)# 這裏一開始確定是拿到0的,由於我只是去告訴操做系統執行子進程的任務,代碼依然會繼續往下執行
    # 能夠用join去解決,等待每個進程結束後,拿到他的結果

6、回調函數

import time
from threading import Thread,currentThread
from concurrent.futures import ThreadPoolExecutor

def task(i):
    print(f'{currentThread().name} 在執行{i}')
    time.sleep(1)
    return i**2

# parse 就是一個回調函數
def parse(future):
    # 處理拿到的結果
    print(f'{currentThread().name} 結束了當前任務')
    print(future.result())


if __name__ == '__main__':
    pool = ThreadPoolExecutor(4)
    for i in range(20):
        future = pool.submit(task,i)

        '''
        給當前執行的任務綁定了一個函數,在當前任務結束的時候就會觸發這個函數(稱之爲回調函數)
        會把future對象做爲參數傳給函數
        注:這個稱爲回調函數,當前任務處理結束了,就回來調parse這個函數
        '''
        future.add_done_callback(parse)
        # add_done_callback (parse) parse是一個回調函數
        # add_done_callback () 是對象的一個綁定方法,他的參數就是一個函數
相關文章
相關標籤/搜索