multiprocess模塊的徹底模仿了threading模塊的接口,兩者在使用層面,有很大的類似性,於是再也不詳細介紹html
官網連接:點擊進入python
multiprocess模塊的徹底模仿了threading模塊的接口,兩者在使用層面,有很大的類似性正則表達式
import time, random # from multiprocessing import Process from threading import Thread def piao(name): print('%s piaoing' % name) time.sleep(random.randrange(1, 5)) print('%s piao end' % name) if __name__ == '__main__': t1 = Thread(target=piao, args=('egon', )) t1.start() # 主線程向操做系統發信號,又開了一個線程 print("主線程") # 執行角度看是主線程,從資源角度看是主進程 # 這個程序整體是一個進程、兩個線程 """ egon piaoing 主進程 egon piao end """
import time, random # from multiprocessing import Process from threading import Thread class MyThread(Thread): def __init__(self, name): super().__init__() self.name = name def run(self): print("%s piaoing" % self.name) time.sleep(random.randrange(1, 5)) print("%s piao end" % self.name) if __name__ == '__main__': t1 = MyThread('egon') t1.start() # 主線程向操做系統發信號,又開了一個線程 print("主線程") """ egon piaoing 主線程 egon piao end """
import time from multiprocessing import Process from threading import Thread def piao(name): print('%s piaoing' % name) time.sleep(2) print('%s piao end' % name) if __name__ == '__main__': # p1 = Process(target=piao, args=('進程', )) # p1.start() """ 主線程 進程 piaoing 進程 piao end """ t1 = Thread(target=piao, args=('線程', )) t1.start() """ 線程 piaoing 主線程 線程 piao end """ print("主線程") # 對比可知,線程開銷遠小於進程,由於進程須要申請內存空間。
from threading import Thread from multiprocessing import Process n = 100 def task(): global n n = 0 if __name__ == '__main__': """進程驗證: p1 = Process(target=task,) p1.start() # 會把子進程的n改成了0,看是否影響主進程 p1.join() print("主進程", n) # 主進程 100 # 因而可知進程間是隔離的,子進程變量修改不影響主進程 """ """線程驗證""" t1 = Thread(target=task, ) t1.start() t1.join() print("主線程", n) # 主線程 0
from threading import Thread from multiprocessing import Process, current_process # current_process查看進程ID號 import os # os.getpid()也能夠查看進程ID n = 100 def task(): # print(current_process().pid) print('子進程PID:%s 父進程的PID:%s' % (os.getpid(), os.getppid())) if __name__ == '__main__': p1 = Process(target=task,) p1.start() # print("主線程", current_process().pid) print("主線程", os.getpid()) """ 主線程 6455 子進程PID:6456 父進程的PID:6455 """
from threading import Thread import os # os.getpid()也能夠查看進程ID n = 100 def task(): # print(current_process().pid) print('線程的進程 PID:%s' % os.getpid()) if __name__ == '__main__': t1 = Thread(target=task,) t1.start() # print("主線程", current_process().pid) print("主線程", os.getpid()) """說明兩個線程是同一個進程: 線程的進程 PID:6493 主線程 6493 """
一、基於多線程實現併發的套接字通訊緩存
# -*- coding:utf-8 -*- __author__ = 'Qiushi Huang' from socket import * from threading import Thread # 通信和創建連接分開,啓動不一樣的線程,你們是併發執行。 def communicate(conn): while True: try: data = conn.recv(1024) if not data:break conn.send(data.upper()) except ConnectionResetError: break conn.close() def server(ip, port): server = socket(AF_INET, SOCK_STREAM) server.bind((ip, port)) server.listen(5) while True: conn, addr = server.accept() # 建連接 t = Thread(target=communicate, args=(conn,)) # 建一個連接創一個線程 t.start() # communicate(conn) server.close() if __name__ == '__main__': server('127.0.0.1', 8091) # 主線程 """ 這種解決方案的問題是:當客戶端愈來愈多後,線程也會愈來愈多,會帶來服務崩潰的問題。 """
# -*- coding:utf-8 -*- __author__ = 'Qiushi Huang' # 使用時,能夠一個程序運行屢次,這是多個不一樣的in from socket import * client = socket(AF_INET, SOCK_STREAM) client.connect(("127.0.0.1", 8091)) while True: msg = input(">>").strip() if not msg:continue client.send(msg.encode("utf-8")) data = client.recv(1024) print(data.decode("utf-8")) client.close()
二、編寫一個簡單的文本處理工具,具有三個任務,一個接收用戶輸入,一個將用戶輸入的內容格式化成大寫,一個將格式化後的結果存入文件安全
from threading import Thread msg_l=[] format_l=[] def talk(): while True: msg=input('>>: ').strip() if not msg:continue msg_l.append(msg) def format_msg(): while True: if msg_l: res=msg_l.pop() format_l.append(res.upper()) def save(): while True: if format_l: with open('db.txt','a',encoding='utf-8') as f: res=format_l.pop() f.write('%s\n' %res) if __name__ == '__main__': t1=Thread(target=talk) t2=Thread(target=format_msg) t3=Thread(target=save) t1.start() t2.start() t3.start()
一、Thread實例對象的方法
isAlive(): 返回線程是否活動的。
getName(): 返回線程名。
setName(): 設置線程名。
threading.currentThread(): 返回當前的線程變量。
threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。
from threading import Thread, currentThread # 獲得線程對象的方法 from threading import active_count # 獲得活躍進程數 from threading import enumerate # 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。 import time # 須要注意的是線程沒有子線程的概念,線程都是屬於進程的 def task(): print("%s is running" % currentThread().getName()) # 對象下有一個getName()方法 time.sleep(2) print("%s is done" % currentThread().getName()) if __name__ == '__main__': getName()方法返回線程名 t = Thread(target=task, name='子線程1') t.start() print("主進程", currentThread().getName()) """ 子線程1 is running 主進程 MainThread 子線程1 is done """
from threading import Thread, currentThread # 獲得線程對象的方法 from threading import active_count # 獲得活躍進程數 from threading import enumerate # 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。 import time def task(): print("%s is running" % currentThread().getName()) # 對象下有一個getName()方法 time.sleep(2) print("%s is done" % currentThread().getName()) if __name__ == '__main__': setName()方法設置線程名 t = Thread(target=task, name='子線程1') t.start() t.setName('兒子線程1') # 修改進程名稱 currentThread().setName("主線程") # 設置主線程名稱(默認是MainThread) print(t.isAlive()) # 判斷線程是否存活 print("主進程", currentThread().getName()) """ 子線程1 is running True 主進程 主線程 兒子線程1 is done """
from threading import Thread, currentThread # 獲得線程對象的方法 from threading import active_count # 獲得活躍進程數 from threading import enumerate # 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。 import time def task(): print("%s is running" % currentThread().getName()) # 對象下有一個getName()方法 time.sleep(2) print("%s is done" % currentThread().getName()) if __name__ == '__main__': t = Thread(target=task, name='子線程1') t.start() t.setName('兒子線程1') # 修改進程名稱 t.join() # 主線程等子進程運行完畢再執行 currentThread().setName("主線程") # 設置主線程名稱(默認是MainThread) print(t.isAlive()) # 判斷線程是否存活 print("主進程", currentThread().getName()) """ 子線程1 is running 兒子線程1 is done False 主進程 主線程 """
from threading import Thread, currentThread # 獲得線程對象的方法 from threading import active_count # 獲得活躍進程數 from threading import enumerate # 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。 import time def task(): print("%s is running" % currentThread().getName()) # 對象下有一個getName()方法 time.sleep(2) print("%s is done" % currentThread().getName()) if __name__ == '__main__': # 測試threading.active_count方法 t = Thread(target=task, name='子線程1') t.start() print(active_count()) """ 子線程1 is running 2 子線程1 is done """
from threading import Thread, currentThread # 獲得線程對象的方法 from threading import active_count # 獲得活躍進程數 from threading import enumerate # 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。 import time def task(): print("%s is running" % currentThread().getName()) # 對象下有一個getName()方法 time.sleep(2) print("%s is done" % currentThread().getName()) if __name__ == '__main__': # 對上面改寫添加一個join() t = Thread(target=task, name='子線程1') t.start() t.join() # 運行完才執行主線程,所以後面打印的活躍線程數是一個 print(active_count()) """ 子線程1 is running 子線程1 is done 1 """
from threading import Thread, currentThread # 獲得線程對象的方法 from threading import active_count # 獲得活躍進程數 from threading import enumerate # 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。 import time def task(): print("%s is running" % currentThread().getName()) # 對象下有一個getName()方法 time.sleep(2) print("%s is done" % currentThread().getName()) if __name__ == '__main__': # threading.enumerate()方法:返回一個包含正在運行的線程的list t = Thread(target=task, name='子線程1') t.start() print(enumerate()) """ 子線程1 is running [<_MainThread(MainThread, started 4320744256)>, <Thread(子線程1, started 123145383735296)>] 子線程1 is done """
一個進程內,若是不開線程,默認就是一個主線程,主線程代碼運行完畢,進程被銷燬。服務器
一個進程內,開多個線程的狀況下,主線程在代碼運行完畢後,還要等其餘線程工做完才死掉,進程銷燬。網絡
守護線程守護主線程,等到主線程死了纔會被銷燬。在有其餘線程的狀況下,主線程代碼運行完後,等其餘非守護線程結束,守護線程纔會死掉。多線程
不管是進程仍是線程,都遵循:守護xxx會等待主xxx運行完畢後被銷燬。併發
須要強調的是:運行完畢並不是終止運行。運行完畢的真正含義:app
一、對主進程來講,運行完畢指的是主進程代碼運行完畢。
二、對主線程來講,運行完畢指的是主線程所在的進程內全部非守護線程通通運行完畢,主線程才能運行完畢。
詳細解釋
一、主進程在其代碼結束後就已經算運行完畢了(守護進程在此時就被回收),而後主進程會一直等非守護的子進程都運行完畢後回收子進程的資源(不然會產生殭屍進程),纔會結束
二、主線程在其餘非守護線程運行完畢後纔算運行完畢(守護線程在此時就被回收)。由於主線程的結束意味着進程的結束,進程總體的資源都將被回收,而進程必須保證非守護線程都運行完畢後才能結束。
from threading import Thread import time def sayhi(name): time.sleep(2) print("%s say hello" % name) if __name__ == '__main__': t = Thread(target=sayhi, args=('egon',)) # 守護線程必須在t.start()前設置 # 守護線程設置方式一: t.daemon=True # 守護線程設置方式二: # t.setDaemon(True) t.start() # 立馬建立子線程,但須要等待兩秒,所以程序會先執行下面的代碼 print("主線程") print(t.is_alive()) # 這一行代碼執行完後,主線程執行完畢,因爲主線程以外,只有一個守護線程,主線程不須要等守護線程執行結束,所以主線程和守護進程終止,進程結束。 """ 主線程 True """
練習:思考下述代碼的執行結果有多是哪些狀況?爲何?
from threading import Thread import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") if __name__ == '__main__': t1=Thread(target=foo) t2=Thread(target=bar) t1.daemon=True # t1是守護線程 t1.start() t2.start() print("main-------") # 主線程結束後,會等待非守護線程結束 # 因爲非守護線程須要等待的時間比守護線程長,所以線程都會獲得執行 """ 123 456 main------ end123 end456 """
連接:http://www.cnblogs.com/linhaifeng/articles/7449853.html
後期須要詳細分析這個部分的內容。
一、線程搶的是GIL鎖,GIL鎖至關於執行權限,拿到執行權限後才能拿到互斥鎖Lock,其餘線程也能夠搶到GIL,但若是發現Lock仍然沒有被釋放則阻塞,即使是拿到執行權限GIL也要馬上交出來。
二、join是等待全部,即總體串行,而鎖只是鎖住修改共享數據的部分,即部分串行,要想保證數據安全的根本原理在於讓併發變成串行,join與互斥鎖均可以實現,毫無疑問,互斥鎖的部分串行效率要更高。
三、必定要主要本小節最後GIL和互斥鎖的經典分析。
Python已經有了一個GIL來保證同一時間只能有一個線程來執行,爲何還須要lock?
鎖的目的是爲了保護共享的數據,同一時間只能有一個線程來修改共享的數據。
保護不一樣的數據就應該加不一樣的鎖。
GIL 與Lock是兩把鎖,保護的數據不同,GIL是解釋器級別的(固然保護的就是解釋器級別的數據,好比垃圾回收的數據),Lock是保護用戶本身開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock。
過程分析:全部線程搶的是GIL鎖,或者說全部線程搶的是執行權限
線程1搶到GIL鎖,拿到執行權限,開始執行,而後加了一把Lock,尚未執行完畢,即線程1還未釋放Lock,有可能線程2搶到GIL鎖,開始執行,執行過程當中發現Lock尚未被線程1釋放,因而線程2進入阻塞,被奪走執行權限,有可能線程1拿到GIL,而後正常執行到釋放Lock。。。這就致使了串行運行的效果
既然是串行,那咱們執行
t1.start()
t1.join
t2.start()
t2.join()
這也是串行執行啊。
需知join是等待t1全部的代碼執行完,至關於鎖住了t1的全部代碼,而Lock只是鎖住一部分操做共享數據的代碼。
由於Python解釋器幫你自動按期進行內存回收,你能夠理解爲python解釋器裏有一個獨立的線程,每過一段時間它起wake up作一次全局輪詢看看哪些內存數據是能夠被清空的,此時你本身的程序 裏的線程和 py解釋器本身的線程是併發運行的,假設你的線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程當中的clearing時刻,可能一個其它線程正好又從新給這個還沒來及得清空的內存空間賦值了,結果就有可能新賦值的數據被刪除了,爲了解決相似的問題,python解釋器簡單粗暴的加了鎖,即當一個線程運行時,其它人都不能動,這樣就解決了上述的問題, 這能夠說是Python早期版本的遺留問題。
鎖一般被用來實現對共享資源的同步訪問。爲每個共享資源建立一個Lock對象,當你須要訪問該資源時,調用acquire方法來獲取鎖對象(若是其它線程已經得到了該鎖,則當前線程需等待其被釋放),待資源訪問完後,再調用release方法釋放鎖:
import threading R=threading.Lock() R.acquire() ''' 對公共數據的操做 ''' R.release()
from threading import Thread import os,time def work(): global n temp=n time.sleep(0.1) n=temp-1 if __name__ == '__main__': n=100 l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n) #結果可能爲99
from threading import Thread,Lock import os,time def work(): global n lock.acquire() temp=n time.sleep(0.1) n=temp-1 lock.release() if __name__ == '__main__': lock=Lock() n=100 l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n) #結果確定爲0,由原來的併發執行變成串行,犧牲了執行效率保證了數據安全
一、100個線程去搶GIL鎖,即搶執行權限。
二、確定有一個線程先搶到GIL(暫且稱爲線程1),而後開始執行,一旦執行就會拿到lock.acquire()
三、極有可能線程1還未運行完畢,就有另一個線程2搶到GIL,而後開始運行,但線程2發現互斥鎖lock尚未被線程1釋放,因而阻塞,被迫交出執行權限,即釋放GIL。
四、直到線程1從新搶到GIL,開始從上次暫停的位置繼續執行,直到正常釋放互斥鎖lock,而後其餘線程再重複234的過程。
#不加鎖:併發執行,速度快,數據不安全 from threading import current_thread,Thread,Lock import os,time def task(): global n print('%s is running' %current_thread().getName()) temp=n time.sleep(0.5) n=temp-1 if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:0.5216062068939209 n:99 '''
#加鎖:未加鎖部分併發執行,加鎖部分串行執行,速度慢,數據安全 from threading import current_thread,Thread,Lock import os,time def task(): #未加鎖的代碼併發運行 time.sleep(3) print('%s start to run' %current_thread().getName()) global n #加鎖的代碼串行運行 lock.acquire() temp=n time.sleep(0.5) n=temp-1 lock.release() if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:53.294203758239746 n:0 '''
加鎖會將運行變成串行,一樣適用join也能夠獲得串行的效果,數據也是安全的,可是start後當即join,任務內的全部代碼都是串行執行的,而加鎖只是加鎖的部分(修改共享數據的部分)是串行的,二者從保護數據安全方面來講是同樣的,可是加鎖的效率更高。
from threading import current_thread,Thread,Lock import os,time def task(): time.sleep(3) print('%s start to run' %current_thread().getName()) global n temp=n time.sleep(0.5) n=temp-1 if __name__ == '__main__': n=100 lock=Lock() start_time=time.time() for i in range(100): t=Thread(target=task) t.start() t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 start to run Thread-2 start to run ...... Thread-100 start to run 主:350.6937336921692 n:0 #耗時是多麼的恐怖 '''
死鎖: 是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程。
from threading import Thread, Lock import time # 實例化兩把鎖 mutexA = Lock() mutexB = Lock() class MyThread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print("%s 拿到A鎖" % self.name) mutexB.acquire() print("%s 拿到了B鎖" % self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print("%s 拿到B鎖" % self.name) time.sleep(0.1) # 線程1在此休息0.1秒 mutexA.acquire() print("%s 拿到了A鎖" % self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(10): t = MyThread() t.start() # 信號提交,就幾乎立馬啓動了 """程序輸出下面內容後,卡住了 Thread-1 拿到A鎖 Thread-1 拿到了B鎖 Thread-1 拿到B鎖 Thread-2 拿到A鎖 ————》線程1睡着時,線程2拿到A鎖,要去拿B鎖,B鎖在線程1手裏,線程1睡完要去拿A鎖,A鎖在線程2手裏,所以產生死鎖。 """
線程1睡着時,線程2拿到A鎖,要去拿B鎖,B鎖在線程1手裏,線程1睡完要去拿A鎖,A鎖在線程2手裏,所以產生死鎖。上述例子也說明:本身處理鎖其實很是繁瑣也很是危險,必定要在適當的時候考慮把鎖釋放掉。處理不當就會出現死鎖,整個程序就會卡在原地。
這是因爲互斥鎖只能acquire一次,使用方法以下:
from threading import Thread, Lock mutexA = Lock() mutexA.acquire() mutexA.release()
解決辦法——遞歸鎖,能夠連續acquire屢次,每acquire一次計數器加1;只要計數不爲0,就不能被其餘線程搶到(只有計數爲0,才能被搶到acquire)
在Python中爲了支持同一線程中屢次請求同一資源,提供了可重入鎖RLock。
這個RLock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源能夠被屢次require。直到一個線程全部的acquire都被release,其餘的線程才能得到資源。上面的例子若是使用RLock代替Lock,則不會發生死鎖:
mutexA=mutexB=threading.RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的狀況,則counter繼續加1
,這期間全部其餘線程都只能等待,等待該線程釋放全部鎖,即counter遞減到0爲止
from threading import Thread, RLock import time """鏈式賦值""" mutexA=mutexB=RLock() # 使用遞歸鎖能夠 class MyThread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() # 遞歸鎖計數器加1 print("%s 拿到A鎖" % self.name) mutexB.acquire() print("%s 拿到了B鎖" % self.name) mutexB.release() # 遞歸鎖計數器減1 mutexA.release() def f2(self): mutexB.acquire() print("%s 拿到B鎖" % self.name) time.sleep(1) # 線程1在此休息0.1秒 mutexA.acquire() print("%s 拿到了A鎖" % self.name) mutexA.release() mutexB.release() # 計數爲0,其餘線程能夠搶acquire if __name__ == '__main__': for i in range(10): t = MyThread() t.start() # 信號提交,就幾乎立馬啓動了 """ 第一個線程計數器爲0 後,其餘線程能夠開始搶acquire,所以順序是不固定的。 Thread-1 拿到A鎖 Thread-1 拿到了B鎖 Thread-1 拿到B鎖 Thread-1 拿到了A鎖 Thread-2 拿到A鎖 Thread-2 拿到了B鎖 Thread-2 拿到B鎖 Thread-2 拿到了A鎖 Thread-4 拿到A鎖 Thread-4 拿到了B鎖 Thread-4 拿到B鎖 Thread-4 拿到了A鎖 Thread-6 拿到A鎖 Thread-6 拿到了B鎖 Thread-6 拿到B鎖 Thread-6 拿到了A鎖 Thread-8 拿到A鎖 Thread-8 拿到了B鎖 Thread-8 拿到B鎖 Thread-8 拿到了A鎖 Thread-10 拿到A鎖 Thread-10 拿到了B鎖 Thread-10 拿到B鎖 Thread-10 拿到了A鎖 Thread-5 拿到A鎖 Thread-5 拿到了B鎖 Thread-5 拿到B鎖 Thread-5 拿到了A鎖 Thread-9 拿到A鎖 Thread-9 拿到了B鎖 Thread-9 拿到B鎖 Thread-9 拿到了A鎖 Thread-7 拿到A鎖 Thread-7 拿到了B鎖 Thread-7 拿到B鎖 Thread-7 拿到了A鎖 Thread-3 拿到A鎖 Thread-3 拿到了B鎖 Thread-3 拿到B鎖 Thread-3 拿到了A鎖 """
信號量也是一把鎖,能夠指定信號量爲5,對比互斥鎖同一時間只能有一個任務搶到鎖去執行,信號量同一時間能夠有5個任務拿到鎖去執行,若是說互斥鎖是合租房屋的人去搶一個廁所,那麼信號量就至關於一羣路人爭搶公共廁所,公共廁全部多個坑位,這意味着同一時間能夠有多我的上公共廁所,但公共廁所容納的人數是必定的,這即是信號量的大小。
from threading import Thread, Semaphore, currentThread import time, random sm = Semaphore(3) # 定義出坑的個數 def task(): # sm.acquire() # print("%s in" % currentThread().getName()) # sm.release() with sm: print("%s in " % currentThread().getName()) time.sleep(random.randint(1, 3)) if __name__ == '__main__': for i in range(10): t = Thread(target=task) t.start() """ Thread-1 in Thread-2 in Thread-3 in Thread-5 in Thread-6 in Thread-4 in Thread-7 in Thread-8 in Thread-9 in Thread-10 in """
Semaphore管理一個內置的計算器,每當調用acquire()時內置計數器-1;調用release()時內置計數器+1;計數器不能小於0;當計數器爲0時,acquire()將阻塞線程直到其餘線程調用release()。
與進程池是徹底不一樣的概念,進程池Pool(4),最大隻能產生4個進程,並且從頭至尾都只是這四個進程,不會產生新的,而信號量是產生一堆線程/進程
互斥鎖與信號量推薦博客:http://url.cn/5DMsS9r
同進程的同樣線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。若是程序中的其餘線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就會變得很是棘手。
爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行。
from threading import Event :調用event event.isSet():返回event的狀態值; event.wait():若是 event.isSet()==False將阻塞線程; event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度; event.clear():恢復event的狀態值爲False。 ————set()後再clear(),重置到初始狀態。
例如,有多個工做線程嘗試連接MySQL,咱們想要在連接前確保MySQL服務正常才讓那些工做線程去鏈接MySQL服務器,若是鏈接不成功,都會去嘗試從新鏈接。那麼咱們就能夠採用threading.Event機制來協調各個工做線程的鏈接操做。
from threading import Thread, Event import time event = Event() def student(name): print("學生%s正在聽課" % name) event.wait() # 在原地等待 print("學生%s課間活動" % name) def teacher(name): print("老師%s 正在授課" % name) time.sleep(7) event.set() # 這個運行後等待結束 if __name__ == '__main__': stu1 = Thread(target=student, args=('alex',)) stu2 = Thread(target=student, args=('wxx',)) stu3 = Thread(target=student, args=('yxx',)) t1 = Thread(target=teacher, args=('egon',)) stu1.start() stu2.start() stu3.start() t1.start() """ 學生alex正在聽課 學生wxx正在聽課 學生yxx正在聽課 老師egon 正在授課 ------->在這裏等7秒後,學生開始作課間活動 學生alex課間活動 學生wxx課間活動 學生yxx課間活動 """
將上例改寫:有的學生線程,須要在老師發出結束信號前就去作其餘工做。
from threading import Thread, Event import time event = Event() def student(name): print("學生%s正在聽課" % name) event.wait(2) print("學生%s課間活動" % name) def teacher(name): print("老師%s 正在授課" % name) time.sleep(7) event.set() if __name__ == '__main__': stu1 = Thread(target=student, args=('alex',)) stu2 = Thread(target=student, args=('wxx',)) stu3 = Thread(target=student, args=('yxx',)) t1 = Thread(target=teacher, args=('egon',)) stu1.start() stu2.start() stu3.start() t1.start() """ 學生alex正在聽課 學生wxx正在聽課 學生yxx正在聽課 老師egon 正在授課 -------》等兩秒後,學生就去作課間活動了,等滿七秒,程序才結束 學生alex課間活動 學生yxx課間活動 學生wxx課間活動 """
有不少時候,屢次檢測不成功,須要設置超時時間:
from threading import Thread, Event, currentThread import time event = Event() def conn(): n=0 while not event.is_set(): # 尚未set()過,值爲False if n == 3: print("%s try too many times!" % currentThread().getName()) return # 結束整個函數,若是break,則連接成功了 print("%s try %s" % (currentThread().getName(), n)) event.wait(0.5) # 原地等待,並設置了超時時間 n += 1 print("%s is connected" % currentThread().getName()) def check(): print("%s is checking" % currentThread().getName()) time.sleep(5) # 模擬檢測 event.set() # 檢測OK if __name__ == '__main__': for i in range(3): t = Thread(target=conn) t.start() t = Thread(target=check) # 檢測線程 t.start() """ Thread-1 try 0 Thread-2 try 0 Thread-3 try 0 Thread-4 is checking Thread-1 try 1 Thread-2 try 1 Thread-3 try 1 Thread-1 try 2 Thread-3 try 2 Thread-2 try 2 Thread-1 try too many times! Thread-2 try too many times! Thread-3 try too many times! """
使得線程等待,只有知足某條件時,才釋放n個線程
import threading def run(n): con.acquire() con.wait() print("run the thread: %s" %n) con.release() if __name__ == '__main__': con = threading.Condition() for i in range(10): t = threading.Thread(target=run, args=(i,)) t.start() while True: inp = input('>>>') if inp == 'q': break con.acquire() con.notify(int(inp)) con.release()
函數化:
def condition_func(): ret = False inp = input('>>>') if inp == '1': ret = True return ret def run(n): con.acquire() con.wait_for(condition_func) print("run the thread: %s" %n) con.release() if __name__ == '__main__': con = threading.Condition() for i in range(10): t = threading.Thread(target=run, args=(i,)) t.start()
定時器:指定時間後,隔多少時間以後觸發去執行某操做。
from threading import Timer def task(name): print("hello %s" % name) t = Timer(5, task, args=('egon',)) # 建立對象,Timer是Thread的子類,其實就是一個線程 t.start() # hello egon ---->在等五秒後打印
import random def make_code(n=4): # 設置默認值爲4 res = '' for i in range(n): s1 = str(random.randint(0,9)) # 隨機數字字符 s2 = chr(random.randint(65, 90)) # 隨機字母 ,注意瞭解chr()內置函數 res += random.choice([s1, s2]) return res print(make_code()) """ 6HS8 \ 6S38 ————》結果隨機產生 """
將定時器改寫爲類:
from threading import Timer import random class Code: def __init__(self): self.make_cache() def make_cache(self, interval=8): self.cache = self.make_code() # 緩存驗證碼 print(self.cache) self.t = Timer(interval, self.make_cache) # 建立定時器,到時間刷新一次 self.t.start() def make_code(self, n=4): # 設置默認值爲4 res = '' for i in range(n): s1 = str(random.randint(0,9)) # 隨機數字字符 s2 = chr(random.randint(65, 90)) # 隨機字母 ,注意瞭解chr()內置函數 res += random.choice([s1, s2]) return res def check(self): while True: code = input("請輸入你的驗證碼>>:").strip() if code.upper() == self.cache: print("驗證碼輸入正確") self.t.cancel() break obj = Code() obj.check() """ E8I2 請輸入你的驗證碼>>:E8I2 驗證碼輸入正確 """
queue隊列 :使用import queue,用法與進程Queue同樣
q.put方法用以插入數據到隊列中。
q.get方法能夠從隊列讀取而且刪除一個元素。
一、class queue.Queue(maxsize) 定義好隊列存數據的最大值,隊列存取規則是先進先出
import queue q = queue.Queue(3) # 生成隊列,存數據最大值是3 先進先出 q.put("first") # 放值進去 q.put(2) q.put("third") # q.put(4) # 隊列滿了,阻塞 print(q.get()) # 取數據 print(q.get()) print(q.get()) """ first 2 third """
隊列放滿了繼續放,會形成阻塞;
若是把默認的block=True改成False,程序會直接報錯:raise full;
若是不修改block,而是設置timeout,程序會等到timeout時間以後報錯raise full。
隊列爲空繼續取,和上述的狀況很是相似:
import queue q = queue.Queue(3) # 生成隊列,存數據最大值是3 先進先出 q.put("first") # 放值進去 q.put(2) q.put("third") # q.put(4) # 隊列滿了,阻塞 # q.put(4, block=False) # 默認是block=True, 改成False後,隊列滿了還加數據,程序報錯raise Full queue.Full # q.put(4, block=True, timeout=3) # 設置了block=True隊列滿不會直接報錯了,可是還加上了timeout=3,程序會等3秒後提示報錯queue.Full # 同理get()方法也有這些參數 print(q.get()) # 取數據 print(q.get()) print(q.get()) # print(q.get(block=False)) # 在隊列空,還取時,通常是卡住,但加入了block=False參數的話,會提示報錯queue.Empty print(q.get_nowait()) # 這個效果同上 print(q.get(block=True, timeout=3)) # 隊列空,還取數據時,會按照timeout時間等待,到時間後提示queue.Empty
二、class queue.LifeQueue(maxsize) 定義爲堆棧,堆棧的存取規則是後進先出
import queue q = queue.LifoQueue(3) # 堆棧 q.put("first") q.put(2) q.put("third") print(q.get()) # third print(q.get()) # 2 print(q.get()) # first
三、class queue.PriorityQueue(maxsize) 存儲數據時可設置優先級的隊列
import queue q = queue.PriorityQueue(3) q.put(10, 'one') q.put(40, 'two') q.put(30, 'three') print(q.get()) # 10 print(q.get()) # 30 print(q.get()) # 40
數字越小優先級越高(取出的優先級)。
在剛開始學多進程或多線程時,咱們火燒眉毛地基於多進程或多線程實現併發的套接字通訊:
# -*- coding:utf-8 -*- __author__ = 'Qiushi Huang' from socket import * from threading import Thread # 通信和創建連接分開,啓動不一樣的線程,你們是併發執行。 def communicate(conn): while True: try: data = conn.recv(1024) if not data:break conn.send(data.upper()) except ConnectionResetError: break conn.close() def server(ip, port): server = socket(AF_INET, SOCK_STREAM) server.bind((ip, port)) server.listen(5) while True: conn, addr = server.accept() # 建連接 t = Thread(target=communicate, args=(conn,)) # 建一個連接創一個線程 t.start() # communicate(conn) server.close() if __name__ == '__main__': server('127.0.0.1', 8091) # 主線程 """ 這種解決方案的問題是:當客戶端愈來愈多後,線程也會愈來愈多,會帶來服務崩潰的問題。 """
# -*- coding:utf-8 -*- __author__ = 'Qiushi Huang' # 使用時,能夠一個程序運行屢次,這是多個不一樣的in from socket import * client = socket(AF_INET, SOCK_STREAM) client.connect(("127.0.0.1", 8091)) while True: msg = input(">>").strip() if not msg:continue client.send(msg.encode("utf-8")) data = client.recv(1024) print(data.decode("utf-8")) client.close()
這種實現方式的致命缺陷是:服務的開啓的進程數或線程數都會隨着併發的客戶端數目地增多而增多,這會對服務端主機帶來巨大的壓力,甚至於不堪重負而癱瘓,因而咱們必須對服務端開啓的進程數或線程數加以控制,讓機器在一個本身能夠承受的範圍內運行,這就是進程池或線程池的用途,例如進程池,就是用來存放進程的池子,本質仍是基於多進程,只不過是對開啓進程的數目加上了限制。
進程池和線程池的接口如出一轍,用法也徹底相同。池就是要對數目加以限制,保證機器一個可承受的範圍,以一個健康的狀態保證它的運行。
一、介紹:
官網:https://docs.python.org/dev/library/concurrent.futures.html concurrent.futures 模塊提供了高度封裝的異步調用接口 ThreadPoolExecutor: 線程池,提供異步調用 ProcessPoolExecutor: 進程池,提供異步調用 Both implement the same interface, which is defined by the abstract Executor class.
基本方法:
一、submit(fn, *args, **kwargs) 異步提交任務 二、map(func, *iterables, timeout=None, chunksize=1) 取代for循環submit的操做 三、shutdown(wait=True) 至關於進程池的pool.close()+pool.join()操做 wait=True,等待池內全部任務執行完畢回收完資源後才繼續 wait=False,當即返回,並不會等待池內的任務執行完畢 但無論wait參數爲什麼值,整個程序都會等到全部任務執行完畢 submit和map必須在shutdown以前 四、result(timeout=None) 取得結果 5、add_done_callback(fn) 回調函數
二、進程池:
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import os, time, random def task(name): print("name: %s pid: %s run" % (name, os.getpid())) time.sleep(random.randint(1,3)) if __name__ == '__main__': pool = ProcessPoolExecutor(4) # 指定進程池大小,最大進程數,若是不指定默認是CPU核數 for i in range(10): """從始至終四個進程解決這10個任務,誰沒事了接新任務""" pool.submit(task, 'egon%s' %i) # 提交任務的方式————異步調用:提交完任務,不用在原地等任務執行拿到結果。 print("主進程") """ name: egon0 pid: 12445 run name: egon1 pid: 12444 run name: egon2 pid: 12446 run name: egon3 pid: 12447 run 主進程 name: egon4 pid: 12445 run name: egon5 pid: 12444 run name: egon6 pid: 12446 run name: egon7 pid: 12445 run name: egon8 pid: 12446 run name: egon9 pid: 12447 run """
shutdown()方法的使用:
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import os, time, random def task(name): print("name: %s pid: %s run" % (name, os.getpid())) time.sleep(random.randint(1,3)) if __name__ == '__main__': pool = ProcessPoolExecutor(4) # 指定進程池大小,最大進程數,若是不指定默認是CPU核數 for i in range(10): """從始至終四個進程解決這10個任務,誰沒事了接新任務""" pool.submit(task, 'egon%s' %i) # 提交任務的方式————異步調用:提交完任務,不用在原地等任務執行拿到結果。 pool.shutdown() # 把提交任務入口關閉,默認參數wait=True;同時還進行了pool.join()操做,等任務提交結束,再結束主進程 print("主進程") """ name: egon0 pid: 12502 run name: egon1 pid: 12503 run name: egon2 pid: 12504 run name: egon3 pid: 12505 run name: egon4 pid: 12502 run name: egon5 pid: 12503 run name: egon6 pid: 12505 run name: egon7 pid: 12504 run name: egon8 pid: 12503 run name: egon9 pid: 12505 run 主進程 """
三、針對線程的狀況:
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import os, time, random def task(name): print("name: %s pid: %s run" % (name, os.getpid())) time.sleep(random.randint(1,3)) if __name__ == '__main__': pool = ThreadPoolExecutor(4) for i in range(10): """從始至終四個進程解決這10個任務,誰沒事了接新任務""" pool.submit(task, 'egon%s' %i) # 提交任務的方式————異步調用:提交完任務,不用在原地等任務執行拿到結果。 pool.shutdown(wait=True) # 把提交任務入口關閉,默認參數wait=True;同時還進行了pool.join()操做,等任務提交結束,再結束主進程 print("主進程") """ name: egon0 pid: 12528 run name: egon1 pid: 12528 run name: egon2 pid: 12528 run name: egon3 pid: 12528 run name: egon4 pid: 12528 run name: egon5 pid: 12528 run name: egon6 pid: 12528 run name: egon7 pid: 12528 run name: egon8 pid: 12528 run name: egon9 pid: 12528 run 主進程 """
currentThread()方法查看線程名:
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor from threading import currentThread # 查看線程名 import os, time, random def task(): print("name: %s pid: %s run" % (currentThread().getName(), os.getpid())) time.sleep(random.randint(1,3)) if __name__ == '__main__': pool = ThreadPoolExecutor(4) for i in range(10): """從始至終四個進程解決這10個任務,誰沒事了接新任務""" pool.submit(task,) # 提交任務的方式————異步調用:提交完任務,不用在原地等任務執行拿到結果。 pool.shutdown(wait=True) # 把提交任務入口關閉,默認參數wait=True;同時還進行了pool.join()操做,等任務提交結束,再結束主進程 print("主進程") """ name: <concurrent.futures.thread.ThreadPoolExecutor object at 0x10401af28>_0 pid: 12554 run name: <concurrent.futures.thread.ThreadPoolExecutor object at 0x10401af28>_1 pid: 12554 run name: <concurrent.futures.thread.ThreadPoolExecutor object at 0x10401af28>_2 pid: 12554 run name: <concurrent.futures.thread.ThreadPoolExecutor object at 0x10401af28>_3 pid: 12554 run name: <concurrent.futures.thread.ThreadPoolExecutor object at 0x10401af28>_2 pid: 12554 run name: <concurrent.futures.thread.ThreadPoolExecutor object at 0x10401af28>_0 pid: 12554 run name: <concurrent.futures.thread.ThreadPoolExecutor object at 0x10401af28>_3 pid: 12554 run name: <concurrent.futures.thread.ThreadPoolExecutor object at 0x10401af28>_2 pid: 12554 run name: <concurrent.futures.thread.ThreadPoolExecutor object at 0x10401af28>_1 pid: 12554 run name: <concurrent.futures.thread.ThreadPoolExecutor object at 0x10401af28>_0 pid: 12554 run 主進程 """
四、map方法
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ThreadPoolExecutor(max_workers=3) # for i in range(11): # future=executor.submit(task,i) executor.map(task,range(1,12)) #map取代了for+submit
五、異步調用和回調機制
同步調用:提交完任務後,就在原地等待任務執行完畢,拿到執行結果,再執行下一行。
致使程序是串行執行。
from concurrent.futures import ThreadPoolExecutor import time import random def la(name): print("%s is laing" % name) time.sleep(random.randint(3,5)) res = random.randint(7, 13)*'#' return {'name':name, 'res':res} def weigh(shit): name = shit['name'] size = len(shit['res']) print('%s 拉了 《%s》kg' % (name, size)) if __name__ == '__main__': pool = ThreadPoolExecutor(13) shit1 = pool.submit(la, 'alex').result() weigh(shit1) shit2 = pool.submit(la, 'wupeiqi').result() weigh(shit2) shit3 = pool.submit(la, "yuanhao").result() weigh(shit3) """ alex is laing alex 拉了 《7》kg wupeiqi is laing wupeiqi 拉了 《9》kg yuanhao is laing yuanhao 拉了 《11》kg """
異步調用:與同步相對,一個異步功能調用提交完任務後,調用者不會馬上獲得結果,而在完成後,經過狀態、通知或回調函數來通知調用者。
from concurrent.futures import ThreadPoolExecutor import time import random def la(name): print("%s is laing" % name) time.sleep(random.randint(3,5)) res = random.randint(7, 13)*'#' # return {'name':name, 'res':res} weigh({'name':name, 'res':res}) # 直接把字典傳給稱重weigh(),但形成了程序耦合 def weigh(shit): name = shit['name'] size = len(shit['res']) print('%s 拉了 《%s》kg' % (name, size)) if __name__ == '__main__': pool = ThreadPoolExecutor(13) pool.submit(la, 'alex') pool.submit(la, 'wupeiqi') pool.submit(la, "yuanhao") """併發執行拉的任務,誰執行完,誰把結果傳給稱重功能。 alex is laing wupeiqi is laing yuanhao is laing alex 拉了 《10》kg wupeiqi 拉了 《7》kg yuanhao 拉了 《7》kg """
能夠爲進程池或線程池內的每一個進程或線程綁定一個函數,該函數在進程或線程的任務執行完畢後自動觸發,並接收任務的返回值看成參數,該函數稱爲回調函數。
from concurrent.futures import ThreadPoolExecutor import time import random def la(name): print("%s is laing" % name) time.sleep(random.randint(3,5)) res = random.randint(7, 13)*'#' return {'name':name, 'res':res} # weigh({'name':name, 'res':res}) # 直接把字典傳給稱重weigh(),形成了程序耦合 def weigh(shit): shit = shit.result() # 對象.result()拿到結果並賦值給shit name = shit['name'] size = len(shit['res']) print('%s 拉了 《%s》kg' % (name, size)) if __name__ == '__main__': pool = ThreadPoolExecutor(13) # 回調函數,前面任務執行完,return返回值就會自動觸發weith功能執行,把pool.submit(la, 'alex')對象當作參數傳給weigh() pool.submit(la, 'alex').add_done_callback(weigh) pool.submit(la, 'wupeiqi').add_done_callback(weigh) pool.submit(la, "yuanhao").add_done_callback(weigh) """ alex is laing wupeiqi is laing yuanhao is laing alex 拉了 《10》kg wupeiqi 拉了 《7》kg yuanhao 拉了 《7》kg """
阻塞和非阻塞的區別?
阻塞是進程運行的一種狀態,遇到I/O就會進入阻塞狀態,會被剝奪走CPU的執行權限。
阻塞調用在調用結果返回前,當前線程會被掛起。直到獲得返回結果,纔會將阻塞線程激活。
非阻塞調用:指在不能馬上獲得結果以前也會馬上返回,同時函數不會阻塞當前線程。
阻塞和同步調用的區別?
同步調用:調用會一直等待,直到任務返回結果爲止,即便被搶走cpu也是處於就緒狀態。
阻塞調用:socket工做在阻塞模式時,若是沒有數據下調用recv(),當前進程掛起,直到有數據爲止。
同步異步針對的是函數/任務的調用方式;阻塞非阻塞針對的是進程或線程。
# -*- coding:utf-8 -*- __author__ = 'Qiushi Huang' # 多I/O的問題採用線程池 from concurrent.futures import ThreadPoolExecutor import requests import time def get(url): print('get %s' % url) # requests.get()就是目標頁面下載一個文件到本地來 response = requests.get(url) # 對象 time.sleep(3) # 模擬網絡延遲 # print(response.text) # 網頁內容 return {'url':url, 'content':response.text} def paese(res): # 解析,正則表達式 res = res.result() print('%s parse res is %s' % (res['url'], len(res['content']))) if __name__ == '__main__': urls = [ 'http://www.cnblogs.com/linhaifeng', 'http://www.python.org', 'http://www.openstack.org' ] pool = ThreadPoolExecutor(2) for url in urls: pool.submit(get, url).add_done_callback(paese) # 回調函數 """ get http://www.cnblogs.com/linhaifeng get http://www.python.org ——————》明顯的等的效果 http://www.cnblogs.com/linhaifeng parse res is 16320 get http://www.openstack.org http://www.python.org parse res is 49014 http://www.openstack.org parse res is 63429 """
多IO問題採用線程池。
由上例能夠分析出異步調用加回調機制使用的場景。
六、線程池優化實現併發的套接字
# -*- coding:utf-8 -*- __author__ = 'Qiushi Huang' """ 原先多線程解決方案的問題是:當客戶端愈來愈多後,線程也會愈來愈多,會帶來服務崩潰的問題。 不該該隨着客戶端數量增長不斷地增長線程,須要基於線程池實現,限制線程數量 """ from socket import * from concurrent.futures import ThreadPoolExecutor def communicate(conn): while True: try: data = conn.recv(1024) # if not data:continue # 這裏卡了好久,須要注意,這種狀況下關閉客戶端,線程池沒有減小 # if data.decode('utf-8') == 'q':break # 測試,這種狀況下,線程池減小,新進程加入進程池 if not data:break conn.send(data.upper()) except ConnectionResetError: break conn.close() def server(ip, port): server = socket(AF_INET, SOCK_STREAM) server.bind((ip, port)) server.listen(5) while True: conn, addr = server.accept() # 建連接 pool.submit(communicate, conn) server.close() if __name__ == '__main__': pool = ThreadPoolExecutor(2) # 通常寫機器可承受的範圍內 server('127.0.0.1', 8092) # 主線程
# -*- coding:utf-8 -*- __author__ = 'Qiushi Huang' from socket import * client = socket(AF_INET, SOCK_STREAM) client.connect(("127.0.0.1", 8092)) while True: msg = input(">>").strip() if not msg:continue client.send(msg.encode("utf-8")) data = client.recv(1024) print(data.decode("utf-8")) client.close()