1、線程池 1、concurrent.futures模塊 介紹 concurrent.futures模塊提供了高度封裝的異步調用接口 ThreadPoolExecutor:線程池,提供異步調用 ProcessPoolExecutor: 進程池,提供異步調用 在這個模塊中進程池和線程池的使用方法徹底同樣 這裏就只介紹ThreadPoolExecutor的使用方法,順便對比multiprocessing的Pool進程池 2、基本方法 submit(fn, *args, **kwargs):異步提交任務 map(func, *iterables, timeout=None, chunksize=1) :取代for循環submit的操做,iterables的每一個元素都做爲參數傳給func shutdown(wait=True) : 至關於進程池的pool.close()+pool.join()操做 wait=True,等待池內全部任務執行完畢回收完資源後才繼續 wait=False,當即返回,並不會等待池內的任務執行完畢 但無論wait參數爲什麼值,整個程序都會等到全部任務執行完畢 submit和map必須在shutdown以前 result(timeout=None):取得返回值 add_done_callback(fn):設置回調函數 3、例子:(對比multiprocessing的Pool模塊) 3-1、 import time from concurrent.futures import ThreadPoolExecutor def func(i): print('thread',i) time.sleep(1) print('thread %s end'%i) tp = ThreadPoolExecutor(5) # 至關於tp = Pool(5) tp.submit(func,1) # 至關於tp.apply_async(func,args=(1,)) tp.shutdown() # 至關於tp.close() + tp.join() print('主線程') 3-2、 import time from concurrent.futures import ThreadPoolExecutor from threading import currentThread def func(i): print('thread',i,currentThread().ident) time.sleep(1) print('thread %s end'%i) tp = ThreadPoolExecutor(5) for i in range(20): tp.submit(func,i) tp.shutdown() # shutdown一次就夠了,會自動把全部的線程都join() print('主線程') 3-3、返回值 import time from concurrent.futures import ThreadPoolExecutor from threading import currentThread def func(i): print('thread',i,currentThread().ident) time.sleep(1) print('thread %s end' %i) return i * '*' tp = ThreadPoolExecutor(5) ret_lst = [] for i in range(20): ret = tp.submit(func,i) ret_lst.append(ret) for ret in ret_lst: print(ret.result()) # 至關於ret.get() print('主線程') 3-4、map map接收一個函數和一個可迭代對象 可迭代對象的每個值就是函數接收的實參,可迭代對象的長度就是建立的線程數量 map能夠直接拿到返回值的可迭代對象(列表),循環就能夠獲取返回值 import time from concurrent.futures import ThreadPoolExecutor def func(i): print('thread',i) time.sleep(1) print('thread %s end'%i) return i * '*' tp = ThreadPoolExecutor(5) ret = tp.map(func,range(20)) for i in ret: print(i) 3-5、回調函數 回調函數在進程池是由主進程實現的 回調函數在線程池是由子線程實現的 import time from concurrent.futures import ThreadPoolExecutor from threading import currentThread def func(i): print('thread',i,currentThread().ident) time.sleep(1) print('thread %s end'%i) return i * '*' def call_back(arg): print('call back : ',currentThread().ident) print('ret : ',arg.result()) # multiprocessing的Pool回調函數中的參數不須要get(),這裏須要result() tp = ThreadPoolExecutor(5) ret_lst = [] for i in range(20): tp.submit(func,i).add_done_callback(call_back) # 使用add_done_callback()方法實現回調函數 print('主線程',currentThread().ident) 從結果能夠看出: 子線程func執行完畢以後纔去執行callback回調函數 子線程func的返回值會做爲回調函數的參數 4、總結 線程池 實例化線程池 ThreadPoolExcutor 5*cpu_count 異步提交任務 submit / map 阻塞直到任務完成 shutdown 獲取子線程的返回值 result 回調函數 add_done_callback 2、協程概念介紹 1、介紹 進程 :計算機中最小的資源分配單位 線程 :計算機中能被cpu執行的最小單位 協程(纖程):一條線程在多個任務之間來回切換就叫協程 切換這個動做是浪費時間的 對於CPU、操做系統來講協程是不存在的 他們只能看到線程 協程的本質就是一條線程在多個任務之間來回切換,因此徹底不會產生數據安全的問題 協程的理解: 把一個線程的執行明確的切分開 好比有兩個任務,使用協程它幫助你記住哪一個任務執行到哪一個位置上了,而且實現安全的切換 當一個任務陷入阻塞,在這個任務阻塞的過程當中切換到另外一個任務中執行另外一個任務 你的程序只要還有任務須要執行 你的當前線程永遠不會阻塞 利用協程在多個任務陷入阻塞的時候進行切換來保證一個線程在處理多個任務的時候老是忙碌的 可以更加充分的利用CPU,搶佔更多的時間片 不管是進程、仍是線程都是由操做系統來切換的,開啓過多的線程、進程會給操做系統的調度增長負擔 若是咱們是使用協程,協程在程序之間的切換操做系統感知不到,不管開啓多少個協程對操做系統來講老是一個線程 操做系統的調度不會有任何壓力 2、用生活中的例子來解釋: 協程: 好比你本身一我的 作飯(協程)須要半個小時,你能夠先洗米,而後把米飯交給電飯煲煮,這個時候煮飯就陷入阻塞了,你就能夠去作其餘家務了, 好比這個時候你能夠去洗衣服(協程),你把衣服放進洗衣機後,這個任務也陷入阻塞了,而後你又能夠去作其餘事情, 好比這個時候你能夠收拾屋子 (協程),而後在你收拾屋子的時候,米飯煮好了,你就去關電飯煲,過一段時間,衣服也洗好了, 你就去關洗衣機,晾衣服,其餘時間你都在收拾屋子,那麼這樣你的時間就利用得很充分了。 多線程(多進程): 上面的任務,你請了幾我的幫你一塊兒作,每一個人都只作本身的那件事, 好比你作飯,而後一我的洗衣服,另外一我的收拾屋子,這樣的話在阻塞的時間裏,每一個人都是在等待的狀態,沒有充分利用時間, 且成本高(你請人須要人工) 三、強調: 1. python的線程屬於內核級別的,即由操做系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其餘線程運行) 2. 單線程內開啓協程,一旦遇到io,就會從應用程序級別(而非操做系統)控制切換,以此來提高效率(非io操做的切換與效率無關)
四、對比操做系統控制線程的切換,用戶在單線程內控制協程的切換 優勢: 1. 協程的切換開銷更小,屬於程序級別的切換,操做系統徹底感知不到,於是更加輕量級 2. 單線程內就能夠實現併發的效果,最大限度地利用cpu 缺點: 1. 協程的本質是單線程下,沒法利用多核,能夠是一個程序開啓多個進程,每一個進程內開啓多個線程,每一個線程內開啓協程 2. 協程指的是單個線程,於是一旦協程出現阻塞,將會阻塞整個線程 5、總結: 必須在只有一個單線程裏實現併發 修改共享數據不需加鎖 用戶程序裏本身保存多個控制流的上下文棧 線程的調度是操做系統級別的
協程的調度是用戶級別的 3、greenlet模塊和gevent模塊(都是擴展模塊) 1、介紹 greenlet:是gevent的底層,協程切換的模塊,可是當切到一個任務執行時若是遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提高效率的問題。 gevent:基於greenlet優化而來的一個模塊,gevent能提供更全面的功能,遇到io操做會自動切換任務(因此通常直接用這個模塊就好)。 gevent用法介紹: g1=gevent.spawn(func,1,,2,3,x=4,y=5)建立一個協程對象g1,spawn括號內第一個參數是函數名,後面能夠有多個參數,能夠是位置實參或關鍵字實參,都是傳給你指定的函數 g2=gevent.spawn(func2) g1.join() #等待g1結束 g2.join() #等待g2結束 上面兩個join能夠寫成一個:gevent.joinall([g1,g2]) # 參數是列表類型 g1.value #拿到func的返回值 注意:在gevent模塊中,有些阻塞它是不認識的,好比time模塊的sleep,若是直接導入time模塊,使用time.sleep(),在gevent模塊的協程中並不會阻塞,由於gevent不認識time模塊, 那麼如何解決呢? 在導入time模塊前先寫上下面這兩行代碼,再導入time模塊,這樣time模塊的內容它就認識了,time.sleep()也會阻塞了: from gevent import monkey monkey.patch_all() # patch_all就是把下面的模塊的阻塞打成一個包,認識他們 import time 而在greenlet模塊中,time的模塊的阻塞它自己就是認識的。 2、例子 2-1、greenlet例子: import time from greenlet import greenlet def cooking(): print('cooking 1') g2.switch() # 切換到g2,讓g2的函數工做 time.sleep(1) print('cooking 2') def watch(): print('watch TV 1') time.sleep(1) print('watch TV 2') g1.switch() # 切換到g1,讓g1的函數工做 g1 = greenlet(cooking) g2 = greenlet(watch) g1.switch() # 切換到g1,讓g1的函數工做 greenlet的缺陷:很顯然greenlet實現了協程的切換功能,能夠本身設置何時切,在哪切,可是它遇到阻塞並無自動切換, 所以並不能提升效率。因此通常咱們都使用gevent模塊實現協程 2-2、gevent例子: from gevent import monkey monkey.patch_all() import time import gevent def cooking(): print('cooking 1') time.sleep(1) print('cooking 2') def watch(): print('watch TV 1') time.sleep(1) print('watch TV 2') g1 = gevent.spawn(cooking) # 自動檢測阻塞事件,碰見阻塞了就會進行切換 g2 = gevent.spawn(watch) g1.join() # 阻塞直到g1結束 g2.join() # 阻塞直到g2結束
2-三、gevent例子2:
import gevent def cooking(i): print('%s號在煮飯' %i) return i g_lst = [] for i in range(10): g = gevent.spawn(cooking,i) # 函數名,參數 g_lst.append(g) # 把協程對象放入列表 for g in g_lst: g.join() print(g.value) # 打印返回值 # gevent.joinall(g_lst) # joinall一次性把所有對象都阻塞
2-4、協程名: from gevent import monkey monkey.patch_all() import time import gevent from threading import currentThread def cooking(): print('cooking name:',currentThread().getName()) print('cooking 1') time.sleep(1) print('cooking 2') def watch(): print('watch name:', currentThread().getName()) print('watch TV 1') time.sleep(1) print('watch TV 2') g1 = gevent.spawn(cooking) g2 = gevent.spawn(watch) g1.join() g2.join()
# gevent.joinall([g1,g2]) 結果: cooking name: DummyThread-1 # Dummy的意思是假的,即協程是假線程,它只是同一個線程在任務間來回切換 cooking 1 watch name: DummyThread-2 watch TV 1 cooking 2 watch TV 2 2-4、基於協程的爬蟲例子: from gevent import monkey monkey.patch_all() import time import gevent import requests # 擴展模塊 url_lst = [ 'http://www.baidu.com', 'http://www.4399.com', 'http://www.sohu.com', 'http://www.jd.com', 'http://www.sina.com', 'https://www.douban.com', 'http://www.sohu.com', 'http://www.baidu.com', 'http://www.4399.com', 'http://www.sohu.com', 'http://www.jd.com', 'http://www.sina.com', 'https://www.douban.com', 'http://www.sohu.com', 'http://www.baidu.com', 'http://www.4399.com', 'http://www.sohu.com', 'http://www.jd.com', 'http://www.sina.com', 'https://www.douban.com', 'http://www.sohu.com', 'http://www.baidu.com', 'http://www.4399.com', 'http://www.sohu.com', 'http://www.jd.com', 'http://www.sina.com', 'https://www.douban.com', 'http://www.sohu.com' ] def get_url(url): response = requests.get(url) if response.status_code == 200: # response.status_code的值是200的時候才表明爬取成功 print(url,len(response.text)) # response.text是爬取的網頁內容,這裏只打印一下內容的長度 # 普通方式爬取網頁 # start = time.time() # for url in url_lst: # get_url(url) # print(time.time()-start) # 使用時間:5.616770267486572 # 使用協程爬取網頁 start = time.time() g_lst = [] for url in url_lst: g = gevent.spawn(get_url,url) g_lst.append(g) gevent.joinall(g_lst) print(time.time()-start) # 使用時間:1.8181169033050537 2-5、基於gevent的socket 注意:from gevent import monkey;monkey.patch_all()必定要放到導入socket模塊以前,不然gevent沒法識別socket的阻塞 Server端: from gevent import monkey monkey.patch_all() import socket import gevent from threading import currentThread def talk(conn): print('當前協程:',currentThread()) while 1: conn.send(b'hello') print(conn.recv(1024)) # 接收的時候阻塞,切換到另外一個任務 sk = socket.socket() sk.bind(('127.0.0.1',8000)) sk.listen() while 1: conn,addr = sk.accept() gevent.spawn(talk,conn) # 這裏不須要join了,由於accept會阻塞,並且若是有join了以後, # 第一個協程沒有運行完畢這裏的循環就不會繼續走了 Client端: import socket from threading import Thread def client(): sk = socket.socket() sk.connect(('127.0.0.1',8000)) while 1: print(sk.recv(1024)) sk.send(b'hi') for i in range(500): # 開啓500個線程,即500個用戶 Thread(target=client).start()