Thread實例對象的方法 # isAlive(): 返回線程是不是活動的。 # getName(): 返回線程名。 # setName(): 設置線程名。 threading模塊提供的一些方法: # threading.currentThread(): 返回當前的線程變量對象。 # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。 # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果
import threading import time from threading import Thread,current_thread def f1(n): time.sleep(1) print('子線程對象', current_thread()) # <Thread(Thread-1, started 123145336967168)> print('子線程名稱', current_thread().getName()) # 當前線程對象 Thread-1 print('子線程ID', current_thread().ident) # 123145336967168 print('%s號線程任務'%n) if __name__ == '__main__': t1 = Thread(target=f1,args=(1,)) t1.start() print('主線程對象',current_thread()) # <_MainThread(MainThread, started 140734833878464)> print('主線程名稱',current_thread().getName()) # 當前線程對象(是主線程對象) MainThread print('主線程ID',current_thread().ident) # 當前線程ID 140734833878464 print(threading.enumerate()) # [<_MainThread(MainThread, started 140734833878464)>, <Thread(Thread-1, started 123145336967168)>] print(threading.active_count()) # 2 """ 結果: 主線程對象 <_MainThread(MainThread, started 140734833878464)> 主線程名稱 MainThread 主線程ID 140734833878464 [<_MainThread(MainThread, started 140734833878464)>, <Thread(Thread-1, started 123145336967168)>] 2 子線程對象 <Thread(Thread-1, started 123145336967168)> 子線程名稱 Thread-1 子線程ID 123145336967168 1號線程任務 # 小結: threading.current_thread() <==等效於==> Thread(target=f1) #這兩個等效的前提是: 左邊 的位置要跟 右邊target(目標函數)所在位置 同樣,即左邊的是獲取當前位置的線程變量對象,右邊的是在target(目標函數)所在位置建立線程對象. """
線程隊列,不須要從threading模塊裏面導入,直接import queue就能夠了,這是python自帶的python
queue隊列 :使用import queue,用法與進程隊列 multiprocessing.Queue 同樣,也有如下方法:nginx
# put,put_nowait,get,get_nowait,full,empty,qsize q = Queue(5) # 5是size q.put() #放入數據,滿了會等待(阻塞) q.get() #獲取數據,沒有數據了會等待(阻塞) q.qsize() # 當前放進去的元素的個數 q.empty() # 是否爲空,不可靠(由於多線程) q.full() # 是否滿了,不可靠(由於多線程) q.put_nowait() #添加數據,不等待,可是隊列滿了報錯 q.get_nowait() #獲取數據,不等待,可是隊列空了報錯
class queue.
Queue
(maxsize=0) #先進先出(FIFO: fisrt in fisrt out)數組
import queue # 線程中的隊列使用的是這個,等效於進程中的隊列 put,put_nowait,get,get_nowait,full,empty q = queue.Queue(4) # FIFO先進先出 first in first out q.put(1) q.put(2) print(q.full()) # 不滿 # print('當前隊列內容的長度',q.qsize()) q.put(3) print(q.full()) # 滿 # q.put(4) # 不報錯,會阻塞 print(q.qsize()) try: q.put_nowait(4) # 報錯queue.Full except Exception: print('queue full') print(q.get()) print(q.get()) print(q.empty()) # 不空 print(q.get()) print(q.empty()) # 空 print(q.get()) # 不報錯,會阻塞 try: print(q.get_nowait()) # 報錯queue.Empty except Exception: print('queue empty')
class queue.
LifoQueue
(maxsize=0) #先進後出隊列(或者後進先出(LIFO: last in fisrt out)),相似於棧網絡
q = queue.LifoQueue(3) # Lifo q.put(1) q.put(2) print(q.full()) # 不滿 # print('當前隊列內容的長度',q.qsize()) q.put(3) print(q.full()) # 滿 # q.put(4) # 不報錯,會阻塞 print(q.qsize()) try: q.put_nowait(4) # 報錯queue.Full except Exception: print('queue full') print(q.get()) print(q.get()) print(q.empty()) # 不空 print(q.get()) print(q.empty()) # 空 print(q.get()) # 不報錯,會阻塞 try: print(q.get_nowait()) # 報錯queue.Empty except Exception: print('queue empty')
class queue.
PriorityQueue
(maxsize=0) #優先級的隊列(存儲數據時可設置優先級)多線程
# 優先級隊列 PriorityQueue # put的數據是一個元組,元組的第一個參數是優先級數字(一般是數字,也能夠是非數字之間的比較),數字越小優先級越高,越先被get拿到被取出來,第二個參數是put進去的值(能夠是任意的數據類型) # 若是說優先級(第一個參數)相同,那麼比較值(第二個參數),值必須是相同的數據類型(不包括字典),不然報錯 # 比較第二個參數: # 若是第二個參數(或者其參數的元素)是數字: 數字==直接拿總體的數字==>比較大小, # 若是第二個參數(或者其參數的元素)是字符串:字符串=依次取到每一個字符=>比較每一個字符的ASCII碼. q = queue.PriorityQueue(10) q.put((-5, 'alex')) # 放入元組,第一個元素是優先級(能夠爲負數,越小,優先級越高),第二個是真正的數據(數據類型隨意) q.put((2, 'blex')) q.put((3, 'clex')) q.put((3, '111')) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print('=======================') q.put(('x', 123)) q.put(('y', 345)) print(q.get()) print(q.get()) print('=======================') """ ('x', 123) ('y', 345) """ q.put((5, 'alex')) # 放入元組,第一個元素是優先級(能夠爲負數,越小,優先級越高),第二個是真正的數據(數據類型隨意) q.put((2, 1)) q.put((3, (1,))) # q.put((7, {1,2})) # 優先級相同數據類型不一樣,報錯TypeError: '<' not supported between instances of 'dict' and 'set' q.put((7, {1:2})) q.put((7, {1:'a'})) # 優先級相同數據類型都是字典,報錯TypeError: '<' not supported between instances of 'dict' and 'dict' print(q.get()) print(q.get()) print(q.get()) print(q.get()) print('=======================')
統一使用方式,使用threadPollExecutor和ProcessPollExecutor的方式同樣,並且只要經過這個concurrent.futures導入就能夠直接用他們兩個了併發
concurrent.futures模塊提供了高度封裝的異步調用接口 ThreadPoolExecutor:線程池,提供異步調用 ProcessPoolExecutor: 進程池,提供異步調用 Both implement the same interface, which is defined by the abstract Executor class. 二者實現相同的接口,該接口由抽象Executor類定義。 #2 基本方法 #submit(fn, *args, **kwargs) 異步提交任務(萬能傳參,傳入的實參能夠是任意數據類型,注意fn的形參數量要和這裏的實參數量對應) #map(func, *iterables, timeout=None, chunksize=1) 取代for循環submit的操做(參數1是函數,參數2是可迭代對象) #shutdown(wait=True) ==>close()+join() 至關於進程池的multiprocessing.Pool().close()+multiprocessing.Pool().join()操做 wait=True,等待池內全部任務執行完畢回收完資源後才繼續 wait=False,當即返回,並不會等待池內的任務執行完畢 但無論wait參數爲什麼值,整個程序都會等到全部任務執行完畢 submit和map必須在shutdown以前 #result(timeout=None) 取得結果(至關於pool.get()) #add_done_callback(fn) 回調函數(功能相似於pool的callback,可是顯然用法不一樣) """ multiprocessing.Pool和concurrent.futures.ThreadPoolExecutor,ProcessPoolExecutor中回調函數的區別: 進程的回調函數res = pool.apply_async(f1,args=(5,),callback=call_back_func) (這裏的callback是默認的關鍵字,call_back_func是自定義的回調函數名)==>做爲異步對象的參數調用 線程的回調函數res = tp.submit(f1,11,12).add_done_callback(f2) (這裏的add_done_callback是默認的回調函數名,f2是自定義的回調函數)==>做爲異步對象的方法調用) """
上栗子:app
import time import os import threading from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor def func(n): time.sleep(2) print('%s打印的:'%(threading.get_ident()),n) return n*n tpool = ThreadPoolExecutor(max_workers=5) #默認通常起線程的數據不超過CPU個數*5 # tpool = ProcessPoolExecutor(max_workers=5) #進程池的使用只須要將上面的ThreadPoolExecutor改成ProcessPoolExecutor就好了,其餘都不用改 #異步執行 t_lst = [] for i in range(5): t = tpool.submit(func,i) #提交執行函數,返回一個結果對象,i做爲任務函數的參數 def submit(self, fn, *args, **kwargs): 能夠傳任意形式的參數 t_lst.append(t) # # print(t.result()) #這個返回的結果對象t,不能直接去拿結果,否則又變成串行了,能夠理解爲拿到一個號碼,等全部線程的結果都出來以後,咱們再去經過結果對象t獲取結果 tpool.shutdown() #起到原來的close阻止新任務進來 + join的做用,等待全部的線程執行完畢 print('主線程') for ti in t_lst: print('>>>>',ti.result()) # 咱們還能夠不用shutdown(),用下面這種方式 # while 1: # for n,ti in enumerate(t_lst): # print('>>>>', ti.result(),n) # time.sleep(2) #每一個兩秒去去一次結果,哪一個有結果了,就能夠取出哪個,想表達的意思就是說不用等到全部的結果都出來再去取,能夠輪詢着去取結果,由於你的任務須要執行的時間很長,那麼你須要等好久才能拿到結果,經過這樣的方式能夠將快速出來的結果先拿出來。若是有的結果對象裏面尚未執行結果,那麼你什麼也取不到,這一點要注意,不是空的,是什麼也取不到,那怎麼判斷我已經取出了哪個的結果,能夠經過枚舉enumerate來搞,記錄你是哪個位置的結果對象的結果已經被取過了,取過的就再也不取了 #結果分析: 打印的結果是沒有順序的,由於到了func函數中的sleep的時候線程會切換,誰先打印就沒準兒了,可是最後的咱們經過結果對象取結果的時候拿到的是有序的,由於咱們主線程進行for循環的時候,咱們是按順序將結果對象添加到列表中的。 # 37220打印的: 0 # 32292打印的: 4 # 33444打印的: 1 # 30068打印的: 2 # 29884打印的: 3 # 主線程 # >>>> 0 # >>>> 1 # >>>> 4 # >>>> 9 # >>>> 16
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import time def f1(n,s): # 要與 萬能傳參 的參數數量一致 time.sleep(1) # print(n,s) return n * n if __name__ == '__main__': tp = ThreadPoolExecutor(4) # 線程 默認的線程個數是cpu個數 * 5 # tp = ProcessPoolExecutor(4) # 進程 默認的進程個數是cpu個數 這兩個的方法一致 # tp.map(f1, range(10)) # 異步提交任務,參數是(任務名,可迭代對象) res_lis = [] for i in range(10): res = tp.submit(f1,i,'baobao') # submit是給線程池異步提交任務,萬能傳參 # print(res) # <Future at 0x10617a208 state=running> res_lis.append(res) for t in res_lis: # 4個4個的打印 print(t.result()) tp.shutdown() # ==等效於==> close + join 主線程等待全部提交給線程池的任務所有執行完畢 for t in res_lis: # 所有一塊兒打印 print(t.result()) # 結果對象.result,#和get方法同樣,若是沒有結果,會等待,阻塞程序 print('主線程') """ 只須要將這一行代碼改成下面這一行就能夠了,其餘的代碼都不用變 tpool = ThreadPoolExecutor(max_workers=5) #默認通常起線程的數據不超過CPU個數*5 # tpool = ProcessPoolExecutor(max_workers=5)#默認通常起線程的數據不超過CPU個數 你就會發現爲何將線程池和進程池都放到這一個模塊裏面了,由於用法同樣,因此不論是線程池仍是進程池,更推薦使用這個from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor """
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor def func(n): time.sleep(2) return n*n def call_back(m): print('結果爲:%s'%(m.result())) # 注意回調函數拿到的是線程(進程)對象,想要拿到值須要調用result方法 tpool = ThreadPoolExecutor(max_workers=5) t_lst = [] for i in range(5): t = tpool.submit(func,i).add_done_callback(call_back) """ 結果爲:0 結果爲:1 結果爲:4 結果爲:9 結果爲:16 """
協程:是單線程下的併發,又稱微線程,纖程。英文名Coroutine。一句話說明什麼是線程:協程是一種用戶態的輕量級線程,即協程是由用戶程序本身控制調度的。負載均衡
須要強調的是:異步
#1. python的線程屬於內核級別的,即由操做系統控制調度(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其餘線程運行) #2. 單線程內開啓協程,一旦遇到io,就會從應用程序級別(而非操做系統)控制切換,以此來提高效率(!!!非io操做的切換與效率無關)
操做系統控制線程的切換 <==對比==> 用戶在單線程內控制協程的切換socket
#1. 協程的切換開銷更小,屬於程序級別的切換,操做系統徹底感知不到,於是更加輕量級 #2. 單線程內就能夠實現併發的效果,最大限度地利用cpu
#1. 協程的本質是單線程下,沒法利用多核,能夠是一個程序開啓多個進程,每一個進程內開啓多個線程,每一個線程內開啓協程 #2. 協程指的是單個線程,於是一旦協程出現阻塞,將會阻塞整個線程
# 1.必須在只有一個單線程裏實現併發 # 2.修改共享數據不需加鎖 # 3.用戶程序裏本身保存多個控制流的上下文棧 # 4.附加:一個協程遇到IO操做自動切換到其它協程(如何實現檢測IO,yield、greenlet都沒法實現,就用到了gevent模塊(select機制))
協程就是告訴Cpython解釋器,你不是nb嗎,不是搞了個GIL鎖嗎,那好,我就本身搞成一個線程讓你去執行,省去你切換線程的時間,我本身切換比你切換要快不少,避免了不少的開銷,對於單線程下,咱們不可避免程序中出現io操做,但若是咱們能在本身的程序中(即用戶程序級別,而非操做系統級別)控制單線程下的多個任務能在一個任務遇到io阻塞時就切換到另一個任務去計算,這樣就保證了該線程可以最大限度地處於就緒態,即隨時均可以被cpu執行的狀態,至關於咱們在用戶程序級別將本身的io操做最大限度地隱藏起來,從而能夠迷惑操做系統,讓其看到:該線程好像是一直在計算,io比較少,從而更多的將cpu的執行權限分配給咱們的線程。
協程的本質就是在單線程下,由用戶本身控制一個任務遇到io阻塞了就切換另一個任務去執行,以此來提高效率。爲了實現它,咱們須要找尋一種能夠同時知足如下條件的解決方案:
#1. 能夠控制多個任務之間的切換,切換以前將任務的狀態保存下來,以便從新運行時,能夠基於暫停的位置繼續執行。 #2. 做爲1的補充:能夠檢測io操做,在遇到io操做的狀況下才發生切換
併發的本質:任務切換+保存狀態,yield自己就是一種在單線程下能夠保存任務運行狀態的方法,
#1 yield能夠保存狀態,yield的狀態保存與操做系統的保存線程狀態很像,可是yield是代碼級別控制的,更輕量級 #2 send能夠把一個函數的結果傳給另一個函數,以此實現單線程內程序之間的切換
import time #基於yield併發執行,多任務之間來回切換,這就是個簡單的協程的體現,可是他可以節省I/O時間嗎?不能,yield不能檢測IO,不能實現遇到IO自動切換 def f1(): for i in range(3): time.sleep(0.5) # 發現什麼?只是進行了切換,可是並無節省I/O時間 print('f1>>',i) # yield def f2(): # g = f1() for i in range(3): time.sleep(0.5) print('f2>>', i) # next(g) #不寫yield,下面兩個任務是執行完func1裏面全部的程序纔會執行func2裏面的程序,有了yield,咱們實現了兩個任務的切換+保存狀態 #基於yield保存狀態,實現兩個任務直接來回切換,即併發的效果 #PS:若是每一個任務中都加上打印,那麼明顯地看到兩個任務的打印是你一次我一次,即併發執行的. f1() f2() """ f1>> 0 f1>> 1 f1>> 2 f2>> 0 f2>> 1 f2>> 2 有了yield: f2>> 0 f1>> 0 f2>> 1 f1>> 1 f2>> 2 f1>> 2
#安裝==>在終端輸入如下代碼 pip3 install greenlet
import time from greenlet import greenlet # 真正的協程模塊就是使用greenlet完成的切換 def f1(s): print('第一次f1==>'+s) g2.switch('taibai') #切換到g2這個對象的任務去執行 time.sleep(1) print('第一次f1==>'+s) g2.switch() def f2(s): print('第一次f2==>'+s) g1.switch() time.sleep(1) print('第二次f2==>'+s) g1 = greenlet(f1) #實例化一個greenlet對象,並將任務名稱做爲參數傳進去 g2 = greenlet(f2) g1.switch('alex') #執行g1對象裏面的任務,能夠在第一次switch時傳入參數,之後都不須要 """ greenlet只是提供了一種比generator更加便捷的切換方式,當切到一個任務執行時若是遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提高效率的問題。 """
通常在工做中咱們都是進程+線程+協程的方式來實現併發,以達到最好的併發效果,若是是4核的cpu,通常起5個進程,每一個進程中20個線程(5倍cpu數量),每一個線程能夠起500個協程,大規模爬取頁面的時候,等待網絡延遲的時間的時候,咱們就能夠用協程去實現併發。 併發數量 = 5 * 20 * 500 = 50000個併發,這是通常一個4cpu的機器最大的併發數。nginx在負載均衡的時候最大承載量就是5w個。
#安裝==>在終端輸入如下代碼 pip3 install gevent
from gevent import monkey;monkey.patch_all() # 必須寫在最上面,這句話後面的全部阻塞所有可以識別了 import gevent import time import threading # 遇到IO阻塞時會自動切換任務 def f1(name): print(f'{name}==第一次f1') print(threading.current_thread().getName()) # DummyThread-1 假線程,虛擬線程 # gevent.sleep(1) # gevent默承認以識別的io阻塞 time.sleep(2) # 加上mokey就可以識別到time模塊的sleep了 print(f'{name}==第二次f1') return name def f2(name): print(threading.current_thread().getName()) # DummyThread-2 print(f'{name}==第一次f2') # gevent.sleep(2) time.sleep(2) # 來回切換,直到一個I/O的時間結束,這裏都是咱們的gevent作得,再也不是控制不了的操做系統了。 print(f'{name}==第二次f2') s = time.time() g1 = gevent.spawn(f1,'alex') #異步提交了f1任務 g2 = gevent.spawn(f2,name='egon') #建立一個協程對象g2,spawn括號內第一個參數是函數名,如f2,後面能夠有多個參數,能夠是位置實參或關鍵字實參,都是傳給函數f2的,spawn是異步提交任務 # g1.join() # 等待g1結束,上面只是建立協程對象,這個join纔是去執行 # g2.join() # 等待g2結束 有人測試的時候會發現,不寫第二個join也能執行g2,是的,協程幫你切換執行了,可是你會發現,若是g2裏面的任務執行的時間長,可是不寫join的話,就不會執行完等到g2剩下的任務了 gevent.joinall([g1,g2]) # 這裏等價於上述join兩步合做一步 print(g1.value)#拿到func1的返回值 e = time.time() print('執行時間:',e-s) # 測試執行時間 print('主程序任務') """ 結果: alex==第一次f1 DummyThread-1 DummyThread-2 egon==第一次f2 alex==第二次f1 egon==第二次f2 alex 執行時間: 2.004991054534912 主程序任務 """ ''' # spawn是類方法,參數是萬能的 @classmethod def spawn(cls, *args, **kwargs): # 萬能形參==>實參能夠隨便傳入 g = cls(*args, **kwargs) g.start() return g ''' # 咱們能夠用threading.current_thread().getName()來查看每一個g1和g2,查看的結果爲DummyThread-n,即假線程,虛擬線程,其實都在一個線程裏面 # 進程線程的任務切換是由操做系統自行切換的,你本身不能控制 # 協程是經過本身的程序(代碼)來進行切換的,本身可以控制,只有遇到協程模塊可以識別的IO操做的時候,程序纔會進行任務切換,實現併發效果,若是全部程序都沒有IO操做,那麼就基本屬於串行執行了。
from gevent import spawn,joinall,monkey;monkey.patch_all() import time def task(pid): """ Some non-deterministic task """ time.sleep(0.5) print('Task %s done' % pid) def synchronous(): for i in range(10): task(i) def asynchronous(): g_l=[spawn(task,i) for i in range(10)] joinall(g_l) if __name__ == '__main__': print('Synchronous:') synchronous() print('Asynchronous:') asynchronous() #上面程序的重要部分是將task函數封裝到greenlet內部線程的gevent.spawn。 初始化的greenlet列表存放在數組threads中,此數組被傳給gevent.joinall 函數,後者阻塞當前流程,並執行全部給定的greenlet。執行流程只會在 全部greenlet執行完後纔會繼續向下走。 """ # 結果: Synchronous:同步,一個一個的打印 Task 0 done Task 1 done Task 2 done Task 3 done Task 4 done Task 5 done Task 6 done Task 7 done Task 8 done Task 9 done Asynchronous:異步,一塊兒打印 Task 0 done Task 1 done Task 2 done Task 3 done Task 4 done Task 5 done Task 6 done Task 7 done Task 8 done Task 9 done """
1,線程的其餘方法
Threading.current_thread() #當前線程對象
GetName() 獲取線程名
Ident 獲取線程id
Threading.Enumerate() #當前正在運行的線程對象的一個列表
Threading.active_count() #當前正在運行的線程數量
2,線程隊列(重點)
Import queue
先進先出隊列:queue.Queue(3)
先進後出\後進先出隊列:queue.LifoQueue(3)
優先級隊列:queue.priorityQueue(3)
Put的數據是一個元組,元組的第一個參數是優先級數字,數字越小優先級越高,越先被get到被取出來,第二個參數是put進去的值,若是說優先級相同,那麼值別忘了應該是相同的數據類型,字典不行
From concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
P = ThreadPoolExecutor(4) #默認的線程個數是cpu個數 * 5
P = ProcessPoolExecutor(4) #默認的進程個數是cpu個數
P.map(f1,可迭代的對象) #異步執行
Def f1(n1,n2):
Print(n1,n2)
P.submit(f1,11,12) #異步提交任務
Res = P.submit(f1,11,12)
Res.result() #和get方法同樣,若是沒有結果,會等待,阻塞程序
Shutdown() #close+join,鎖定線程池,等待線程池中全部已經提交的任務所有執行完畢
今日做業
明天默寫: