Python之路--Python基礎11--併發編程之線程

1、threading模塊介紹

multiprocess模塊的徹底模仿了threading模塊的接口,兩者在使用層面,有很大的類似性,於是再也不詳細介紹html

官網連接:https://docs.python.org/3/library/threading.html?highlight=threading#python

 

2、開啓線程的兩種方式

#方式一
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.start() print('主線程')
#方式二
from threading import Thread import time class Sayhi(Thread): def __init__(self,name): super().__init__() self.name=name def run(self): time.sleep(2) print('%s say hello' % self.name) if __name__ == '__main__': t = Sayhi('egon') t.start() print('主線程')

 

3、在一個進程下開啓多個線程與在一個進程下開啓多個子進程的區別

在同一個進程下面,線程開啓的速度比進程開啓的速度快。mysql

from threading import Thread from multiprocessing import Process import os def work(): print('hello') if __name__ == '__main__': #在主進程下開啓線程
    t=Thread(target=work) t.start() print('主線程/主進程') ''' 打印結果: hello 主線程/主進程 '''

    #在主進程下開啓子進程
    t=Process(target=work) t.start() print('主線程/主進程') ''' 打印結果: 主線程/主進程 hello '''

線程的pid與主進程的pid相同,每一個子進程pid都跟主進程的pid不一樣git

from threading import Thread from multiprocessing import Process import os def work(): print('hello',os.getpid()) if __name__ == '__main__': #part1:在主進程下開啓多個線程,每一個線程都跟主進程的pid同樣
    t1=Thread(target=work) t2=Thread(target=work) t1.start() t2.start() print('主線程/主進程pid',os.getpid()) #part2:開多個進程,每一個進程都有不一樣的pid
    p1=Process(target=work) p2=Process(target=work) p1.start() p2.start() print('主線程/主進程pid',os.getpid()) # hello 11148 # hello 11148 # 主線程/主進程pid 11148 # 主線程/主進程pid 11148 # hello 11484 # hello 1208

同一進程內的線程共享該進程的數據github

from  threading import Thread from multiprocessing import Process import os def work(): global n n=0 if __name__ == '__main__': # n=100
    # p=Process(target=work)
    # p.start()
    # p.join()
    # print('主',n) #毫無疑問子進程p已經將本身的全局的n改爲了0,但改的僅僅是它本身的,查看父進程的n仍然爲100 n=1 t=Thread(target=work) t.start() t.join() print('',n) #查看結果爲0,由於同一進程內的線程之間共享進程內的數據

 

4、線程相關方法

Thread實例對象的方法web

  # isAlive(): 返回線程是否活動的。 sql

  # getName(): 返回線程名。 json

  # setName(): 設置線程名。 安全

threading模塊提供的一些方法: 服務器

   # threading.currentThread(): 返回當前的線程變量。

   # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。

   # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。

from threading import Thread import threading from multiprocessing import Process import os def work(): import time time.sleep(3) print(threading.current_thread().getName()) if __name__ == '__main__': #在主進程下開啓線程
    t=Thread(target=work) t.start() print(threading.current_thread().getName()) print(threading.current_thread()) #主線程
    print(threading.enumerate()) #連同主線程在內有兩個運行的線程
    print(threading.active_count()) print('主線程/主進程') ''' 打印結果: MainThread <_MainThread(MainThread, started 140735268892672)> [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>] 主線程/主進程 Thread-1 '''

主線程等待子線程結束

from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.start() t.join() print('主線程') print(t.is_alive()) ''' egon say hello 主線程 False '''

 

5、守護線程

不管是進程仍是線程,都遵循:守護xxx會等待主xxx運行完畢後被銷燬

須要強調的是:運行完畢並不是終止運行

對主進程來講,運行完畢指的是主進程代碼運行完畢

對主線程來講,運行完畢指的是主線程所在的進程內全部非守護線程通通運行完畢,主線程纔算運行完畢

主進程在其代碼結束後就已經算運行完畢了(守護進程在此時就被回收),而後主進程會一直等非守護的子進程都運行完畢後回收子進程的資源(不然會產生殭屍進程),纔會結束,

主線程在其餘非守護線程運行完畢後纔算運行完畢(守護線程在此時就被回收)。由於主線程的結束意味着進程的結束,進程總體的資源都將被回收,而進程必須保證非守護線程都運行完畢後才能結束。

from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('egon',)) t.setDaemon(True) #必須在t.start()以前設置
 t.start() print('主線程') print(t.is_alive()) ''' 主線程 True '''
#誘惑人的栗子

from threading import Thread import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") t1=Thread(target=foo) t2=Thread(target=bar) t1.daemon=True t1.start() t2.start() print("main-------")

 

6、Python GIL(Global Interpreter Lock)

  GIL本質就是一把互斥鎖,既然是互斥鎖,全部互斥鎖的本質都同樣,都是將併發運行變成串行,以此來控制同一時間內共享數據只能被一個任務所修改,進而保證數據安全。

有了GIL的存在,同一時刻同一進程中只有一個線程被執行,看到這裏,有的同窗立馬質問:進程能夠利用多核,可是開銷大,而python的多線程開銷小,但卻沒法利用多核優點。

要解決這個問題,咱們須要在幾個點上達成一致:

  1. cpu究竟是用來作計算的,仍是用來作I/O的?

  2. 多cpu,意味着能夠有多個核並行完成計算,因此多核提高的是計算性能

  3. 每一個cpu一旦遇到I/O阻塞,仍然須要等待,因此多核對I/O操做沒什麼用處 

 

  一個工人至關於cpu,此時計算至關於工人在幹活,I/O阻塞至關於爲工人幹活提供所需原材料的過程,工人幹活的過程當中若是沒有原材料了,則工人幹活的過程須要中止,直到等待原材料的到來。若是你的工廠乾的大多數任務都要有準備原材料的過程(I/O密集型),那麼你有再多的工人,意義也不大,還不如一我的,在等材料的過程當中讓工人去幹別的活,反過來說,若是你的工廠原材料都齊全,那固然是工人越多,效率越高

 

結論:

  對計算來講,cpu越多越好,可是對於I/O來講,再多的cpu也沒用

  固然對運行一個程序來講,隨着cpu的增多執行效率確定會有所提升(無論提升幅度多大,總會有所提升),這是由於一個程序基本上不會是純計算或者純I/O,因此咱們只能相對的去看一個程序究竟是計算密集型仍是I/O密集型,從而進一步分析python的多線程到底有無用武之地。

 

分析:

咱們有四個任務須要處理,處理方式確定是要玩出併發的效果,解決方案能夠是:

  方案一:開啓四個進程

  方案二:一個進程下,開啓四個線程

單核狀況下,分析結果: 

  若是四個任務是計算密集型,沒有多核來並行計算,方案一徒增了建立進程的開銷,方案二勝

  若是四個任務是I/O密集型,方案一建立進程的開銷大,且進程的切換速度遠不如線程,方案二勝

多核狀況下,分析結果:

  若是四個任務是計算密集型,多核意味着並行計算,在python中一個進程中同一時刻只有一個線程執行用不上多核,方案一勝

  若是四個任務是I/O密集型,再多的核也解決不了I/O問題,方案二勝

結論:如今的計算機基本上都是多核,python對於計算密集型的任務開多線程的效率並不能帶來多大性能上的提高,甚至不如串行(沒有大量切換),可是,對於IO密集型的任務效率仍是有顯著提高的。

 

多線程性能測試

#計算密集型:多進程效率高

from multiprocessing import Process from threading import Thread import os,time def work(): res=0 for i in range(100000000): res*=i if __name__ == '__main__': l=[] print(os.cpu_count()) #本機爲4核
    start=time.time() for i in range(4): p=Process(target=work) #耗時5s多
        #p=Thread(target=work) #耗時18s多
 l.append(p) p.start() for p in l: p.join() stop=time.time() print('run time is %s' %(stop-start))

 

#I/O密集型:多線程效率高

from multiprocessing import Process from threading import Thread import threading import os,time def work(): time.sleep(2) print('===>') if __name__ == '__main__': l=[] print(os.cpu_count()) #本機爲4核
    start=time.time() for i in range(400): # p=Process(target=work) #耗時12s多,大部分時間耗費在建立進程上
        p=Thread(target=work) #耗時2s多
 l.append(p) p.start() for p in l: p.join() stop=time.time() print('run time is %s' %(stop-start))

應用:

多線程用於IO密集型,如socket,爬蟲,web
多進程用於計算密集型,如金融分析

 

7、同步鎖

三個須要注意的點:

  1.線程搶的是GIL鎖,GIL鎖至關於執行權限,拿到執行權限後才能拿到互斥鎖Lock,其餘線程也能夠搶到GIL,但若是發現Lock仍然沒有被釋放則阻塞,即使是拿到執行權限GIL也要馬上交出來

  2.join是等待全部,即總體串行,而鎖只是鎖住修改共享數據的部分,即部分串行,要想保證數據安全的根本原理在於讓併發變成串行,join與互斥鎖均可以實現,毫無疑問,互斥鎖的部分串行效率要更高

  3. 必定要看本小節最後的GIL與互斥鎖的經典分析

 

GIL VS Lock

  機智的同窗可能會問到這個問題,就是既然你以前說過了,Python已經有一個GIL來保證同一時間只能有一個線程來執行了,爲何這裏還須要lock? 

首先咱們須要達成共識:鎖的目的是爲了保護共享的數據,同一時間只能有一個線程來修改共享的數據

而後,咱們能夠得出結論:保護不一樣的數據就應該加不一樣的鎖。

  最後,問題就很明朗了,GIL 與Lock是兩把鎖,保護的數據不同,前者是解釋器級別的(固然保護的就是解釋器級別的數據,好比垃圾回收的數據),後

者是保護用戶本身開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock

過程分析:全部線程搶的是GIL鎖,或者說全部線程搶的是執行權限

  線程1搶到GIL鎖,拿到執行權限,開始執行,而後加了一把Lock,尚未執行完畢,即線程1還未釋放Lock,有可能線程2搶到GIL鎖,開始執行,執行過

程中發現Lock尚未被線程1釋放,因而線程2進入阻塞,被奪走執行權限,有可能線程1拿到GIL,而後正常執行到釋放Lock。。。這就致使了串行運行的效果

  既然是串行,那咱們執行

  t1.start()

  t1.join

  t2.start()

  t2.join()

  這也是串行執行啊,爲什麼還要加Lock呢,需知join是等待t1全部的代碼執行完,至關於鎖住了t1的全部代碼,而Lock只是鎖住一部分操做共享數據的代碼。

from threading import Thread import os,time def work(): global n temp=n time.sleep(0.1) n=temp-1
if __name__ == '__main__': n=100 l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n) #結果可能爲99

  鎖一般被用來實現對共享資源的同步訪問。爲每個共享資源建立一個Lock對象,當你須要訪問該資源時,調用acquire方法來獲取鎖對象(若是其它線程已經得到了該鎖,則當前線程需等待其被釋放),待資源訪問完後,再調用release方法釋放鎖:

#模板

import threading R=threading.Lock() R.acquire() ''' 對公共數據的操做 ''' R.release()

栗子:

from threading import Thread,Lock import os,time def work(): global n lock.acquire() temp=n time.sleep(0.1) n=temp-1 lock.release()
if __name__ == '__main__': lock=Lock() n=100 l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n) #結果確定爲0,由原來的併發執行變成串行,犧牲了執行效率保證了數據安全

分析: 

  1.100個線程去搶GIL鎖,即搶執行權限

  2. 確定有一個線程先搶到GIL(暫且稱爲線程1),而後開始執行,一旦執行就會拿到lock.acquire()

  3. 極有可能線程1還未運行完畢,就有另一個線程2搶到GIL,而後開始運行,但線程2發現互斥鎖lock還未被線程1釋放,因而阻塞,被迫交出執行權限,即釋放GIL

  4.直到線程1從新搶到GIL,開始從上次暫停的位置繼續執行,直到正常釋放互斥鎖lock,而後其餘的線程再重複2 3 4的過程

#不加鎖:併發執行,速度快,數據不安全
from threading import current_thread,Thread,Lock import os,time def task(): global n print('%s is running' %current_thread().getName()) temp=n time.sleep(0.5) n=temp-1


if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:0.5216062068939209 n:99 '''

#不加鎖:未加鎖部分併發執行,加鎖部分串行執行,速度慢,數據安全
from threading import current_thread,Thread,Lock import os,time def task(): #未加鎖的代碼併發運行
    time.sleep(3) print('%s start to run' %current_thread().getName()) global n #加鎖的代碼串行運行
 lock.acquire() temp=n time.sleep(0.5) n=temp-1 lock.release() if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:53.294203758239746 n:0 '''

#有的同窗可能有疑問:既然加鎖會讓運行變成串行,那麼我在start以後當即使用join,就不用加鎖了啊,也是串行的效果啊 #沒錯:在start以後馬上使用jion,確定會將100個任務的執行變成串行,毫無疑問,最終n的結果也確定是0,是安全的,但問題是 #start後當即join:任務內的全部代碼都是串行執行的,而加鎖,只是加鎖的部分即修改共享數據的部分是串行的 #單從保證數據安全方面,兩者均可以實現,但很明顯是加鎖的效率更高.
from threading import current_thread,Thread,Lock import os,time def task(): time.sleep(3) print('%s start to run' %current_thread().getName()) global n temp=n time.sleep(0.5) n=temp-1

if __name__ == '__main__': n=100 lock=Lock() start_time=time.time() for i in range(100): t=Thread(target=task) t.start() t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 start to run Thread-2 start to run ...... Thread-100 start to run 主:350.6937336921692 n:0 #耗時是多麼的恐怖 '''

 

8、死鎖現象與遞歸鎖

進程也有死鎖與遞歸鎖,在進程那裏忘記說了,放到這裏一塊兒說。

所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程,以下就是死鎖。

from threading import Thread,Lock import time mutexA=Lock() mutexB=Lock() class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() print('\033[41m%s 拿到A鎖\033[0m' %self.name) mutexB.acquire() print('\033[42m%s 拿到B鎖\033[0m' %self.name) mutexB.release() mutexA.release() def func2(self): mutexB.acquire() print('\033[43m%s 拿到B鎖\033[0m' %self.name) time.sleep(2) mutexA.acquire() print('\033[44m%s 拿到A鎖\033[0m' %self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(10): t=MyThread() t.start() ''' Thread-1 拿到A鎖 Thread-1 拿到B鎖 Thread-1 拿到B鎖 Thread-2 拿到A鎖 而後就卡住,死鎖了 '''

解決方法,遞歸鎖,在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。

這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使

用RLock代替Lock,則不會發生死鎖:

  mutexA=mutexB=threading.RLock()

  #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的狀況,則counter繼續加1,這期間全部其餘線程都只能等待,等待該線程釋放全部鎖,即counter遞減到0爲止。

 

9、信號量Semaphore

  同進程的同樣Semaphore管理一個內置的計數器,每當調用acquire()時內置計數器-1;調用release() 時內置計數器+1;計數器不能小於0;當計數器爲0時,

acquire()將阻塞線程直到其餘線程調用release()。

實例:(同時只有5個線程能夠得到semaphore,便可以限制最大鏈接數爲5):

from threading import Thread,Semaphore import threading import time # def func(): # if sm.acquire(): # print (threading.currentThread().getName() + ' get semaphore') # time.sleep(2) # sm.release()
def func(): sm.acquire() print('%s get sm' %threading.current_thread().getName()) time.sleep(3) sm.release()
if __name__ == '__main__': sm=Semaphore(5) for i in range(23): t=Thread(target=func) t.start()

 

與進程池是徹底不一樣的概念,進程池Pool(4),最大隻能產生4個進程,並且從頭至尾都只是這四個進程,不會產生新的,而信號量是產生一堆線程/進程

 

10、Event

  同進程的同樣,線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其餘線程須要經過判斷某個線程的狀態來肯定本身下一步的操

做,這時線程同步問題就會變得很是棘手。爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待

某些事件的發生。在初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻

塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的

Event對象,那麼它將忽略這個事件, 繼續執行。

 

方法介紹:

  event.isSet():返回event的狀態值;

  event.wait():若是 event.isSet()==False將阻塞線程;

  event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度;

  event.clear():恢復event的狀態值爲False。

  例如,有多個工做線程嘗試連接MySQL,咱們想要在連接前確保MySQL服務正常才讓那些工做線程去鏈接MySQL服務器,若是鏈接不成功,都會去嘗試從新鏈接。那麼咱們就能夠採用threading.Event機制來協調各個工做線程的鏈接操做

from threading import Thread,Event import threading import time,random def conn_mysql(): count=1
    while not event.is_set(): if count > 3: raise TimeoutError('連接超時') print('<%s>第%s次嘗試連接' % (threading.current_thread().getName(), count)) event.wait(0.5) count+=1
    print('<%s>連接成功' %threading.current_thread().getName()) def check_mysql(): print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName()) time.sleep(random.randint(2,4)) event.set()
if __name__ == '__main__': event=Event() conn1=Thread(target=conn_mysql) conn2=Thread(target=conn_mysql) check=Thread(target=check_mysql) conn1.start() conn2.start() check.start()

 

 

11、條件Condition

使得線程等待,只有知足某條件時,才釋放n個線程

import threading def run(n): con.acquire() con.wait() print("run the thread: %s" %n) con.release() if __name__ == '__main__': con = threading.Condition() for i in range(10): t = threading.Thread(target=run, args=(i,)) t.start() while True: inp = input('>>>') if inp == 'q': break con.acquire() con.notify(int(inp)) con.release()
import threading

def
condition_func(): ret = False inp = input('>>>') if inp == '1': ret = True return ret def run(n): con.acquire() con.wait_for(condition_func) print("run the thread: %s" %n) con.release() if __name__ == '__main__': con = threading.Condition() for i in range(10): t = threading.Thread(target=run, args=(i,)) t.start()

 

 

12、定時器

定時器,指定n秒後執行某操做

from threading import Timer def hello(): print("hello, world") t = Timer(1, hello) t.start() # after 1 seconds, "hello, world" will be printed

 

 

#驗證碼定時器

from threading import Timer import random,time class Code: def __init__(self): self.make_cache() def make_cache(self,interval=5): self.cache=self.make_code() print(self.cache) self.t=Timer(interval,self.make_cache) self.t.start() def make_code(self,n=4): res=''
        for i in range(n): s1=str(random.randint(0,9)) s2=chr(random.randint(65,90)) res+=random.choice([s1,s2]) return res def check(self): while True: inp=input('>>: ').strip() if inp.upper() == self.cache: print('驗證成功',end='\n') self.t.cancel() break


if __name__ == '__main__': obj=Code() obj.check()

 

 

 

十3、線程queue

queue隊列 :使用import queue,用法與進程Queue同樣。

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

import queue q=queue.Queue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結果(先進先出): first second third '''

 

 

import queue q=queue.LifoQueue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結果(後進先出): third second first '''

 

 

 

import queue q=queue.PriorityQueue() #put進入一個元組,元組的第一個元素是優先級(一般是數字,也能夠是非數字之間的比較),數字越小優先級越高
q.put((20,'a')) q.put((10,'b')) q.put((30,'c')) print(q.get()) print(q.get()) print(q.get()) ''' 結果(數字越小優先級越高,優先級高的優先出隊): (10, 'b') (20, 'a') (30, 'c') '''

 

 

 

十4、Python標準模塊--concurrent.futures

https://docs.python.org/dev/library/concurrent.futures.html

#1 介紹
  concurrent.futures模塊提供了高度封裝的異步調用接口
  ThreadPoolExecutor:線程池,提供異步調用
  ProcessPoolExecutor: 進程池,提供異步調用
  Both implement the same interface, which is defined by the abstract Executor class.

#2 基本方法
  #submit(fn, *args, **kwargs) 異步提交任務  

  #map(func, *iterables, timeout=None, chunksize=1) 取代for循環submit的操做

  #shutdown(wait=True) 至關於進程池的pool.close()+pool.join()操做wait=True,等待池內全部任務執行完畢回收完資源後才繼續wait=False,當即返回,並不

會等待池內的任務執行完畢但無論wait參數爲什麼值,整個程序都會等到全部任務執行完畢submit和map必須在shutdown以前

  #result(timeout=None) 取得結果

  #add_done_callback(fn) 回調函數

栗子:

#ProcessPoolExecutor

#介紹
The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None)
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.


#用法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ProcessPoolExecutor(max_workers=3)

    futures=[]
    for i in range(11):
        future=executor.submit(task,i)
        futures.append(future)
    executor.shutdown(True)
    print('+++>')
    for future in futures:
        print(future.result())

 

#ThreadPoolExecutor

#介紹
ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.

Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.

New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.

#用法
與ProcessPoolExecutor相同
#map的用法

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    executor.map(task,range(1,12)) #map取代了for+submit
#回調函數

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<進程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def parse_page(res):
    res=res.result()
    print('<進程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    # p=Pool(3)
    # for url in urls:
    #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
    # p.close()
    # p.join()

    p=ProcessPoolExecutor(3)
    for url in urls:
        p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future對象obj,須要用obj.result()拿到結果
相關文章
相關標籤/搜索