Python多線程、多進程和協程的實例講解

線程、進程和協程是什麼

線程、進程和協程的詳細概念解釋和原理剖析不是本文的重點,本文重點講述在Python中怎樣實際使用這三種東西python

參考: 進程、線程、協程之概念理解shell

進程(Process)是計算機中的程序關於某數據集合上的一次運行活動,是系統進行資源分配和調度的基本單位,是操做系統結構的基礎。
線程,有時被稱爲輕量級進程(Lightweight Process,LWP),是程序執行流的最小單元。
協程:一個程序能夠包含多個協程,能夠對比於一個進程包含多個線程,於是下面咱們來比較協程和線程:咱們知道多個線程相對獨立,有本身的上下文,切換受系統控制;而協程也相對獨立,有本身的上下文,可是其切換由本身控制,由當前協程切換到其餘協程由當前協程來控制。編程

準備工做

磨刀不誤砍柴工,在用實例講解線程、進程和協程怎麼使用以前,先準備一些工具:session

實際上多線程、多進程和協程,都屬於併發編程,併發編程的最重要的目標就是提升程序運行的效率,那麼咱們須要一個計算一個函數耗時長度的工具,用於對比不一樣方式程序的運行時間,這裏咱們寫一個函數計時裝飾器fn_timer來完成這件事:多線程

def fn_timer(function):
    '''
    函數計時裝飾器
    :param function: 函數對象
    :return: 裝飾器
    '''
    @wraps(function)
    def function_timer(*args,**kwargs):
        # 起始時間
        t0 = time.time()
        # 調用函數
        result = function(*args,**kwargs)
        # 結束時間
        t1 = time.time()
        # 打印函數耗時
        print '[finished function:{func_name} in {time:.2f}s]'.format(func_name = function.__name__,time = t1 - t0)
        return result
    return function_timer

該裝飾器的用法示例:併發

# 測試
@fn_timer
def add(x,y):
    time.sleep(1.22)
    return x + y

if __name__ == '__main__':
    # 測試
    sum = add(1,2)
    print sum

運行代碼輸出:app

[finished function:test in 1.23s]
3

實際使用中,大規模爬蟲程序很適合用併發來實現,因此咱們再準備一些網址放在urls列表中,用於測試爬蟲程序的效率(都是百度百科的一些詞條頁面):dom

# 20個網頁
urls = ['https://baike.baidu.com/item/%E8%87%AA%E7%94%B1%E8%BD%AF%E4%BB%B6',
        'https://baike.baidu.com/item/%E8%AE%A1%E7%AE%97%E6%9C%BA%E7%A8%8B%E5%BA%8F%E8%AE%BE%E8%AE%A1%E8%AF%AD%E8%A8%80',
        'https://baike.baidu.com/item/%E5%9F%BA%E9%87%91%E4%BC%9A',
        'https://baike.baidu.com/item/%E5%88%9B%E6%96%B02.0',
        'https://baike.baidu.com/item/%E5%95%86%E4%B8%9A%E8%BD%AF%E4%BB%B6',
        'https://baike.baidu.com/item/%E5%BC%80%E6%94%BE%E6%BA%90%E4%BB%A3%E7%A0%81',
        'https://baike.baidu.com/item/OpenBSD',
        'https://baike.baidu.com/item/%E8%A7%A3%E9%87%8A%E5%99%A8',
        'https://baike.baidu.com/item/%E7%A8%8B%E5%BA%8F/71525',
        'https://baike.baidu.com/item/%E7%BC%96%E7%A8%8B%E8%AF%AD%E8%A8%80',
        'https://baike.baidu.com/item/C%2B%2B',
        'https://baike.baidu.com/item/%E8%B7%A8%E5%B9%B3%E5%8F%B0',
        'https://baike.baidu.com/item/Web/150564',
        'https://baike.baidu.com/item/%E7%88%B1%E5%A5%BD%E8%80%85',
        'https://baike.baidu.com/item/%E6%95%99%E5%AD%A6',
        'https://baike.baidu.com/item/Unix%20shell',
        'https://baike.baidu.com/item/TIOBE',
        'https://baike.baidu.com/item/%E8%AF%BE%E7%A8%8B',
        'https://baike.baidu.com/item/MATLAB',
        'https://baike.baidu.com/item/Perl']

整合:把函數計時裝飾器和urls列表封裝在一個類:utils.py中:異步

# coding:utf-8
from functools import wraps
import time

# 20個網頁
urls = ['https://baike.baidu.com/item/%E8%87%AA%E7%94%B1%E8%BD%AF%E4%BB%B6',
        'https://baike.baidu.com/item/%E8%AE%A1%E7%AE%97%E6%9C%BA%E7%A8%8B%E5%BA%8F%E8%AE%BE%E8%AE%A1%E8%AF%AD%E8%A8%80',
        'https://baike.baidu.com/item/%E5%9F%BA%E9%87%91%E4%BC%9A',
        'https://baike.baidu.com/item/%E5%88%9B%E6%96%B02.0',
        'https://baike.baidu.com/item/%E5%95%86%E4%B8%9A%E8%BD%AF%E4%BB%B6',
        'https://baike.baidu.com/item/%E5%BC%80%E6%94%BE%E6%BA%90%E4%BB%A3%E7%A0%81',
        'https://baike.baidu.com/item/OpenBSD',
        'https://baike.baidu.com/item/%E8%A7%A3%E9%87%8A%E5%99%A8',
        'https://baike.baidu.com/item/%E7%A8%8B%E5%BA%8F/71525',
        'https://baike.baidu.com/item/%E7%BC%96%E7%A8%8B%E8%AF%AD%E8%A8%80',
        'https://baike.baidu.com/item/C%2B%2B',
        'https://baike.baidu.com/item/%E8%B7%A8%E5%B9%B3%E5%8F%B0',
        'https://baike.baidu.com/item/Web/150564',
        'https://baike.baidu.com/item/%E7%88%B1%E5%A5%BD%E8%80%85',
        'https://baike.baidu.com/item/%E6%95%99%E5%AD%A6',
        'https://baike.baidu.com/item/Unix%20shell',
        'https://baike.baidu.com/item/TIOBE',
        'https://baike.baidu.com/item/%E8%AF%BE%E7%A8%8B',
        'https://baike.baidu.com/item/MATLAB',
        'https://baike.baidu.com/item/Perl']


def fn_timer(function):
    '''
    函數計時裝飾器
    :param function: 函數對象
    :return: 裝飾器
    '''
    @wraps(function)
    def function_timer(*args,**kwargs):
        # 起始時間
        t0 = time.time()
        # 調用函數
        result = function(*args,**kwargs)
        # 結束時間
        t1 = time.time()
        # 打印函數耗時
        print '[finished function:{func_name} in {time:.2f}s]'.format(func_name = function.__name__,time = t1 - t0)
        return result
    return function_timer

# 測試
@fn_timer
def add(x,y):
    time.sleep(1.22)
    return x + y

if __name__ == '__main__':
    # 測試
    sum = add(1,2)
    print sum
    # 輸出:
    '''
    [finished function:test in 1.23s]
    3
    '''

實例講解多線程的用法

從聽音樂、看電影講起socket

我如今要作兩件事情:聽音樂、看電影,聽一首音樂假如耗時1秒,看一部電影假如耗時5秒,用兩個函數定義這兩個任務以下:

# 耗時任務:聽音樂
def music(name):
    print 'I am listening to music {0}'.format(name)
    time.sleep(1)

# 耗時任務:看電影
def movie(name):
    print 'I am watching movie {0}'.format(name)
    time.sleep(5)

假如我如今要聽10首音樂、看2部電影,那麼我就有以下幾種方案:

方案一:先一個個聽完10首音樂,再一個個看完2部電影,順序完成,代碼以下:

# 單線程操做:順序執行聽10首音樂,看2部電影
@fn_timer
def single_thread():
    for i in range(10):
        music(i)
    for i in range(2):
        movie(i)

讓咱們執行一下這段代碼,輸出以下:

    I am listening to music 0
    I am listening to music 1
    I am listening to music 2
    I am listening to music 3
    I am listening to music 4
    I am listening to music 5
    I am listening to music 6
    I am listening to music 7
    I am listening to music 8
    I am listening to music 9
    I am watching movie 0
    I am watching movie 1
    [finished function:single_thread in 20.14s]

能夠看到,老老實實嚴格按照前後順序來一件件作這些事情,所需的總時間和每件事情耗時加起來是同樣多的。

方案二:剛剛的方案不太好,太費時間了,那麼能不能同時進行一些事情呢?答案是能夠的,能夠同時聽多首音樂,同時看多部電影進行,代碼以下:

# 多線程執行:聽10首音樂,看2部電影
@fn_timer
def multi_thread():
    # 線程列表
    threads = []
    for i in range(10):
        # 建立一個線程,target參數爲任務處理函數,args爲任務處理函數所需的參數元組
        threads.append(threading.Thread(target = music,args = (i,)))
    for i in range(2):
        threads.append(threading.Thread(target = movie,args = (i,)))

    for t in threads:
        # 設爲守護線程
        t.setDaemon(True)
        # 開始線程
        t.start()
    for t in threads:
        t.join()

執行上述代碼,運行結果:

    I am listening to music 0
    I am listening to music 1
    I am listening to music 2
    I am listening to music 3
    I am listening to music 4
    I am listening to music 5
    I am listening to music 6
    I am listening to music 7
    I am listening to music 8
    I am listening to music 9
    I am watching movie 0
    I am watching movie 1
    [finished function:multi_thread in 5.02s]

此次只用了5秒就完成了,完成效率顯著提高。此次試用多線程執行多個任務,全部任務最終的總耗時 = 耗時最長的那個單個任務的耗時,即看一部電影的5秒鐘時間。

方案三:使用線程池。上面使用多線程的方式比較繁瑣,下面使用線程池來實現:

# 使用線程池執行:聽10首音樂,看2部電影
@fn_timer
def use_pool():
    # 設置線程池大小爲20,若是不設置,默認值是CPU核心數
    pool = Pool(20)
    pool.map(movie,range(2))
    pool.map(music,range(10))
    pool.close()
    pool.join()

執行結果:

    I am listening to music 0
    I am listening to music 1
    I am listening to music 2
    I am listening to music 3
    I am listening to music 4
    I am listening to music 5
    I am listening to music 6
    I am listening to music 7
    I am listening to music 8
    I am listening to music 9
    I am watching movie 0
    I am watching movie 1
    [finished function:use_pool in 6.12s]

能夠看出使用線程池反而比手工調度線程多耗時一秒鐘,多是由於線程池內部對線程的調度和線程切換的耗時形成的。

實例:使用多線程下載網頁

話很少說,直接上代碼,用多線程併發下載20個百度百科網頁的實例代碼及運行結果以下:

# coding:utf-8
# 測試多線程
import threading
import time
from utils import fn_timer
from multiprocessing.dummy import Pool
import requests
from utils import urls

# 應用:使用單線程下載多個網頁的內容
@fn_timer
def download_using_single_thread(urls):
    resps = []
    for url in urls:
        resp = requests.get(url)
        resps.append(resp)
    return resps

# 應用:使用多線程下載多個網頁的內容
@fn_timer
def download_using_multi_thread(urls):
    threads = []
    for url in urls:
        threads.append(threading.Thread(target = requests.get,args = (url,)))
    for t in threads:
        t.setDaemon(True)
        t.start()
    for t in threads:
        t.join()

# 應用:使用線程池下載多個網頁的內容
@fn_timer
def download_using_pool(urls):
    pool = Pool(20)
    # 第一個參數爲函數名,第二個參數一個可迭代對象,爲函數所需的參數列表
    resps = pool.map(requests.get,urls)
    pool.close()
    pool.join()
    return resps

def main():
    # 1.使用單線程
    resps = download_using_single_thread(urls)
    print len(resps)
    # 輸出:
    '''
    [finished function:download_using_single_thread in 6.18s]
    20
    '''
    # 2. 使用多線程
    download_using_multi_thread(urls)
    # 輸出:
    '''
    [finished function:download_using_multi_thread in 0.73s]
    '''

    # 3.使用線程池
    resps = download_using_pool(urls)
    print len(resps)
    # 輸出:
    '''
    [finished function:download_using_pool in 0.84s]
    20
    '''

if __name__ == '__main__':
    main()

實例講解多進程的用法

多進程和進程池的使用

實例代碼以下:

# coding:utf-8
# 測試多進程
import os
import time
from multiprocessing import Process,Pool,Queue
from utils import fn_timer
import random

# 簡單的任務
@fn_timer
def do_simple_task(task_name):
    print 'Run child process {0}, task name is: {1}'.format(os.getpid(),task_name)
    time.sleep(1.2)
    return task_name

@fn_timer
# 1. 測試簡單的多進程
def test_simple_multi_process():
    p1 = Process(target=do_simple_task, args=('task1',))
    p2 = Process(target=do_simple_task, args=('task2',))
    print 'Process will start...'
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print 'Process end.'

@fn_timer
# 2. 測試使用進程池
def test_use_process_pool():
    # 建立一個進程池,數字表示一次性同時執行的最大子進程數
    pool = Pool(5)
    # 任務返回值列表
    results = []
    # 任務名稱列表
    task_names = []
    for i in range(7):
        task_names.append('task{0}'.format(i))
    # 併發執行多個任務,並獲取任務返回值
    results = pool.map_async(do_simple_task,task_names)
    print 'Many processes will start...'
    pool.close()
    pool.join()

    print 'All processes end, results is: {0}'.format(results.get())

def main():
    test_simple_multi_process()
    # 輸出:
    '''
    Process will start...
    Run child process 1524, task name is: task2
    Run child process 1728, task name is: task1
    [finished function:do_simple_task in 1.20s]
    [finished function:do_simple_task in 1.20s]
    Process end.
    [finished function:test_simple_multi_process in 1.34s]
    '''

    test_use_process_pool()
    # 輸出:
    '''
    Many processes will start...
    Run child process 7568, task name is: task0
    Run child process 7644, task name is: task1
    Run child process 7628, task name is: task2
    Run child process 7620, task name is: task3
    Run child process 7660, task name is: task4
    [finished function:do_simple_task in 1.20s]
    Run child process 7568, task name is: task5
    [finished function:do_simple_task in 1.20s]
    Run child process 7644, task name is: task6
    [finished function:do_simple_task in 1.20s]
    [finished function:do_simple_task in 1.20s]
    [finished function:do_simple_task in 1.20s]
    [finished function:do_simple_task in 1.20s]
    [finished function:do_simple_task in 1.20s]
    All processes end, results is: ['task0', 'task1', 'task2', 'task3', 'task4', 'task5', 'task6']
    [finished function:test_use_process_pool in 2.62s]
    '''
if __name__ == '__main__':
    main()

進程之間的通訊

進程間的通訊採用隊列來實現,實例代碼以下:

# coding:utf-8
# 測試進程間的通訊
import os
import time
from multiprocessing import Process,Pool,Queue
from utils import fn_timer
import random

# 寫進程執行的任務
def write(q):
    for value in ['A','B','C']:
        print 'Put value: {0} to queue.'.format(value)
        q.put(value)
        time.sleep(random.random())

# 讀進程執行的任務
def read(q):
    while True:
        value = q.get(True)
        print 'Get value: {0} from queue.'.format(value)

# 測試進程間的通訊
def test_communication_between_process():
    q = Queue()
    # 寫進程
    pw = Process(target = write,args = (q,))
    # 讀進程
    pr = Process(target = read,args = (q,))
    pw.start()
    pr.start()
    pw.join()
    # 由於讀任務是死循環,因此要強行結束
    pr.terminate()

def main():
    test_communication_between_process()
    # 輸出
    '''
    Put value: A to queue.
    Get value: A from queue.
    Put value: B to queue.
    Get value: B from queue.
    Put value: C to queue.
    Get value: C from queue.
    '''

if __name__ == '__main__':
    main()

實例講解協程的用法

下面用協程下載一樣的20個網頁,實例代碼以下:

# coding:utf-8
# 測試協程
import requests
import gevent
import utils
from utils import fn_timer
from gevent.pool import Pool
from gevent import monkey
# 打動態補丁,把標準庫中的thread/socket等替換掉,讓它們變成非阻塞的
monkey.patch_all()

session = requests.Session()
session.headers['User-Agent'] = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.131 Safari/537.36'

@fn_timer
def download_using_single_thread(urls):
    '''
    順序執行下載多個網頁
    :param urls: 要下載的網頁內容
    :return: 響應列表
    '''
    resps = []
    for url in urls:
        resps.append(session.get(url))
    return resps

@fn_timer
def download_using_coroutine(urls):
    '''
    使用協程下載
    :param urls: 要下載的網頁內容
    :return: 響應列表
    '''
    spawns = []
    for url in urls:
        spawns.append(gevent.spawn(session.get,url))
    # 在遇到IO操做時,gevent會自動切換,併發執行(異步IO)
    rets = gevent.joinall(spawns)
    # joinall函數會返回gevent.greenlet.Greenlet對象的列表,若是想要得到每次調用session.get函數的返回結果,還需分別調用每一個Greenlet對象的get函數
    results = [ret.get() for ret in rets]

@fn_timer
def download_using_coroutine_pool(urls):
    # 建立協程池,並設置最大併發量
    pool = Pool(20)
    # pool.map函數直接返回每次調用session.get函數返回的結果列表
    rets = pool.map(session.get,urls)

def main():
    # 1.使用單線程下載20個網頁
    download_using_single_thread(utils.urls)
    # 輸出:
    '''
    [finished function:download_using_single_thread in 1.83s]
    '''

    # 2.使用協程下載20個網頁
    download_using_coroutine(utils.urls)
    # 輸出:
    '''
    [finished function:download_using_coroutine in 0.69s]
    '''

    # 3.使用協程池下載20個網頁
    download_using_coroutine_pool(utils.urls)
    # 輸出:
    '''
    [finished function:download_using_coroutine_pool in 0.78s]
    '''

if __name__ == '__main__':
    main()

能夠發現,協程的效率也是很是高的。

多線程、多進程和協程併發效率的對比

下面分別使用線程池、進程池和協程池下載100個相同的網頁,來對比其效率:

# coding:utf-8
# 對比多線程、多進程和協程下載網頁
import requests
import utils
from utils import fn_timer
from multiprocessing.dummy import Pool as thread_pool
from multiprocessing import Pool as process_pool
from gevent.pool import Pool as gevent_pool
from gevent import monkey
# 打動態補丁,把標準庫中的thread/socket等替換掉,讓它們變成非阻塞的
monkey.patch_all()

session = requests.Session()
session.headers['User-Agent'] = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.131 Safari/537.36'

# 1. 使用線程池下載多個網頁的內容
@fn_timer
def download_using_thread_pool(urls):
    pool = thread_pool(100)
    # 第一個參數爲函數名,第二個參數一個可迭代對象,爲函數所需的參數列表
    resps = pool.map(session.get,urls)
    pool.close()
    pool.join()
    return resps

# 2. 測試使用進程池
@fn_timer
def download_using_process_pool(urls):
    # 建立一個進程池,數字表示一次性同時執行的最大子進程數
    pool = process_pool(100)
    # 任務返回值列表
    results = []
    # 併發執行多個任務,並獲取任務返回值
    results = pool.map_async(session.get,urls)
    pool.close()
    pool.join()
    return results.get()

# 3. 使用協程池下載
@fn_timer
def download_using_coroutine_pool(urls):
    # 建立協程池,並設置最大併發量
    pool = gevent_pool(100)
    pool.map(session.get,urls)

def main():
    # 1. 使用線程池下載100個網頁
    download_using_thread_pool(utils.urls * 5)
    # 輸出:
    '''
    [finished function:download_using_thread_pool in 3.68s]
    '''

    # 2. 使用進程池下載100個網頁
    download_using_process_pool(utils.urls * 5)
    # 輸出:
    '''
    卡死了
    '''

    # 3.使用協程池下載20個網頁
    download_using_coroutine_pool(utils.urls * 5)
    # 輸出:
    '''
    [finished function:download_using_coroutine_pool in 3.46s]
    '''

if __name__ == '__main__':
    main()

從結果來看,使用協程池的效率仍是略高一點。

做者:m2fox
連接:

https://www.jianshu.com/p/857e0780946b


識別圖中二維碼,領取python全套視頻資料

相關文章
相關標籤/搜索