multiprocess模塊的徹底模仿了threading模塊的接口,兩者在使用層面,有很大的類似性,於是再也不詳細介紹
官網連接:https://docs.python.org/3/library/threading.html?highlight=threading#html
# @Time : 2018/9/10 14:48 # @Author : Jame from threading import Thread import time #方法1 # def task(name): # print('%s is running'%name) # time.sleep(2) # print('%s is done'%name) # # # # if __name__ == '__main__': # t=Thread(target=task,args=('線程1',)) # t.start() #幾乎是線程信號發送的同時,線程就當即開啓了,證實線程的建立開銷遠遠小雨進程 # print('主.....') # #主線程的生命週期就是所在進程的生命週期,進程應該再進程內全部線程都運行完畢後才應該結束
# @Time : 2018/9/10 14:48 # @Author : Jame from threading import Thread import time #方法2 class Mythread(Thread): def run(self): print('%s is running'%self.name) time.sleep(2) print('%s is done'%self.name) #能夠不在main方法下,由於在windows下因此必需要在main纔不報錯。 if __name__ == '__main__': t=Mythread() t.start() print('主')
# @Time : 2018/9/12 9:28 # @Author : Jame from multiprocessing import Process,current_process from threading import Thread,current_thread import time def work1(): print('hello ',current_thread().getName()) def work2(): print('hello',current_process().name) if __name__ == '__main__': #在進程下開啓線程 t=Thread(target=work1) start=time.time() t.start() t.join() print('主線程/主進程:%s,耗時:%s'%(current_thread().getName(),time.time()-start)) ''' hello Thread-1 主線程/主進程:MainThread,耗時:0.0010001659393310547 ''' p=Process(target=work2) start1=time.time() p.start() p.join() print('主線程/主進程:%s,耗時:%s'%(current_process().name,time.time()-start1)) ''' hello Process-1 主線程/主進程:MainProcess,耗時:0.1890106201171875 ''' ''' 總結:由上例子能夠看出,建立線程的消耗比建立進程要少的多,幾乎在10-100倍之間。 由於建立進程須要申請空間,複製父進程等操做,而建立線程則沒有這樣的開銷,只須要啓動一個流水線便可,內存共享該進程資源。 '''
# @Time : 2018/9/12 9:28 # @Author : Jame from multiprocessing import Process,current_process from threading import Thread,current_thread import time import os def work1(): print('子Name:%s,pid:%s '%(current_thread().getName(),os.getpid())) def work2(): print('子Name:%s,pid:%s '%(current_thread().getName(),os.getpid())) if __name__ == '__main__': #在進程下開啓線程 t=Thread(target=work1) start=time.time() t.start() t.join() print('主線程/主進程:%s,耗時:%s,主pid:%s'%(current_thread().getName(),time.time()-start,os.getpid())) p=Process(target=work2) start1=time.time() p.start() p.join() print('主線程/主進程:%s,耗時:%s,主pid:%s'%(current_process().name,time.time()-start1,os.getpid())) ''' 子Name:Thread-1,pid:7340 主線程/主進程:MainThread,耗時:0.0009999275207519531,主pid:7340 ''' ''' 子Name:MainThread,pid:7892 主線程/主進程:MainProcess,耗時:0.14300823211669922,主pid:7340 '''
# @Time : 2018/9/10 16:29 # @Author : Jame from threading import Thread,current_thread from multiprocessing import Process import time #1.join() # def task(name): # print('%s is running'%name) # time.sleep(2) # print('%s is done'%name) # # # # if __name__ == '__main__': # t=Thread(target=task,args=('線程1',)) # t.start() # t.join() #主線程等着線程task結束後,才運行主...... # print('主.....') #2.線程和進程內的的數據是共享內存仍是數據隔離的呢? # n=100 # # def task(): # global n # n=0 # print('子',n) # # # if __name__ == '__main__': # t=Thread(target=task) # #t=Process(target=task) # # t.start() # t.join() # print('主',n) ''' 輸出: 線程:0,修改爲功,說明線程內共享同一內存空間,因此線程是數據不安全的。 進程:100,修改失敗,由於每一個子進程複製了主進程數據,而且進程間隔離,因此進程是數據較安全的。 '''
總結:python
1.建立線程的開銷遠小於建立進程。mysql
2.一個進程建立,至少會有一個主線程/主進程存在。sql
3.子線程共享建立它的主進程內資源,且各子線程之間能夠互相通訊。子進程複製主進程內存狀態,且各子進程之間相互隔離,進程間通訊困難。編程
1.用多線程來改造socket實現多併發效果json
# @Time : 2018/9/12 9:53 # @Author : Jame import multiprocessing import threading import socket server=socket.socket(socket.AF_INET,socket.SOCK_STREAM) server.bind(('127.0.0.1',8080)) server.listen(2) print('server start.....') def action(conn): while True: data=conn.recv(1024) print('client send:',data) conn.send(data.upper()) if __name__ == '__main__': while True: conn,addr=server.accept() p=threading.Thread(target=action,args=(conn,)) p.start()
# @Time : 2018/9/12 9:53 # @Author : Jame import socket client=socket.socket(socket.AF_INET,socket.SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('Please input:').strip() if not msg:continue client.send(msg.encode('utf-8')) data=client.recv(1024) print(data.decode('utf-8'))
# @Time : 2018/9/12 9:53 # @Author : Jame import socket client=socket.socket(socket.AF_INET,socket.SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('Please input:').strip() if not msg:continue client.send(msg.encode('utf-8')) data=client.recv(1024) print(data.decode('utf-8'))
2.三個任務,一個接收用戶輸入,一個將用戶輸入的內容格式化成大寫,一個將格式化後的結果存入文件windows
# @Time : 2018/9/12 10:05 # @Author : Jame from threading import Thread msg_l=[] format_l=[] #1.接受用戶輸入 def talk(): while True: msg=input('Please input:').strip() if not msg:continue msg_l.append(msg) #2.將輸入的轉換成大寫 def format_msg(): while True: if msg_l: res=msg_l.pop() format_l.append(res.upper()) #3.將轉換後的大寫存入文件中去 def save(): while True: if format_l: with open('db3.txt','a',encoding='utf-8') as f: res=format_l.pop() f.write('%s\n'%res) if __name__ == '__main__': t1=Thread(target=talk) t2=Thread(target=format_msg) t3=Thread(target=save) t1.start() t2.start() t3.start() ''' Please input:abc Please input:nihap Please input:woshishui Please input:wocongnalilai Please input:woyao dao nali qu 查看:db3.txt會發現裏面所有是大寫的存儲了 '''
1.thread實例對象的方法:t=Thread(target=work)
# isAlive(): 返回線程是否活動的。
#getName(): 返回線程名。
#setName(): 設置線程名。
#join():等待該子線程對象執行完畢。
2. threading模塊提供的一些方法:
# threading.currentThread(): 返回當前的線程變量。
# threading.enumerate(): 返回一個包含正在運行的線程的list。正在運行指線程啓動後、結束前,不包括啓動前和終止後的線程。
# threading.activeCount(): 返回正在運行的線程數量,與len(threading.enumerate())有相同的結果。安全
3.實例演示服務器
# @Time : 2018/9/10 16:29 # @Author : Jame from threading import Thread,current_thread import threading def task(): print('%s is running'%current_thread().name) time.sleep(2) print('%s is done'%current_thread().name) if __name__ == '__main__': t=Thread(target=task,name=('線程1')) t.start() print(t.is_alive(),t.getName()) t.join() print('主Name:%s'%(current_thread().name)) print(threading.activeCount) #<function active_count at 0x00000000029F57B8> print(threading.enumerate()) #[<_MainThread(MainThread, started 6460)>] print(threading.currentThread) #<function current_thread at 0x0000000002963620> ''' 線程1 is running True 線程1 線程1 is done 主Name:MainThread <function active_count at 0x00000000029F57B8> [<_MainThread(MainThread, started 3540)>] <function current_thread at 0x00000000029F3620> '''
1.守護線程和守護進程的概念詳解多線程
不管是進程仍是線程,都遵循主xx運行完畢,守護xx 隨着銷燬。
須要強調的是:運行完畢,並非運行被終止。
對於主進程來講,守護進程隨着主進程代碼結束,守護進程隨之銷燬。
對主線程來講,主線程要等所在進程內全部非守護線程運行完畢,主線程纔算運行完畢,這時候守護線程纔會隨之銷燬。
守護線程/進程詳解:
1.主進程是在其代碼結束後就算已經運行結束(守護進程隨之回收),而後主進程會一直等非守護的子進程都運行完畢後,纔回收子進程資源(不然會產生殭屍進程),最後才結束。
2.主線程在其餘非守護線程運行完畢纔算運行完畢(守護線程隨之回收)。由於主線程的結束意味着所在進程的結束,進程總體的資源都將被回收,而進程又必須保證非守護線程都運行完畢才能結束。
2.實例演示
# @Time : 2018/9/10 16:41 # @Author : Jame from threading import Thread,current_thread import time #1。守護線程隨着主線程運行完畢後,就即刻銷燬結束! # def task(name): # print('%s is running'%name) # time.sleep(2) # print('%s is done'%name) # # # if __name__ == '__main__': # t=Thread(target=task,args=('線程1')) # # t.daemon=Thread #守護線程,必定要在線程的start()方法開啓以前進行開啓操做纔有意義! # # t.start() # print('主--->',current_thread().name) ''' 主---> MainThread #這裏看主打印出來後,線程task沒有來得及開啓就跟隨主線程一塊兒銷燬了 '''
#2.守護線程會在該進程內的全部非守護線程都運行完畢後才結束銷燬 def foo(): print(123) time.sleep(1) print('end123') def bar(): print(456) time.sleep(3) print('end456') if __name__ == '__main__': t1=Thread(target=foo) t2=Thread(target=bar) t1.daemon=True #守護線程 t1.start() t2.start() #非守護線程的子線程 print('main---------->>') ''' 123 456 main---------->> #這裏能夠看到,主線程執行完畢後foo 並無隨之銷燬,而是等非守護線程bar執行完畢後才銷燬。 end123 end456 思考:守護線程與守護進程的不一樣? 守護進程隨着主進程代碼執行完畢,隨之銷燬,守護進程沒有等待其餘非守護子進程執行完畢,主進程等待其餘非守護子進程執行完畢才結束。 守護線程隨着主線程代碼執行完畢,若是有其餘非守護子線程還沒有執行完畢,守護線程要等待其執行完畢後銷燬,而後主線程結束意味着進程結束。 '''
請參考Egon:http://www.cnblogs.com/linhaifeng/articles/7449853.html
1.注意
1).線程搶的是的GIL鎖,GIL鎖至關於拿到了執行權限(院子大門鑰匙),拿到執行權限後才能拿到互斥鎖Lock(院子裏面的防盜門鑰匙),其餘線程也能夠搶到GIL。可是若是發現Lock任然沒有被釋放則阻塞,即使是拿到執行權限GIL也要馬上交出來。
2).join 是等待全部,便是總體串行,而鎖只是鎖住了修改共享數據的部分,便是部分串行。想要保證數據安全的根本原理在於讓併發變成串行,join與互斥鎖均可以實現,毫無疑問,互斥鎖的部分串行效率更高點。
3).必定要看下本小節GIL與互斥鎖的經典分析。
2.GIL VS Lock
1).Python已經有一個GIL來保證同一時間只能有一個線程來執行了,爲何這裏還須要lock ?
首先咱們須要達成共識:鎖的目的是爲了保護共享的數據,同一時間只能有一個線程來修改共享的數據
而後,咱們能夠得出結論:保護不一樣的數據就應該加不一樣的鎖。
最後,問題就很明朗了,GIL與Lock是兩把鎖,保護的數據不同,前者是解釋器級別的(固然保護的就是解釋器級別的數據,好比垃圾回收的數據)
後者是保護用戶本身開發的應用程序的數據,很明顯GIL不負責這件事,只能用戶自定義加鎖處理,即Lock。
2).分析過程:全部線程搶的是GIL鎖,或者說全部線程先搶的執行權限
線程1搶到GIL鎖,拿到執行權限,開始執行,而後加了一把Lock,尚未執行完畢,即線程1還未釋放Lock,有可能線程2搶到GIL鎖,開始執行,執行過程當中發現Lock尚未被線程1釋放,因而線程2進入阻塞,被奪走執行權限,有可能線程1拿到GIL,而後 正常執行到釋放Lock。。。這就致使了串行運行的效果
既然是串行,那咱們執行
t1.start()
t1.join
t2.start()
t2.join()
這也是串行執行,爲什麼還要加Lock呢,需知join是等待t1全部的代碼執行完,至關於鎖住了t1的全部代碼,而Lock只是鎖住一部分操做共享數據的代碼。
由於Python解釋器幫你自動按期進行內存回收,你能夠理解爲python解釋器裏有一個獨立的線程,每過一段時間它起wake up作一次全局輪詢看看哪些內存數據是能夠被清空的,此時你本身的程序 裏的線程和 py解釋器本身的線程是併發運行的,假設你的線程刪除了一個變量,py解釋器的垃圾回收線程在清空這個變量的過程當中的clearing時刻,可能一個其它線程正好又從新給這個還沒來及得清空的內存空間賦值了,結果就有可能新賦值的數據被刪除了,爲了解決相似的問題,python解釋器簡單粗暴的加了鎖,即當一個線程運行時,其它人都不能動,這樣就解決了上述的問題, 這能夠說是Python早期版本的遺留問題。
3).實例演示
from threading import Thread import os,time def work(): global n temp=n time.sleep(0.1) n=temp-1 if __name__ == '__main__': n=100 l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n) #結果可能爲99
from threading import Thread,Lock import os,time def work(): global n lock.acquire() temp=n time.sleep(0.1) n=temp-1 lock.release() if __name__ == '__main__': lock=Lock() n=100 l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n) #結果確定爲0,由原來的併發執行變成串行,犧牲了執行效率保證了數據安全
3.GIL與Lock綜合分析(重點理解)
分析:
#1.100個線程去搶GIL鎖,即搶執行權限
#2. 確定有一個線程先搶到GIL(暫且稱爲線程1),而後開始執行,一旦執行就會拿到lock.acquire()
#3. 極有可能線程1還未運行完畢,就有另一個線程2搶到GIL,而後開始運行,但線程2發現互斥鎖lock還未被線程1釋放,因而阻塞,被迫交出執行權限,即釋放GIL
#4.直到線程1從新搶到GIL,開始從上次暫停的位置繼續執行,直到正常釋放互斥鎖lock,而後其餘的線程再重複2 3 4的過程
4.互斥鎖與join的區別(重點理解)
# @Time : 2018/9/11 9:27 # @Author : Jame from threading import Thread import os,time def work(): global n temp=n time.sleep(0.1) n=temp-1 print('子線程:',n) if __name__ == '__main__': n=100 l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print('-----主線程:',n) ''' 子線程: 99 子線程: 99 子線程: 99 子線程: 99 -----主線程: 99 由於線程速度太快了,幾乎都是同時進行修改了. '''
# @Time : 2018/9/11 9:32 # @Author : Jame from threading import Thread,Lock,current_thread import os,time def work(lock): global n lock.acquire() temp=n time.sleep(0.1) n=temp-1 print('子線程:%s,%s'%(current_thread().name,n)) lock.release() if __name__ == '__main__': lock = Lock() n=100 l=[] for i in range(100): p=Thread(target=work,args=(lock,)) l.append(p) p.start() for p in l: p.join() print('---->>主',n) #結果確定爲0,由原來的併發執行變成了竄行,犧牲了執行效率保證了數據安全。 ''' 子線程99 . . . 子線程:1 子線程:0 ---->>主 0 分析:1.100個線程去搶GIL鎖,即搶執行權限 2.確定有一個線程先搶到GIL(暫且稱爲線程1),而後開始執行,一旦執行就會拿到lock.acquire() 3.既有可能線程1 尚未運行完畢,就會有另一個線程x搶到了GIL,而後開始運行,可是線程x 發現互斥鎖lock還未被線程1釋放,因而阻塞 ,被迫交出執行權限,即釋放GIL 4.知道線程1 從新搶到GIL,開始上次暫停的位置繼續執行,知道正常釋放互斥鎖lock,而後其餘線程再重複2 3 4 的過程。 '''
# @Time : 2018/9/11 10:10 # @Author : Jame from threading import Thread,current_thread,Lock import os,time def task(lock): #未加鎖的代碼併發運行 time.sleep(1) print('%s start to running'%current_thread().getName()) global n #加鎖運行 lock.acquire() temp=n time.sleep(0.1) n=temp-1 print('子線程:%s n=%s' % (current_thread().name, n)) lock.release() if __name__ == '__main__': n=100 lock=Lock() threads=[] start=time.time() for i in range(100): t=Thread(target=task,args=(lock,)) threads.append(t) t.start() for t in threads: t.join() stop=time.time() print('主:%s,n=%s,耗時:%s'%(current_thread().name,n,stop-start)) ''' . . . . 子線程:Thread-97 n=3 子線程:Thread-98 n=2 子線程:Thread-99 n=1 子線程:Thread-100 n=0 主:MainThread,n=0,耗時:11.002629518508911 既然加鎖會讓程序變成串行,那麼在start運行後馬上使用join,就不是至關於串行了嗎,爲什麼還要lock加鎖呢? 總結:由於start後馬上join會使代碼總體串行,而lock加鎖的部分只是修改共享數據的部分的那一刻是串行的, #保證了數據的一致性和安全性,同時其餘部分又是併發的相對於單純使用Join提升了併發的效果。 ''' #串行的join # def task(): # time.sleep(1) # print('%s start to running'%current_thread().getName()) # global n # temp=n # time.sleep(0.1) # n=temp-1 # print('子線程:%s n=%s' % (current_thread().name, n)) # # # if __name__ == '__main__': # n=100 # start=time.time() # for i in range(100): # t=Thread(target=task) # t.start() # t.join() # # stop=time.time() # # print('主:%s,n=%s,耗時:%s'%(current_thread().name,n,stop-start)) ''' ......... 子線程:Thread-97 n=3 Thread-98 start to running 子線程:Thread-98 n=2 Thread-99 start to running 子線程:Thread-99 n=1 Thread-100 start to running 子線程:Thread-100 n=0 主:MainThread,n=0,耗時:110.0262930393219 總結:每一個線程都是串行join執行,耗時比加鎖lock的併發效果高不少, #因此局部加鎖並實現併發效果就很重要,既保證了數據的安全性又比徹底串行耗時小不少。 '''
1.所謂死鎖
是指兩個或兩個以上的進程或線程在執行過程當中,因爭奪資源而形成的一種互相等待的現象,若無外力做用,它們都將沒法推動下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱爲死鎖進程,以下就是死鎖
# @Time : 2018/9/11 8:59 # @Author : Jame from threading import Thread,Lock,RLock import time mutexA=mutexB=Lock() #死鎖,線程卡死在第一個鎖的時候 #mutexA=mutexB=RLock() #遞歸鎖,不會產生死鎖 class Mythread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print('%s 搶到了A鎖'%self.name) mutexB.acquire() print('%s 搶到了B鎖'%self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print('%s 搶到了B鎖'%self.name) mutexA.acquire() print('%s 搶到了A鎖'%self.name) mutexA.release() mutexB.release() if __name__ == '__main__': for i in range(10): t=Mythread() t.start() ''' Thread-1 搶到了A鎖 ...卡死中 '''
2.解決死鎖問題
死鎖的解決方法是遞歸鎖,在python只呢個爲了製成同一個線程中屢次請求同一個資源,python提供了可重入鎖Rlock
Rlock內部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,從而使用資源能夠被屢次requre。
直到一個線程全部的acquire被release,其餘線程才能得到資源。
若是上面的例子使用RLock代替Lock,則不會發生死鎖:
mutexA=mutexB=threading.RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的狀況,則counter繼續加1,這期間全部其餘線程都只能等待,等待該線程釋放全部鎖,即counter遞減到0爲止
1.信號量原理概念
Semaphore管理一個內置的計數器,
每當調用acquire()時內置計數器-1;
調用release() 時內置計數器+1;
計數器不能小於0;當計數器爲0時,acquire()將阻塞線程直到其餘線程調用release()。
2.實例:(同時只有5個線程能夠得到semaphore,便可以限制最大鏈接數爲5):
# @Time : 2018/9/11 10:51 # @Author : Jame from threading import Thread,Semaphore import threading import time def func(sm): sm.acquire() print('%s get sm'%threading.current_thread().getName()) time.sleep(1) print('-------------->>>') sm.release() if __name__ == '__main__': sm=Semaphore(5) for i in range(20): t=Thread(target=func,args=(sm,)) t.start() ''' Thread-1 get sm Thread-2 get sm Thread-3 get sm Thread-4 get sm Thread-5 get sm -------------->>> -------------->>> Thread-6 get sm Thread-7 get sm -------------->>> -------------->>> Thread-9 get sm -------------->>> Thread-8 get sm Thread-10 get sm -------------->>> Thread-11 get sm -------------->>> Thread-12 get sm -------------->>> -------------->>> Thread-14 get sm -------------->>> Thread-13 get sm Thread-15 get sm -------------->>> -------------->>> Thread-16 get sm Thread-17 get sm -------------->>> -------------->>> Thread-19 get sm Thread-18 get sm -------------->>> Thread-20 get sm -------------->>> -------------->>> -------------->>> -------------->>> -------------->>> 總結:信號量相似廁所門口的鑰匙,掛了5把,同一時間只能5我的進行上廁所,當裏面出來掛上鑰匙後,才能進去。 因此互斥鎖,是信號量的特殊狀況n=1。可是mutex互斥鎖較爲簡單,且高效,因此在必須保障資源獨佔的狀況下,仍是採用該設計方式。 '''
3.信號量與進程池的區別
與進程池是徹底不一樣的概念,進程池Pool(4),最大隻能產生4個進程,並且從頭至尾都只是這四個進程,不會產生新的,而信號量是產生一堆線程/進程
信號量與互斥鎖推薦看博客:http://url.cn/5DMsS9r
1.Event原理概念
同進程的同樣,線程的一個關鍵特性是每一個線程都是獨立運行且狀態不可預測。
若是程序中的其 ,他線程須要經過判斷某個線程的狀態來肯定本身下一步的操做,這時線程同步問題就會變得很是棘手。
爲了解決這些問題,咱們須要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它容許線程等待某些事件的發生。
在 初始狀況下,Event對象中的信號標誌被設置爲假。若是有線程等待一個Event對象, 而這個Event對象的標誌爲假,那麼這個線程將會被一直阻塞直至該標誌爲真。
一個線程若是將一個Event對象的信號標誌設置爲真,它將喚醒全部等待這個Event對象的線程。若是一個線程等待一個已經被設置爲真的Event對象,那麼它將忽略這個事件, 繼續執行。
event.isSet():返回event的狀態值; event.wait():若是 event.isSet()==False將阻塞線程; event.set(): 設置event的狀態值爲True,全部阻塞池的線程激活進入就緒狀態, 等待操做系統調度; event.clear():恢復event的狀態值爲False。
2.Event應用
例如,有多個工做線程嘗試連接MySQL,咱們想要在連接前確保MySQL服務正常才讓那些工做線程去鏈接MySQL服務器,若是鏈接不成功,都會去嘗試從新鏈接。
那麼咱們就能夠採用threading.Event機制來協調各個工做線程的鏈接操做:
# @Time : 2018/9/11 14:31 # @Author : Jame from threading import Thread,Event import threading import time,random def conn_mysql(): count=1 while not event.is_set(): if count>3: raise TimeoutError('鏈接超時') print('<%s> 第%s 次嘗試鏈接'%(threading.current_thread().getName(),count)) event.wait(0.5) count+=1 print('<%s> 鏈接成功!'%threading.current_thread().getName()) def check_mysql(): print('\033[45m[%s] 正在檢查mysql\033[0m'%threading.current_thread().getName()) time.sleep(random.randint(2,4)) event.set() if __name__ == '__main__': event=Event() conn1=Thread(target=conn_mysql) conn2=Thread(target=conn_mysql) check=Thread(target=check_mysql) conn1.start() conn2.start() check.start()
使得線程等待,只有知足某條件時,才釋放n個線程
# @Time : 2018/9/12 15:03 # @Author : Jame import threading def condition_func(): ret = False inp = input('>>>') if inp == '1': ret = True return ret def run(n): con.acquire() con.wait_for(condition_func) print("run the thread: %s" %n) con.release() if __name__ == '__main__': con = threading.Condition() for i in range(5): t = threading.Thread(target=run, args=(i,)) t.start() ''' >>>1 run the thread: 0 >>>1 run the thread: 1 >>>1 run the thread: 2 >>>1 run the thread: 3 >>>1 run the thread: 4 '''
定時器,指定n秒後執行某操做
# @Time : 2018/9/12 15:17 # @Author : Jame from threading import Timer def hello(): print("hello, world") t = Timer(3, hello) t.start() # after 3 seconds, "hello, world" will be printed
from threading import Timer import random,time class Code: def __init__(self): self.make_cache() def make_cache(self,interval=5): self.cache=self.make_code() print(self.cache) self.t=Timer(interval,self.make_cache) self.t.start() def make_code(self,n=4): res='' for i in range(n): s1=str(random.randint(0,9)) s2=chr(random.randint(65,90)) res+=random.choice([s1,s2]) return res def check(self): while True: inp=input('>>: ').strip() if inp.upper() == self.cache: print('驗證成功',end='\n') self.t.cancel() break if __name__ == '__main__': obj=Code() obj.check()
queue隊列 :使用import queue,用法與進程Queue同樣
queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.
queue.
Queue
(maxsize=0) #先進先出
mport queue q=queue.Queue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結果(先進先出): first second third '''
2.class queue.
LifoQueue
(maxsize=0) #last in fisrt out
# @Time : 2018/9/12 15:33 # @Author : Jame import queue q=queue.LifoQueue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 結果(後進先出): third second first '''
queue.
PriorityQueue
(maxsize=0) #存儲數據時可設置優先級的隊列,數字越小優先級越高
import queue q=queue.PriorityQueue() #put進入一個元組,元組的第一個元素是優先級(一般是數字,也能夠是非數字之間的比較),數字越小優先級越高 q.put((20,'a')) q.put((10,'b')) q.put((30,'c')) print(q.get()) print(q.get()) print(q.get()) ''' 結果(數字越小優先級越高,優先級高的優先出隊): (10, 'b') (20, 'a') (30, 'c') '''
4.其餘方法
Constructor for a priority queue. maxsize is an integer that sets the upperbound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite. The lowest valued entries are retrieved first (the lowest valued entry is the one returned by sorted(list(entries))[0]). A typical pattern for entries is a tuple in the form: (priority_number, data). exception queue.Empty Exception raised when non-blocking get() (or get_nowait()) is called on a Queue object which is empty. exception queue.Full Exception raised when non-blocking put() (or put_nowait()) is called on a Queue object which is full. Queue.qsize() Queue.empty() #return True if empty Queue.full() # return True if full Queue.put(item, block=True, timeout=None) Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case). Queue.put_nowait(item) Equivalent to put(item, False). Queue.get(block=True, timeout=None) Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case). Queue.get_nowait() Equivalent to get(False). Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads. Queue.task_done() Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete. If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue). Raises a ValueError if called more times than there were items placed in the queue. Queue.join() block直到queue被消費完畢
1.官方文檔參考:https://docs.python.org/dev/library/concurrent.futures.html
2.ProcessPoolExecutor 基本用法
# @Time : 2018/9/12 15:49 # @Author : Jame from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os import time import random def task(n): print('%s is running'%os.getpid()) time.sleep(random.randint(1,3)) print('子---->>>') return n**2 if __name__ == '__main__': executor=ProcessPoolExecutor(max_workers=3) #每次運行最多3個 futures=[] for i in range(10): future=executor.submit(task,i) futures.append(future) executor.shutdown() print('+++++>>>>>>>>>>>>') for future in futures: print(future.result()) ''' 9148 is running 7508 is running 8652 is running 子---->>> 8652 is running 子---->>> 7508 is running 子---->>> 9148 is running 子---->>> 8652 is running 子---->>> 9148 is running 子---->>> 7508 is running 子---->>> 8652 is running 子---->>> 子---->>> 子---->>> +++++>>>>>>>>>>>> 0 1 4 9 16 25 36 49 64 81 '''
3.ThreadPoolExecutor 基本用法
#介紹 ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously. class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='') An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously. Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor. New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging. #用法 與ProcessPoolExecutor相同
4.map的用法
9012 is running 9012 is running 9012 is running 子----->>> 9012 is running 子----->>> 9012 is running 子----->>> 子----->>> 9012 is running 9012 is running 子----->>> 9012 is running 子----->>> 9012 is running 子----->>> 子----->>> 子----->>>
5.回調函數
# @Time : 2018/9/12 16:04 # @Author : Jame from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from multiprocessing import pool import requests import json import os def get_page(url): print('進程%s get %s '%(os.getpid(),url)) respone=requests.get(url) if respone.status_code==200: return {'url':url,'text':respone.text} def parse_page(res): res=res.result() print('進程:%s parse %s'%(os.getpid(),res['url'])) parse_res='url:%s sieze:%s'%(res['url'],len(res['text'])) with open('db4.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.taobao.com', 'https://www.jd.com', 'https://www.python.org', 'https://www.sina.com.cn' ] p=ProcessPoolExecutor(3) for url in urls: p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一個future對象obj,須要調用obj.result()拿到結果。 ''' 等價於如下異步調用操做 p=Pool(3) for url in urls: p.apply_async(get_page,args=(url,),callback=parse_page) p.close() p.join() ''' ''' 進程7804 get https://www.baidu.com 進程4116 get https://www.taobao.com 進程8196 get https://www.jd.com 進程4116 get https://www.python.org 進程:8884 parse https://www.taobao.com 進程8196 get https://www.sina.com.cn 進程:8884 parse https://www.jd.com 進程:8884 parse https://www.baidu.com 進程:8884 parse https://www.sina.com.cn 進程:8884 parse https://www.python.org '''