線程php
本節目錄html
以前咱們已經瞭解了操做系統中進程的概念,程序並不能單獨運行,只有將程序裝載到內存中,系統爲它分配資源才能運行,而這種執行的程序就稱之爲進程。程序和進程的區別就在於:程序是指令的集合,它是進程運行的靜態描述文本;進程是程序的一次執行活動,屬於動態概念。在多道編程中,咱們容許多個程序同時加載到內存中,在操做系統的調度下,能夠實現併發地執行。這是這樣的設計,大大提升了CPU的利用率。進程的出現讓每一個用戶感受到本身獨享CPU,所以,進程就是爲了在CPU上實現多道編程而提出的。java
2.有了進程爲何還要線程python
#什麼是線程: #指的是一條流水線的工做過程,關鍵的一句話:一個進程內最少自帶一個線程,其實進程根本不能執行,進程不是執行單位,是資源的單位,分配資源的單位 #線程纔是執行單位 #進程:作手機屏幕的工做過程,剛纔講的 #咱們的py文件在執行的時候,若是你站在資源單位的角度來看,咱們稱爲一個主進程,若是站在代碼執行的角度來看,它叫作主線程,只是一種形象的說法,其實整個代碼的執行過程成爲線程,也就是幹這個活兒的自己稱爲線程,可是咱們後面學習的時候,咱們就稱爲線程去執行某個任務,其實那某個任務的執行過程稱爲一個線程,一條流水線的執行過程爲線程 #進程vs線程 #1 同一個進程內的多個線程是共享該進程的資源的,不一樣進程內的線程資源確定是隔離的 #2 建立線程的開銷比建立進程的開銷要小的多 #併發三個任務:1啓動三個進程:由於每一個進程中有一個線程,可是我一個進程中開啓三個線程就夠了 #同一個程序中的三個任務須要執行,你是用三個進程好 ,仍是三個線程好? #例子: # pycharm 三個任務:鍵盤輸入 屏幕輸出 自動保存到硬盤 #若是三個任務是同步的話,你鍵盤輸入的時候,屏幕看不到 #我們的pycharm是否是一邊輸入你邊看啊,就是將串行變爲了三個併發的任務 #解決方案:三個進程或者三個線程,哪一個方案可行。若是是三個進程,進程的資源是否是隔離的而且開銷大,最致命的就是資源隔離,可是用戶輸入的數據還要給另一個進程發送過去,進程之間能直接給數據嗎?你是否是copy一份給他或者通訊啊,可是數據是同一份,咱們有必要搞多個進程嗎,線程是否是共享資源的,咱們是否是可使用多線程來搞,你線程1輸入的數據,線程2能不能看到,你之後的場景仍是應用多線程多,並且起線程咱們說是否是很快啊,佔用資源也小,還能共享同一個進程的資源,不須要將數據來回的copy!
進程有不少優勢,它提供了多道編程,讓咱們感受咱們每一個人都擁有本身的CPU和其餘資源,能夠提升計算機的利用率。不少人就不理解了,既然進程這麼優秀,爲何還要線程呢?其實,仔細觀察就會發現進程仍是有不少缺陷的,主要體如今兩點上:mysql
-
-
進程只能在一個時間幹一件事,若是想同時幹兩件事或多件事,進程就無能爲力了。linux
-
進程在執行的過程當中若是阻塞,例如等待輸入,整個進程就會掛起,即便進程中有些工做不依賴於輸入的數據,也將沒法執行。c++
-
若是這兩個缺點理解比較困難的話,舉個現實的例子也許你就清楚了:若是把咱們上課的過程當作一個進程的話,那麼咱們要作的是耳朵聽老師講課,手上還要記筆記,腦子還要思考問題,這樣才能高效的完成聽課的任務。而若是隻提供進程這個機制的話,上面這三件事將不能同時執行,同一時間只能作一件事,聽的時候就不能記筆記,也不能用腦子思考,這是其一;若是老師在黑板上寫演算過程,咱們開始記筆記,而老師忽然有一步推不下去了,阻塞住了,他在那邊思考着,而咱們呢,也不能幹其餘事,即便你想趁此時思考一下剛纔沒聽懂的一個問題都不行,這是其二。git
如今你應該明白了進程的缺陷了,而解決的辦法很簡單,咱們徹底可讓聽、寫、思三個獨立的過程,並行起來,這樣很明顯能夠提升聽課的效率。而實際的操做系統中,也一樣引入了這種相似的機制——線程。程序員
3.線程的出現github
在傳統操做系統中,每一個進程有一個地址空間,並且默認就有一個控制線程
線程顧名思義,就是一條流水線工做的過程,一條流水線必須屬於一個車間,一個車間的工做過程是一個進程
車間負責把資源整合到一塊兒,是一個資源單位,而一個車間內至少有一個流水線
流水線的工做須要電源,電源就至關於cpu
因此,進程只是用來把資源集中到一塊兒(進程只是一個資源單位,或者說資源集合),而線程纔是cpu上的執行單位。
多線程(即多個控制線程)的概念是,在一個進程中存在多個控制線程,多個控制線程共享該進程的地址空間,至關於一個車間內有多條流水線,都共用一個車間的資源。
例如,北京地鐵與上海地鐵是不一樣的進程,而北京地鐵裏的13號線是一個線程,北京地鐵全部的線路共享北京地鐵全部的資源,好比全部的乘客能夠被全部線路拉。
TCB包括如下信息: (1)線程狀態。 (2)當線程不運行時,被保存的現場資源。 (3)一組執行堆棧。 (4)存放每一個線程的局部變量主存區。 (5)訪問同一個進程中的主存和其它資源。 用於指示被執行指令序列的程序計數器、保留局部變量、少數狀態參數和返回地址等的一組寄存器和堆棧。
開啓一個字處理軟件進程,該進程確定須要辦不止一件事情,好比監聽鍵盤輸入,處理文字,定時自動將文字保存到硬盤,這三個任務操做的都是同一塊數據,於是不能用多進程。只能在一個進程裏併發地開啓三個線程,若是是單線程,那就只能是,鍵盤輸入時,不能處理文字和自動保存,自動保存時又不能輸入和處理文字。
以前咱們將的socket是否是經過多進程去實現過呀,若是有500我的同時和我聊天,那我是否是要起500進程啊,能行嗎?很差,對不對,那麼怎麼辦,我就能夠開幾個進程,而後每一個進程裏面開多個線程來處理多個請求和通訊。再舉例:我用qq是一個進程,而後我和一我的聊天的時候,是否是還能夠去接收別人給我發的消息啊,這個是否是並行的啊,就相似我一個進程開了多個線程來幫我併發接收消息。
多個線程共享同一個進程的地址空間中的資源,是對一臺計算機上多個進程的模擬,有時也稱線程爲輕量級的進程。
而對一臺計算機上多個進程,則共享物理內存、磁盤、打印機等其餘物理資源。多線程的運行也多進程的運行相似,是cpu在多個線程之間的快速切換。
不一樣的進程之間是充滿敵意的,彼此是搶佔、競爭cpu的關係,若是迅雷會和QQ搶資源。而同一個進程是由一個程序員的程序建立,因此同一進程內的線程是合做關係,一個線程能夠訪問另一個線程的內存地址,你們都是共享的,一個線程乾死了另一個線程的內存,那純屬程序員腦子有問題。
相似於進程,每一個線程也有本身的堆棧,不一樣於進程,線程庫沒法利用時鐘中斷強制線程讓出CPU,能夠調用thread_yield運行線程自動放棄cpu,讓另一個線程運行。
線程一般是有益的,可是帶來了不小程序設計難度,線程的問題是:
1. 父進程有多個線程,那麼開啓的子線程是否須要一樣多的線程
2. 在同一個進程中,若是一個線程關閉了文件,而另一個線程正準備往該文件內寫內容呢?
所以,在多線程的代碼中,須要更多的心思來設計程序的邏輯、保護程序的數據。
線程的實現能夠分爲兩類:用戶級線程(User-Level Thread)和內核線線程(Kernel-Level Thread),後者又稱爲內核支持的線程或輕量級進程。在多線程操做系統中,各個系統的實現方式並不相同,在有的系統中實現了用戶級線程,有的系統中實現了內核級線程。
1.用戶級線程
內核的切換由用戶態程序本身控制內核切換,不須要內核干涉,少了進出內核態的消耗,但不能很好的利用多核Cpu。
在用戶空間模擬操做系統對進程的調度,來調用一個進程中的線程,每一個進程中都會有一個運行時系統,用來調度線程。此時當該進程獲取cpu時,進程內再調度出一個線程去執行,同一時刻只有一個線程執行。
2.內核級線程
內核級線程:切換由內核控制,當線程進行切換的時候,由用戶態轉化爲內核態。切換完畢要從內核態返回用戶態;能夠很好的利用smp,即利用多核cpu。windows線程就是這樣的。
3.用戶級和內核級線程的對比
1 內核支持線程是OS內核可感知的,而用戶級線程是OS內核不可感知的。 2 用戶級線程的建立、撤消和調度不須要OS內核的支持,是在語言(如Java)這一級處理的;而內核支持線程的建立、撤消和調度都需OS內核提供支持,並且與進程的建立、撤消和調度大致是相同的。 3 用戶級線程執行系統調用指令時將致使其所屬進程被中斷,而內核支持線程執行系統調用指令時,只致使該線程被中斷。 4 在只有用戶級線程的系統內,CPU調度仍是以進程爲單位,處於運行狀態的進程中的多個線程,由用戶程序控制線程的輪換運行;在有內核支持線程的系統內,CPU調度則以線程爲單位,由OS的線程調度程序負責線程的調度。 5 用戶級線程的程序實體是運行在用戶態下的程序,而內核支持線程的程序實體則是能夠運行在任何狀態下的程序
內核級線程的優缺點:
優勢:當有多個處理機時,一個進程的多個線程能夠同時執行。
缺點:由內核進行調度。
用戶級線程的優缺點:
優勢:
線程的調度不須要內核直接參與,控制簡單。
能夠在不支持線程的操做系統中實現。
建立和銷燬線程、線程切換代價等線程管理的代價比內核線程少得多。
容許每一個進程定製本身的調度算法,線程管理比較靈活。
線程可以利用的表空間和堆棧空間比內核級線程多。
同一進程中只能同時有一個線程在運行,若是有一個線程使用了系統調用而阻塞,那麼整個進程都會被掛起。另外,頁面失效也會產生一樣的問題。
缺點:
資源調度按照進程進行,多個處理機下,同一個進程中的線程只能在同一個處理機下分時複用
3.混合實現
用戶級與內核級的多路複用,內核同一調度內核線程,每一個內核線程對應n個用戶線程,用戶和內核都能感知到的線程,用戶建立一個線程,那麼操做系統內核也跟着建立一個線程來專門執行你用戶的這個線程。
在linux操做系統上也實現了這種混合的方式NPTL,看下面的介紹。
4.linux操做系統的NPTL
歷史 在內核2.6之前的調度實體都是進程,內核並無真正支持線程。它是能過一個系統調用clone()來實現的,這個調用建立了一份調用進程的拷貝,跟fork()不一樣的是,這份進程拷貝徹底共享了調用進程的地址空間。LinuxThread就是經過這個系統調用來提供線程在內核級的支持的(許多之前的線程實現都徹底是在用戶態,內核根本不知道線程的存在)。很是不幸的是,這種方法有至關多的地方沒有遵循POSIX標準,特別是在信號處理,調度,進程間通訊原語等方面。 很顯然,爲了改進LinuxThread必須獲得內核的支持,而且須要重寫線程庫。爲了實現這個需求,開始有兩個相互競爭的項目:IBM啓動的NGTP(Next Generation POSIX Threads)項目,以及Redhat公司的NPTL。在2003年的年中,IBM放棄了NGTP,也就是大約那時,Redhat發佈了最初的NPTL。 NPTL最開始在redhat linux 9裏發佈,如今從RHEL3起內核2.6起都支持NPTL,而且徹底成了GNU C庫的一部分。 設計 NPTL使用了跟LinuxThread相同的辦法,在內核裏面線程仍然被看成是一個進程,而且仍然使用了clone()系統調用(在NPTL庫裏調用)。可是,NPTL須要內核級的特殊支持來實現,好比須要掛起而後再喚醒線程的線程同步原語futex. NPTL也是一個1*1的線程庫,就是說,當你使用pthread_create()調用建立一個線程後,在內核裏就相應建立了一個調度實體,在linux裏就是一個新進程,這個方法最大可能的簡化了線程的實現。 除NPTL的1*1模型外還有一個m*n模型,一般這種模型的用戶線程數會比內核的調度實體多。在這種實現裏,線程庫自己必須去處理可能存在的調度,這樣在線程庫內部的上下文切換一般都會至關的快,由於它避免了系統調用轉到內核態。然而這種模型增長了線程實現的複雜性,並可能出現諸如優先級反轉的問題,此外,用戶態的調度如何跟內核態的調度進行協調也是很難讓人滿意。
1.全局解釋器鎖GIL(用一下threading模塊以後再來看~~)
Python代碼的執行由Python虛擬機(也叫解釋器主循環)來控制。Python在設計之初就考慮到要在主循環中,同時只有一個線程在執行。雖然 Python 解釋器中能夠「運行」多個線程,但在任意時刻只有一個線程在解釋器中運行。
對Python虛擬機的訪問由全局解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個線程在運行。
在多線程環境中,Python 虛擬機按如下方式執行:
a、設置 GIL;
b、切換到一個線程去運行;
c、運行指定數量的字節碼指令或者線程主動讓出控制(能夠調用 time.sleep(0));
d、把線程設置爲睡眠狀態;
e、解鎖 GIL;
d、再次重複以上全部步驟。
在調用外部代碼(如 C/C++擴展函數)的時候,GIL將會被鎖定,直到這個函數結束爲止(因爲在這期間沒有Python的字節碼被運行,因此不會作線程切換)編寫擴展的程序員能夠主動解鎖GIL。
2.python線程模塊的選擇
Python提供了幾個用於多線程編程的模塊,包括thread、threading和Queue等。thread和threading模塊容許程序員建立和管理線程。thread模塊提供了基本的線程和鎖的支持,threading提供了更高級別、功能更強的線程管理的功能。Queue模塊容許用戶建立一個能夠用於多個線程之間共享數據的隊列數據結構。
避免使用thread模塊,由於更高級別的threading模塊更爲先進,對線程的支持更爲完善,並且使用thread模塊裏的屬性有可能會與threading出現衝突;其次低級別的thread模塊的同步原語不多(實際上只有一個),而threading模塊則有不少;再者,thread模塊中當主線程結束時,全部的線程都會被強制結束掉,沒有警告也不會有正常的清除工做,至少threading模塊能確保重要的子線程退出後進程才退出。
就像咱們熟悉的time模塊,它比其餘模塊更加接近底層,越是接近底層,用起來越麻煩,就像時間日期轉換之類的就比較麻煩,可是後面咱們會學到一個datetime模塊,提供了更爲簡便的時間日期處理方法,它是創建在time模塊的基礎上來的。又如socket和socketserver(底層仍是用的socket)等等,這裏的threading就是thread的高級模塊。
thread模塊不支持守護線程,當主線程退出時,全部的子線程不論它們是否還在工做,都會被強行退出。而threading模塊支持守護線程,守護線程通常是一個等待客戶請求的服務器,若是沒有客戶提出請求它就在那等着,若是設定一個線程爲守護線程,就表示這個線程是不重要的,在進程退出的時候,不用等待這個線程退出。
multiprocess模塊的徹底模仿了threading模塊的接口,兩者在使用層面,有很大的類似性,於是再也不詳細介紹(官方連接)
咱們先簡單應用一下threading模塊來看看併發效果:
import time from threading import Thread #多線程併發,是否是看着和多進程很相似 def func(n): time.sleep(1) print(n) #併發效果,1秒打印出了全部的數字 for i in range(10): t = Thread(target=func,args=(i,)) t.start()
1.線程建立
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('太白',)) t.start() print('主線程')
import time from threading import Thread class Sayhi(Thread): def __init__(self,name): super().__init__() self.name=name def run(self): time.sleep(2) print('%s say hello' % self.name) if __name__ == '__main__': t = Sayhi('太白') t.start() print('主線程')
2.多線程與多進程
from threading import Thread from multiprocessing import Process import os def work(): print('hello',os.getpid()) if __name__ == '__main__': #part1:在主進程下開啓多個線程,每一個線程都跟主進程的pid同樣 t1=Thread(target=work) t2=Thread(target=work) t1.start() t2.start() print('主線程/主進程pid',os.getpid()) #part2:開多個進程,每一個進程都有不一樣的pid p1=Process(target=work) p2=Process(target=work) p1.start() p2.start() print('主線程/主進程pid',os.getpid())
那麼哪些東西存在進程裏,那些東西存在線程裏呢?
進程:導入的模塊、執行的python文件的文件所在位置、內置的函數、文件裏面的這些代碼、全局變量等等,而後線程裏面有本身的堆棧(相似於一個列表,後進先出)和寄存器,裏面存着本身線程的變量,操做(add)等等,佔用的空間很小。
from threading import Thread from multiprocessing import Process import os import time def work(): print('hello') if __name__ == '__main__': s1 = time.time() #在主進程下開啓線程 t=Thread(target=work) t.start() t.join() t1 = time.time() - s1 print('進程的執行時間:',t1) print('主線程/主進程') ''' 打印結果: hello 進程的執行時間: 0.0 主線程/主進程 ''' s2 = time.time() #在主進程下開啓子進程 t=Process(target=work) t.start() t.join() t2 = time.time() - s2 print('線程的執行時間:', t2) print('主線程/主進程') ''' 打印結果: hello 線程的執行時間: 0.5216977596282959 主線程/主進程 '''
from threading import Thread from multiprocessing import Process import os def work(): global n #修改全局變量的值 n=0 if __name__ == '__main__': # n=100 # p=Process(target=work) # p.start() # p.join() # print('主',n) #毫無疑問子進程p已經將本身的全局的n改爲了0,但改的僅僅是它本身的,查看父進程的n仍然爲100 n=1 t=Thread(target=work) t.start() t.join() #必須加join,由於主線程和子線程不必定誰快,通常都是主線程快一些,全部咱們要等子線程執行完畢才能看出效果 print('主',n) #查看結果爲0,由於同一進程內的線程之間共享進程內的數據 # 經過一個global就實現了全局變量的使用,不須要進程的IPC通訊方法
在這裏咱們簡單總結一下:
進程是最小的內存分配單位
線程是操做系統調度的最小黨委
線程被CPU執行了
進程內至少含有一個線程
進程中能夠開啓多個線程
開啓一個線程所須要的時間要遠小於開啓一個進程
多個線程內部有本身的數據棧,數據不共享
全局變量在多個線程之間是共享的
3.多線程實現socket(練習)
import multiprocessing import threading import socket s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.bind(('127.0.0.1',8080)) s.listen(5) def action(conn): while True: data=conn.recv(1024) print(data) msg = input('服務端輸入:') #在多線程裏面可使用input輸入內容,那麼就能夠實現客戶端和服務端的聊天了,多進程不能輸入 conn.send(bytes(msg,encoding='utf-8')) if __name__ == '__main__': while True: conn,addr=s.accept() p=threading.Thread(target=action,args=(conn,)) p.start()
講一講代碼
mport socket s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue s.send(msg.encode('utf-8')) data=s.recv(1024) print(data)
在socket通訊裏面是否是有大量的I/O啊,recv、accept等等,咱們使用多線程效率更高,由於開銷小。
4.Thread類的其餘方法
Thread實例對象的方法 # isAlive(): 返回線程是否活動的。 # getName(): 返回線程名。 # setName(): 設置線程名。 threading模塊提供的一些方法: # threading.currentThread(): 返回當前的線程變量。 # threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。 # threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果
from threading import Thread import threading from multiprocessing import Process import os def work(): import time time.sleep(3) print(threading.current_thread().getName()) if __name__ == '__main__': #在主進程下開啓線程 t=Thread(target=work) t.start() print(threading.current_thread())#主線程對象 print(threading.current_thread().getName()) #主線程名稱 print(threading.current_thread().ident) #主線程ID print(threading.get_ident()) #主線程ID print(threading.enumerate()) #連同主線程在內有兩個運行的線程 print(threading.active_count()) print('主線程/主進程') ''' 打印結果: <_MainThread(MainThread, started 14104)> MainThread 14104 14104 [<_MainThread(MainThread, started 14104)>, <Thread(Thread-1, started 17976)>] 2 主線程/主進程 Thread-1 '''
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('太白',)) t2=Thread(target=sayhi,args=('alex',)) t.start() t2.start() t.join() #由於這個線程用了join方法,主線程等待子線程的運行結束 print('主線程') print(t.is_alive()) #因此t這個線程確定是執行結束了,結果爲False print(t2.is_alive()) #有多是True,有多是False,看子線程和主線程誰執行的快 ''' egon say hello 主線程 False '''
5.守護線程
不管是進程仍是線程,都遵循:守護xx會等待主xx運行完畢後被銷燬。須要強調的是:運行完畢並不是終止運行
#1.對主進程來講,運行完畢指的是主進程代碼運行完畢 #2.對主線程來講,運行完畢指的是主線程所在的進程內全部非守護線程通通運行完畢,主線程纔算運行完畢
詳細解釋
#1 主進程在其代碼結束後就已經算運行完畢了(守護進程在此時就被回收),而後主進程會一直等非守護的子進程都運行完畢後回收子進程的資源(不然會產生殭屍進程),纔會結束, #2 主線程在其餘非守護線程運行完畢後纔算運行完畢(守護線程在此時就被回收)。由於主線程的結束意味着進程的結束,進程總體的資源都將被回收,而進程必須保證非守護線程都運行完畢後才能結束,由於進程執行結束是要回收資源的,全部必須確保你裏面的非守護子線程所有執行完畢。
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('taibai',)) t.setDaemon(True) #必須在t.start()以前設置 t.start() print('主線程') print(t.is_alive()) ''' 主線程 True '''
from threading import Thread from multiprocessing import Process import time def func1(): while True: print(666) time.sleep(0.5) def func2(): print('hello') time.sleep(3) if __name__ == '__main__': # t = Thread(target=func1,) # t.daemon = True #主線程結束,守護線程隨之結束 # # t.setDaemon(True) #兩種方式,和上面設置守護線程是同樣的 # t.start() # t2 = Thread(target=func2,) #這個子線程要執行3秒,主線程的代碼雖然執行完了,可是一直等着子線程的任務執行完畢,主線程纔算完畢,由於經過結果你會發現我主線程雖然代碼執行完畢了,\ # 可是主線程的的守護線程t1還在執行,說明什麼,說明個人主線程尚未完畢,只不過是代碼執行完了,一直等着子線程t2執行完畢,我主線程的守護線程才中止,說明子線程執行完畢以後,個人主線程才執行完畢 # t2.start() # print('主線程代碼執行完啦!') p = Process(target=func1,) p.daemon = True p.start() p2 = Process(target=func2,) p2.start() time.sleep(1) #讓主進程等1秒,爲了能看到func1的打印效果 print('主進程代碼執行完啦!') #經過結果你會發現,若是主進程的代碼運行完畢了,那麼主進程就結束了,由於主進程的守護進程p隨着主進程的代碼結束而結束了,守護進程被回收了,這和線程是不同的,主線程的代碼完了並不表明主線程運行完畢了,須要等着全部其餘的非守護的子線程執行完畢纔算完畢
今天的內容就到這裏啦,同窗們整理整理前面的內容吧~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1.GIL鎖(Global Interpreter Lock)
首先,一些語言(java、c++、c)是支持同一個進程中的多個線程是能夠應用多核CPU的,也就是咱們會聽到的如今4核8核這種多核CPU技術的牛逼之處。那麼咱們以前說過應用多進程的時候若是有共享數據是否是會出現數據不安全的問題啊,就是多個進程同時一個文件中去搶這個數據,你們都把這個數據改了,可是還沒來得及去更新到原來的文件中,就被其餘進程也計算了,致使數據不安全的問題啊,因此咱們是否是經過加鎖能夠解決啊,多線程你們想一下是否是同樣的,併發執行就是有這個問題。可是python最先期的時候對於多線程也加鎖,可是python比較極端的(在當時電腦cpu確實只有1核)加了一個GIL全局解釋鎖,是解釋器級別的,鎖的是整個線程,而不是線程裏面的某些數據操做,每次只能有一個線程使用cpu,也就說多線程用不了多核,可是他不是python語言的問題,是CPython解釋器的特性,若是用Jpython解釋器是沒有這個問題的,Cpython是默認的,由於速度快,Jpython是java開發的,在Cpython裏面就是沒辦法用多核,這是python的弊病,歷史問題,雖然衆多python團隊的大神在致力於改變這個狀況,可是暫沒有解決。(這和解釋型語言(python,php)和編譯型語言有關係嗎???待定!,編譯型語言通常在編譯的過程當中就幫你分配好了,解釋型要邊解釋邊執行,因此爲了防止出現數據不安全的狀況加上了這個鎖,這是全部解釋型語言的弊端??)
可是有了這個鎖咱們就不能併發了嗎?當咱們的程序是偏計算的,也就是cpu佔用率很高的程序(cpu一直在計算),就不行了,可是若是你的程序是I/O型的(通常你的程序都是這個)(input、訪問網址網絡延遲、打開/關閉文件讀寫),在什麼狀況下用的到高併發呢(金融計算會用到,人工智能(阿爾法狗),可是通常的業務場景用不到,爬網頁,多用戶網站、聊天軟件、處理文件),I/O型的操做不多佔用CPU,那麼多線程仍是能夠併發的,由於cpu只是快速的調度線程,而線程裏面並無什麼計算,就像一堆的網絡請求,我cpu很是快速的一個一個的將你的多線程調度出去,你的線程就去執行I/O操做了,
詳細的GIL鎖介紹:連接:https://www.cnblogs.com/clschao/articles/9705317.html
2.同步鎖
三個須要注意的點: #1.線程搶的是GIL鎖,GIL鎖至關於執行權限,拿到執行權限後才能拿到互斥鎖Lock,其餘線程也能夠搶到GIL,但若是發現Lock仍然沒有被釋放則阻塞,即使是拿到執行權限GIL也要馬上交出來 #2.join是等待全部,即總體串行,而鎖只是鎖住修改共享數據的部分,即部分串行,要想保證數據安全的根本原理在於讓併發變成串行,join與互斥鎖均可以實現,毫無疑問,互斥鎖的部分串行效率要更高 #3. 必定要看本小節最後的GIL與互斥鎖的經典分析
GIL VS Lock
機智的同窗可能會問到這個問題,就是既然你以前說過了,Python已經有一個GIL來保證同一時間只能有一個線程來執行了,爲何這裏還須要lock?
首先咱們須要達成共識:鎖的目的是爲了保護共享的數據,同一時間只能有一個線程來修改共享的數據
而後,咱們能夠得出結論:保護不一樣的數據就應該加不一樣的鎖。
最後,問題就很明朗了,GIL 與Lock是兩把鎖,保護的數據不同,前者是解釋器級別的(固然保護的就是解釋器級別的數據,好比垃圾回收的數據),後者是保護用戶本身開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock
過程分析:全部線程搶的是GIL鎖,或者說全部線程搶的是執行權限
線程1搶到GIL鎖,拿到執行權限,開始執行,而後加了一把Lock,尚未執行完畢,即線程1還未釋放Lock,有可能線程2搶到GIL鎖,開始執行,執行過程當中發現Lock尚未被線程1釋放,因而線程2進入阻塞,被奪走執行權限,有可能線程1拿到GIL,而後正常執行到釋放Lock。。。這就致使了串行運行的效果
既然是串行,那咱們執行
t1.start()
t1.join
t2.start()
t2.join()
這也是串行執行啊,爲什麼還要加Lock呢,需知join是等待t1全部的代碼執行完,至關於鎖住了t1的全部代碼,而Lock只是鎖住一部分操做共享數據的代碼。
詳解:
由於Python解釋器幫你自動按期進行內存回收,你能夠理解爲python解釋器裏有一個獨立的線程,每過一段時間它起wake up作一次全局輪詢看看哪些內存數據是能夠被清空的,此時你本身的程序 裏的線程和 py解釋器本身的線程是併發運行的,假設你的線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程當中的clearing時刻,可能一個其它線程正好又從新給這個還沒來及得清空的內存空間賦值了,結果就有可能新賦值的數據被刪除了,爲了解決相似的問題,python解釋器簡單粗暴的加了鎖,即當一個線程運行時,其它人都不能動,這樣就解決了上述的問題, 這能夠說是Python早期版本的遺留問題。
看一段代碼:解釋爲何要加鎖,若是下面代碼中work函數裏面的那個time.sleep(0.005),個人電腦用的這個時間片斷,每次運行都呈現不一樣的結果,咱們能夠改改時間試一下。
from threading import Thread,Lock import os,time def work(): global n # lock.acquire() #加鎖 temp=n time.sleep(0.1) #一會將下面循環的數據加大而且這裏的時間改的更小試試 n=temp-1 # time.sleep(0.02) # n = n - 1 '''若是這樣寫的話看不出來效果,由於這樣寫就至關於直接將n的指向改了,就比如從10,通過1次減1以後,n就直接指向了9,速度太快,看不出效果,那麼咱們怎麼辦呢,找一箇中間變量來接收n,而後對這個中間變量進行修改,而後再賦值給n,多一個給n賦值的過程,那麼在這個過程當中間,咱們加上一點阻塞時間,來看效果,就像讀文件修改數據以後再寫回文件的過程。那麼這個程序就會出現結果爲9的狀況,首先一個進程的全局變量對於全部線程是共享的,因爲咱們在程序給中間變量賦值,而後給n再次賦值的過程當中咱們加了一些I/O時間,遇到I/O就切換,那麼每一個線程都拿到了10,並對10減1了,而後你們都獲得了9,而後再賦值給n,全部n等於了9''' # lock.release() if __name__ == '__main__': lock=Lock() n=100 l=[] # for i in range(10000): #若是這裏變成了10000,你在運行一下看看結果 for i in range(100): #若是這裏變成了10000,你在運行一下看看結果 p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n) #結果確定爲0,由原來的併發執行變成串行,犧牲了執行效率保證了數據安全
上面這個代碼示例,若是循環次數變成了10000,在個人電腦上就會出現不一樣的結果,由於在線程切換的那個time.sleep的時間內,有些線程尚未被切換到,也就是有些線程尚未拿到n的值,因此計算結果就沒準了。
鎖一般被用來實現對共享資源的同步訪問。爲每個共享資源建立一個Lock對象,當你須要訪問該資源時,調用acquire方法來獲取鎖對象(若是其它線程已經得到了該鎖,則當前線程需等待其被釋放),待資源訪問完後,再調用release方法釋放鎖:
import threading R=threading.Lock() R.acquire() #
#R.acquire()若是這裏還有一個acquire,你會發現,程序就阻塞在這裏了,由於上面的鎖已經被拿到了而且尚未釋放的狀況下,再去拿就阻塞住了 ''' 對公共數據的操做 ''' R.release()
經過上面的代碼示例1,咱們看到多個線程搶佔資源的狀況,能夠經過加鎖來解決,看代碼:
from threading import Thread,Lock import os,time def work(): global n lock.acquire() #加鎖 temp=n time.sleep(0.1) n=temp-1 lock.release() if __name__ == '__main__': lock=Lock() n=100 l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n) #結果確定爲0,由原來的併發執行變成串行,犧牲了執行效率保證了數據安全
看上面代碼的圖形解釋:
分析: #1.100個線程去搶GIL鎖,即搶執行權限 #2. 確定有一個線程先搶到GIL(暫且稱爲線程1),而後開始執行,一旦執行就會拿到lock.acquire() #3. 極有可能線程1還未運行完畢,就有另一個線程2搶到GIL,而後開始運行,但線程2發現互斥鎖lock還未被線程1釋放,因而阻塞,被迫交出執行權限,即釋放GIL #4.直到線程1從新搶到GIL,開始從上次暫停的位置繼續執行,直到正常釋放互斥鎖lock,而後其餘的線程再重複2 3 4的過程
#不加鎖:併發執行,速度快,數據不安全 from threading import current_thread,Thread,Lock import os,time def task(): global n print('%s is running' %current_thread().getName()) temp=n time.sleep(0.5) n=temp-1 if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:0.5216062068939209 n:99 ''' #不加鎖:未加鎖部分併發執行,加鎖部分串行執行,速度慢,數據安全 from threading import current_thread,Thread,Lock import os,time def task(): #未加鎖的代碼併發運行 time.sleep(3) print('%s start to run' %current_thread().getName()) global n #加鎖的代碼串行運行 lock.acquire() temp=n time.sleep(0.5) n=temp-1 lock.release() if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:53.294203758239746 n:0 ''' #有的同窗可能有疑問:既然加鎖會讓運行變成串行,那麼我在start以後當即使用join,就不用加鎖了啊,也是串行的效果啊 #沒錯:在start以後馬上使用jion,確定會將100個任務的執行變成串行,毫無疑問,最終n的結果也確定是0,是安全的,但問題是 #start後當即join:任務內的全部代碼都是串行執行的,而加鎖,只是加鎖的部分即修改共享數據的部分是串行的 #單從保證數據安全方面,兩者均可以實現,但很明顯是加鎖的效率更高. from threading import current_thread,Thread,Lock import os,time def task(): time.sleep(3) print('%s start to run' %current_thread().getName()) global n temp=n time.sleep(0.5) n=temp-1 if __name__ == '__main__': n=100 lock=Lock() start_time=time.time() for i in range(100): t=Thread(target=task) t.start() t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 start to run Thread-2 start to run ...... Thread-100 start to run 主:350.6937336921692 n:0 #耗時是多麼的恐怖 '''
3.死鎖與遞歸鎖
進程也有死鎖與遞歸鎖,在進程那裏忘記說了,放到這裏一切說了額,進程的死鎖和線程的是同樣的,並且通常狀況下進程之間是數據不共享的,不須要加鎖,因爲線程是對全局的數據共享的,因此對於全局的數據進行操做的時候,要加鎖。
所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程,以下就是死鎖
from threading import Lock as Lock import time mutexA=Lock() mutexA.acquire() mutexA.acquire() print(123) mutexA.release() mutexA.release()
from threading import Thread,Lock import time mutexA=Lock() mutexB=Lock() class MyThread(Thread): def run(self): self.func1() self.func2() def func1(self): mutexA.acquire() print('\033[41m%s 拿到A鎖>>>\033[0m' %self.name) mutexB.acquire() print('\033[42m%s 拿到B鎖>>>\033[0m' %self.name) mutexB.release() mutexA.release() def func2(self): mutexB.acquire() print('\033[43m%s 拿到B鎖???\033[0m' %self.name) time.sleep(2) #分析:當線程1執行完func1,而後執行到這裏的時候,拿到了B鎖,線程2執行func1的時候拿到了A鎖,那麼線程2還要繼續執行func1裏面的代碼,再去拿B鎖的時候,發現B鎖被人拿了,那麼就一直等着別人把B鎖釋放,那麼就一直等着,等到線程1的sleep時間用完以後,線程1繼續執行func2,須要拿A鎖了,可是A鎖被線程2拿着呢,尚未釋放,由於他在等着B鎖被釋放,那麼這倆人就尷尬了,你拿着個人老A,我拿着你的B,這就尷尬了,倆人就停在了原地 mutexA.acquire() print('\033[44m%s 拿到A鎖???\033[0m' %self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(10): t=MyThread() t.start() ''' Thread-1 拿到A鎖>>> Thread-1 拿到B鎖>>> Thread-1 拿到B鎖??? Thread-2 拿到A鎖>>> 而後就卡住,死鎖了 '''
解決方法,遞歸鎖,在Python中爲了支持在同一線程中屢次請求同一資源,python提供了可重入鎖RLock。
這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖:
from threading import RLock as Lock import time mutexA=Lock() mutexA.acquire() mutexA.acquire() print(123) mutexA.release() mutexA.release()
典型問題:科學家吃麪 ,看下面代碼示例:
import time from threading import Thread,Lock noodle_lock = Lock() fork_lock = Lock() def eat1(name): noodle_lock.acquire() print('%s 搶到了麪條'%name) fork_lock.acquire() print('%s 搶到了叉子'%name) print('%s 吃麪'%name) fork_lock.release() noodle_lock.release() def eat2(name): fork_lock.acquire() print('%s 搶到了叉子' % name) time.sleep(1) noodle_lock.acquire() print('%s 搶到了麪條' % name) print('%s 吃麪' % name) noodle_lock.release() fork_lock.release() for name in ['taibai','egon','wulaoban']: t1 = Thread(target=eat1,args=(name,)) t2 = Thread(target=eat2,args=(name,)) t1.start() t2.start()
import time from threading import Thread,RLock fork_lock = noodle_lock = RLock() def eat1(name): noodle_lock.acquire() print('%s 搶到了麪條'%name) fork_lock.acquire() print('%s 搶到了叉子'%name) print('%s 吃麪'%name) fork_lock.release() noodle_lock.release() def eat2(name): fork_lock.acquire() print('%s 搶到了叉子' % name) time.sleep(1) noodle_lock.acquire() print('%s 搶到了麪條' % name) print('%s 吃麪' % name) noodle_lock.release() fork_lock.release() for name in ['taibai','wulaoban']: t1 = Thread(target=eat1,args=(name,)) t1.start() for name in ['alex','peiqi']: t2 = Thread(target=eat2,args=(name,)) t2.start()
遞歸鎖大體描述: 當咱們的程序中須要兩把鎖的時候,你就要注意,別出現死鎖,最好就去用遞歸鎖。
同進程的同樣
Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器爲0時,acquire()將阻塞線程直到其餘線程調用release()。
實例:(同時只有5個線程能夠得到semaphore,便可以限制最大鏈接數爲5):
from threading import Thread,Semaphore import threading import time # def func(): # if sm.acquire(): # print (threading.currentThread().getName() + ' get semaphore') # time.sleep(2) # sm.release() def func(): sm.acquire() print('%s get sm' %threading.current_thread().getName()) time.sleep(3) sm.release() if __name__ == '__main__': sm=Semaphore(5) for i in range(23): t=Thread(target=func) t.start()
你們還記得信號量和進程池的區別嗎,線程也有線程池,和信號量也是那點區別
互斥鎖與信號量推薦博客:http://url.cn/5DMsS9r
同進程的同樣
線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其 他線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就會變得很是棘手。爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行
事件的基本方法:
event.isSet():返回event的狀態值; event.wait():若是 event.isSet()==False將阻塞線程; event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度; event.clear():恢復event的狀態值爲False。
還記得咱們進程那裏的事件用的什麼例子嗎,是否是紅綠燈啊,此次咱們不講紅綠燈的例子了,換個新的!
例如,有多個工做線程嘗試連接MySQL,咱們想要在連接前確保MySQL服務正常才讓那些工做線程去鏈接MySQL服務器,若是鏈接不成功,都會去嘗試從新鏈接。那麼咱們就能夠採用threading.Event機制來協調各個工做線程的鏈接操做
MySQL是啥呢?簡單說一下:
mysql就是一個數據庫,存數據用的東西,它就像一個文件夾,裏面存着不少的excel表格,咱們能夠在表格裏面寫數據,存數據。可是若是咱們要使用數據庫,咱們必須先要去鏈接它,你和他創建了鏈接關係,你才能操做它裏面存放的數據。
咱們先模擬一個場景:
首先起兩個線程:
第一個線程的用處:鏈接數據庫,那麼我這個線程須要等待一個信號,告訴我咱們之間的網絡是能夠連通的。
第二個線程的用處:檢測與數據庫之間的網絡是否聯通,併發送一個可聯通或者不可聯通的信號。
from threading import Thread,Event import threading import time,random def conn_mysql(): count=1 while not event.is_set(): if count > 3: raise TimeoutError('連接超時') #本身發起錯誤 print('<%s>第%s次嘗試連接' % (threading.current_thread().getName(), count)) event.wait(0.5) # count+=1 print('<%s>連接成功' %threading.current_thread().getName()) def check_mysql(): print('\033[45m[%s]正在檢查mysql\033[0m' % threading.current_thread().getName()) t1 = random.randint(0,3) print('>>>>',t1) time.sleep(t1) event.set() if __name__ == '__main__': event=Event() check = Thread(target=check_mysql) conn1=Thread(target=conn_mysql) conn2=Thread(target=conn_mysql) check.start() conn1.start() conn2.start()
使得線程等待,只有知足某條件時,才釋放n個線程,看一下大概怎麼用就能夠啦~~
import time from threading import Thread,RLock,Condition,current_thread def func1(c): c.acquire(False) #固定格式 # print(1111) c.wait() #等待通知, time.sleep(3) #通知完成後你們是串行執行的,這也看出了鎖的機制了 print('%s執行了'%(current_thread().getName())) c.release() if __name__ == '__main__': c = Condition() for i in range(5): t = Thread(target=func1,args=(c,)) t.start() while True: num = int(input('請輸入你要通知的線程個數:')) c.acquire() #固定格式 c.notify(num) #通知num個線程別等待了,去執行吧 c.release() #結果分析: # 請輸入你要通知的線程個數:3 # 請輸入你要通知的線程個數:Thread-1執行了 #有時候你會發現的你結果打印在了你要輸入內容的地方,這是打印的問題,不要緊,不影響 # Thread-3執行了 # Thread-2執行了
定時器,指定n秒後執行某個操做,這個作定時任務的時候可能會用到。
import time from threading import Timer,current_thread #這裏就不須要再引入Timer import threading def hello(): print(current_thread().getName()) print("hello, world") # time.sleep(3) #若是你的子線程的程序執行時間比較長,那麼這個定時任務也會亂,固然了,主要仍是看業務需求 t = Timer(10, hello) #建立一個子線程去執行後面的函數 t.start() # after 1 seconds, "hello, world" will be printed # for i in range(5): # t = Timer(2, hello) # t.start() # time.sleep(3) #這個是建立一個t用的時間是2秒,建立出來第二個的時候,第一個已通過了兩秒了,因此你的5個t的執行結果基本上就是2秒中,這個延遲操做。 print(threading.active_count()) print('主進程',current_thread().getName())
線程之間的通訊咱們列表行不行呢,固然行,那麼隊列和列表有什麼區別呢?
queue隊列 :使用import queue,用法與進程Queue同樣
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
-
class
queue.
Queue
(maxsize=0) #先進先出
import queue #不須要經過threading模塊裏面導入,直接import queue就能夠了,這是python自帶的 #用法基本和咱們進程multiprocess中的queue是同樣的 q=queue.Queue() q.put('first') q.put('second') q.put('third') # q.put_nowait() #沒有數據就報錯,能夠經過try來搞 print(q.get()) print(q.get()) print(q.get()) # q.get_nowait() #沒有數據就報錯,能夠經過try來搞 ''' 結果(先進先出): first second third '''
class queue.
LifoQueue
(maxsize=0) #last in fisrt out
import queue q=queue.LifoQueue() #隊列,相似於棧,棧咱們提過嗎,是否是先進後出的順序啊 q.put('first') q.put('second') q.put('third') # q.put_nowait() print(q.get()) print(q.get()) print(q.get()) # q.get_nowait() ''' 結果(後進先出): third second first '''
class queue.
PriorityQueue
(maxsize=0) #存儲數據時可設置優先級的隊列
import queue q=queue.PriorityQueue() #put進入一個元組,元組的第一個元素是優先級(一般是數字,也能夠是非數字之間的比較),數字越小優先級越高 q.put((-10,'a')) q.put((-5,'a')) #負數也能夠 # q.put((20,'ws')) #若是兩個值的優先級同樣,那麼按照後面的值的acsii碼順序來排序,若是字符串第一個數元素相同,比較第二個元素的acsii碼順序 # q.put((20,'wd')) # q.put((20,{'a':11})) #TypeError: unorderable types: dict() < dict() 不能是字典 # q.put((20,('w',1))) #優先級相同的兩個數據,他們後面的值必須是相同的數據類型才能比較,能夠是元祖,也是經過元素的ascii碼順序來排序 q.put((20,'b')) q.put((20,'a')) q.put((0,'b')) q.put((30,'c')) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) print(q.get()) ''' 結果(數字越小優先級越高,優先級高的優先出隊): '''
這三種隊列都是線程安全的,不會出現多個線程搶佔同一個資源或數據的狀況。
到這裏就差咱們的線程池沒有講了,咱們用一個新的模塊給你們講,早期的時候咱們沒有線程池,如今python提供了一個新的標準或者說內置的模塊,這個模塊裏面提供了新的線程池和進程池,以前咱們說的進程池是在multiprocessing裏面的,如今這個在這個新的模塊裏面,他倆用法上是同樣的。
爲何要將進程池和線程池放到一塊兒呢,是爲了統一使用方式,使用threadPollExecutor和ProcessPollExecutor的方式同樣,並且只要經過這個concurrent.futures導入就能夠直接用他們兩個了
concurrent.futures模塊提供了高度封裝的異步調用接口 ThreadPoolExecutor:線程池,提供異步調用 ProcessPoolExecutor: 進程池,提供異步調用 Both implement the same interface, which is defined by the abstract Executor class. #2 基本方法 #submit(fn, *args, **kwargs) 異步提交任務 #map(func, *iterables, timeout=None, chunksize=1) 取代for循環submit的操做 #shutdown(wait=True) 至關於進程池的pool.close()+pool.join()操做 wait=True,等待池內全部任務執行完畢回收完資源後才繼續 wait=False,當即返回,並不會等待池內的任務執行完畢 但無論wait參數爲什麼值,整個程序都會等到全部任務執行完畢 submit和map必須在shutdown以前 #result(timeout=None) 取得結果 #add_done_callback(fn) 回調函數
import time import os import threading from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor def func(n): time.sleep(2) print('%s打印的:'%(threading.get_ident()),n) return n*n tpool = ThreadPoolExecutor(max_workers=5) #默認通常起線程的數據不超過CPU個數*5 # tpool = ProcessPoolExecutor(max_workers=5) #進程池的使用只須要將上面的ThreadPoolExecutor改成ProcessPoolExecutor就好了,其餘都不用改 #異步執行 t_lst = [] for i in range(5): t = tpool.submit(func,i) #提交執行函數,返回一個結果對象,i做爲任務函數的參數 def submit(self, fn, *args, **kwargs): 能夠傳任意形式的參數 t_lst.append(t) # # print(t.result()) #這個返回的結果對象t,不能直接去拿結果,否則又變成串行了,能夠理解爲拿到一個號碼,等全部線程的結果都出來以後,咱們再去經過結果對象t獲取結果 tpool.shutdown() #起到原來的close阻止新任務進來 + join的做用,等待全部的線程執行完畢 print('主線程') for ti in t_lst: print('>>>>',ti.result()) # 咱們還能夠不用shutdown(),用下面這種方式 # while 1: # for n,ti in enumerate(t_lst): # print('>>>>', ti.result(),n) # time.sleep(2) #每一個兩秒去去一次結果,哪一個有結果了,就能夠取出哪個,想表達的意思就是說不用等到全部的結果都出來再去取,能夠輪詢着去取結果,由於你的任務須要執行的時間很長,那麼你須要等好久才能拿到結果,經過這樣的方式能夠將快速出來的結果先拿出來。若是有的結果對象裏面尚未執行結果,那麼你什麼也取不到,這一點要注意,不是空的,是什麼也取不到,那怎麼判斷我已經取出了哪個的結果,能夠經過枚舉enumerate來搞,記錄你是哪個位置的結果對象的結果已經被取過了,取過的就再也不取了 #結果分析: 打印的結果是沒有順序的,由於到了func函數中的sleep的時候線程會切換,誰先打印就沒準兒了,可是最後的咱們經過結果對象取結果的時候拿到的是有序的,由於咱們主線程進行for循環的時候,咱們是按順序將結果對象添加到列表中的。 # 37220打印的: 0 # 32292打印的: 4 # 33444打印的: 1 # 30068打印的: 2 # 29884打印的: 3 # 主線程 # >>>> 0 # >>>> 1 # >>>> 4 # >>>> 9 # >>>> 16
ProcessPoolExecutor的使用:
只須要將這一行代碼改成下面這一行就能夠了,其餘的代碼都不用變 tpool = ThreadPoolExecutor(max_workers=5) #默認通常起線程的數據不超過CPU個數*5 # tpool = ProcessPoolExecutor(max_workers=5) 你就會發現爲何將線程池和進程池都放到這一個模塊裏面了,用法同樣
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import threading import os,time,random def task(n): print('%s is runing' %threading.get_ident()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ThreadPoolExecutor(max_workers=3) # for i in range(11): # future=executor.submit(task,i) s = executor.map(task,range(1,5)) #map取代了for+submit print([i for i in s])
import time import os import threading from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor def func(n): time.sleep(2) return n*n def call_back(m): print('結果爲:%s'%(m.result())) tpool = ThreadPoolExecutor(max_workers=5) t_lst = [] for i in range(5): t = tpool.submit(func,i).add_done_callback(call_back)
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from multiprocessing import Pool import requests import json import os def get_page(url): print('<進程%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def parse_page(res): res=res.result() print('<進程%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] # p=Pool(3) # for url in urls: # p.apply_async(get_page,args=(url,),callback=pasrse_page) # p.close() # p.join() p=ProcessPoolExecutor(3) for url in urls: p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future對象obj,須要用obj.result()拿到結果