Python線程池 ThreadPoolExecutor 的用法及實戰

✨ 前言

從Python3.2開始,標準庫爲咱們提供了 concurrent.futures 模塊,它提供了 ThreadPoolExecutor (線程池)和ProcessPoolExecutor (進程池)兩個類。python

相比 threading 等模塊,該模塊經過 submit 返回的是一個 future 對象,它是一個將來可期的對象,經過它能夠獲悉線程的狀態主線程(或進程)中能夠獲取某一個線程(進程)執行的狀態或者某一個任務執行的狀態及返回值:json

  1. 主線程能夠獲取某一個線程(或者任務的)的狀態,以及返回值。
  2. 當一個線程完成的時候,主線程可以當即知道。
  3. 讓多線程和多進程的編碼接口一致。

✨ 線程池的基本使用

# coding: utf-8
from concurrent.futures import ThreadPoolExecutor
import time


def spider(page):
    time.sleep(page)
    print(f"crawl task{page} finished")
    return page

with ThreadPoolExecutor(max_workers=5) as t:  # 建立一個最大容納數量爲5的線程池
    task1 = t.submit(spider, 1)
    task2 = t.submit(spider, 2)  # 經過submit提交執行的函數到線程池中
    task3 = t.submit(spider, 3)

    print(f"task1: {task1.done()}")  # 經過done來判斷線程是否完成
    print(f"task2: {task2.done()}")
    print(f"task3: {task3.done()}")

    time.sleep(2.5)
    print(f"task1: {task1.done()}")
    print(f"task2: {task2.done()}")
    print(f"task3: {task3.done()}")
    print(task1.result())  # 經過result來獲取返回值
複製代碼
執行結果以下:
task1: False
task2: False
task3: False
crawl task1 finished
crawl task2 finished
task1: True
task2: True
task3: False
1
crawl task3 finished
複製代碼
  1. 使用 with 語句 ,經過 ThreadPoolExecutor 構造實例,同時傳入 max_workers 參數來設置線程池中最多能同時運行的線程數目。微信

  2. 使用 submit 函數來提交線程須要執行的任務到線程池中,並返回該任務的句柄(相似於文件、畫圖),注意 submit() 不是阻塞的,而是當即返回。多線程

  3. 經過使用 done() 方法判斷該任務是否結束。上面的例子能夠看出,提交任務後當即判斷任務狀態,顯示四個任務都未完成。在延時2.5後,task1 和 task2 執行完畢,task3 仍在執行中。app

  4. 使用 result() 方法能夠獲取任務的返回值。echarts

✨ 主要方法:

wait

wait(fs, timeout=None, return_when=ALL_COMPLETED)
複製代碼

wait 接受三個參數: fs: 表示須要執行的序列 timeout: 等待的最大時間,若是超過這個時間即便線程未執行完成也將返回 return_when:表示wait返回結果的條件,默認爲 ALL_COMPLETED 所有執行完成再返回ide

仍是用上面那個例子來熟悉用法 示例:函數

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETED
import time

def spider(page):
    time.sleep(page)
    print(f"crawl task{page} finished")
    return page

with ThreadPoolExecutor(max_workers=5) as t: 
    all_task = [t.submit(spider, page) for page in range(1, 5)]
    wait(all_task, return_when=FIRST_COMPLETED)
    print('finished')
    print(wait(all_task, timeout=2.5))

# 運行結果
crawl task1 finished
finished
crawl task2 finished
crawl task3 finished
DoneAndNotDoneFutures(done={<Future at 0x28c8710 state=finished returned int>, <Future at 0x2c2bfd0 state=finished returned int>, <Future at 0x2c1b7f0 state=finished returned int>}, not_done={<Future at 0x2c3a240 state=running>})
crawl task4 finished
複製代碼
  1. 代碼中返回的條件是:當完成第一個任務的時候,就中止等待,繼續主線程任務
  2. 因爲設置了延時, 能夠看到最後只有 task4 還在運行中

as_completed

上面雖然提供了判斷任務是否結束的方法,可是不能在主線程中一直判斷啊。最好的方法是當某個任務結束了,就給主線程返回結果,而不是一直判斷每一個任務是否結束。post

ThreadPoolExecutorThreadPoolExecutor 中 的 as_completed() 就是這樣一個方法,當子線程中的任務執行完後,直接用 result() 獲取返回結果學習

用法以下:

# coding: utf-8
from concurrent.futures import ThreadPoolExecutor, as_completed
import time


def spider(page):
    time.sleep(page)
    print(f"crawl task{page} finished")
    return page

def main():
    with ThreadPoolExecutor(max_workers=5) as t:
        obj_list = []
        for page in range(1, 5):
            obj = t.submit(spider, page)
            obj_list.append(obj)

        for future in as_completed(obj_list):
            data = future.result()
            print(f"main: {data}")

# 執行結果
crawl task1 finished
main: 1
crawl task2 finished
main: 2
crawl task3 finished
main: 3
crawl task4 finished
main: 4
複製代碼

as_completed() 方法是一個生成器,在沒有任務完成的時候,會一直阻塞,除非設置了 timeout。

當有某個任務完成的時候,會 yield 這個任務,就能執行 for 循環下面的語句,而後繼續阻塞住,循環到全部的任務結束。同時,先完成的任務會先返回給主線程。

map

map(fn, *iterables, timeout=None)
複製代碼

fn: 第一個參數 fn 是須要線程執行的函數; iterables:第二個參數接受一個可迭代對象; timeout: 第三個參數 timeout 跟 wait() 的 timeout 同樣,但因爲 map 是返回線程執行的結果,若是 timeout小於線程執行時間會拋異常 TimeoutError。

用法以下:

import time
from concurrent.futures import ThreadPoolExecutor

def spider(page):
    time.sleep(page)
    return page

start = time.time()
executor = ThreadPoolExecutor(max_workers=4)

i = 1
for result in executor.map(spider, [2, 3, 1, 4]):
    print("task{}:{}".format(i, result))
    i += 1

# 運行結果
task1:2
task2:3
task3:1
task4:4
複製代碼

使用 map 方法,無需提早使用 submit 方法,map 方法與 python 高階函數 map 的含義相同,都是將序列中的每一個元素都執行同一個函數。

上面的代碼對列表中的每一個元素都執行 spider() 函數,並分配各線程池。

能夠看到執行結果與上面的 as_completed() 方法的結果不一樣,輸出順序和列表的順序相同,就算 1s 的任務先執行完成,也會先打印前面提交的任務返回的結果。

✨ 實戰

以某網站爲例,演示線程池和單線程兩種方式爬取的差別

# coding: utf-8
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import json
from requests import adapters

from proxy import get_proxies

headers = {
    "Host": "splcgk.court.gov.cn",
    "Origin": "https://splcgk.court.gov.cn",
    "User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36",
    "Referer": "https://splcgk.court.gov.cn/gzfwww/ktgg",
}
url = "https://splcgk.court.gov.cn/gzfwww/ktgglist?pageNo=1"

def spider(page):
    data = {
        "bt": "",
        "fydw": "",
        "pageNum": page,
    }
    for _ in range(5):
        try:
            response = requests.post(url, headers=headers, data=data, proxies=get_proxies())
            json_data = response.json()
        except (json.JSONDecodeError, adapters.SSLError):
            continue
        else:
            break
    else:
        return {}

    return json_data

def main():
    with ThreadPoolExecutor(max_workers=8) as t:
        obj_list = []
        begin = time.time()
        for page in range(1, 15):
            obj = t.submit(spider, page)
            obj_list.append(obj)

        for future in as_completed(obj_list):
            data = future.result()
            print(data)
            print('*' * 50)
        times = time.time() - begin
        print(times)

if __name__ == "__main__":
    main()
複製代碼

運行結果以下:

多線程

能夠看到,14 頁只花了 2 秒鐘就爬完了

下面咱們可使用單線程來爬取,代碼基本和上面的同樣,加個單線程函數 代碼以下:

def single():
    begin = time.time()
    for page in range(1, 15):
        data = spider(page)
        print(data)
        print('*' * 50)

    times = time.time() - begin
    print(times)


if __name__ == "__main__":
    single()
複製代碼

運行結果:

單線程

能夠看到,總共花了 19 秒。真是肉眼可見的差距啊!若是數據量大的話,運行時間差距會更大!

下面是本人的公衆號,裏面有不少志同道合的朋友,歡迎關注學習加羣

歡迎關注

推薦閱讀

Python爬蟲之JS逆向入門篇

用python一鍵生成你的微信好友頭像牆

pyecharts可視化和微信的結合

python數據可視化神器--pyecharts 快速入門

相關文章
相關標籤/搜索